You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/06 18:52:18 UTC

[4/5] flink git commit: [FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 7172887..d302487 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -238,7 +238,7 @@ public class ReusingReOpenableHashTableITCase {
 				new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 		
 		iterator.open();
 		// do first join with both inputs
@@ -276,7 +276,7 @@ public class ReusingReOpenableHashTableITCase {
 	//
 	//
 	
-	private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
 		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
 		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
@@ -289,8 +289,7 @@ public class ReusingReOpenableHashTableITCase {
 	}
 	
 	@Test
-	public void testSpillingHashJoinWithMassiveCollisions() throws IOException
-	{
+	public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
 		// the following two values are known to have a hash-code collision on the initial level.
 		// we use them to make sure one partition grows over-proportionally large
 		final int REPEATED_VALUE_1 = 40559;
@@ -311,9 +310,6 @@ public class ReusingReOpenableHashTableITCase {
 		builds.add(build2);
 		builds.add(build3);
 		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
-	
-		
-		
 
 		// allocate the memory for the HashTable
 		List<MemorySegment> memSegments;
@@ -333,7 +329,7 @@ public class ReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
+				memSegments, ioManager, true);
 		
 		for(int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
@@ -347,9 +343,8 @@ public class ReusingReOpenableHashTableITCase {
 			Record record;
 			final Record recordReuse = new Record();
 
-			while (join.nextRecord())
-			{
-				int numBuildValues = 0;
+			while (join.nextRecord()) {
+				long numBuildValues = 0;
 		
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -369,10 +364,10 @@ public class ReusingReOpenableHashTableITCase {
 				
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 				
 				map.put(key, contained);
@@ -449,8 +444,9 @@ public class ReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
+				memSegments, ioManager, true);
+		
+		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
 			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
@@ -463,7 +459,7 @@ public class ReusingReOpenableHashTableITCase {
 
 			while (join.nextRecord())
 			{	
-				int numBuildValues = 0;
+				long numBuildValues = 0;
 				
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -483,10 +479,10 @@ public class ReusingReOpenableHashTableITCase {
 				
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 				
 				map.put(key, contained);
@@ -526,5 +522,4 @@ public class ReusingReOpenableHashTableITCase {
 		}
 		return copy;
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 0aab5fe..642ac7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -69,11 +70,11 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	private final List<UnilateralSortMerger<Record>> sorters;
 	
 	private final AbstractInvokable owner;
-	
-	private final Configuration config;
-	
+
 	private final TaskConfig taskConfig;
 	
+	private final TaskManagerRuntimeInfo taskManageInfo;
+	
 	protected final long perSortMem;
 
 	protected final double perSortFractionMem;
@@ -111,11 +112,9 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
 		
 		this.owner = new DummyInvokable();
-		
-		this.config = new Configuration();
-		this.taskConfig = new TaskConfig(this.config);
-
+		this.taskConfig = new TaskConfig(new Configuration());
 		this.executionConfig = executionConfig;
+		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
 	}
 
 	@Parameterized.Parameters
@@ -279,7 +278,10 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		return this.taskConfig;
 	}
 
-
+	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return this.taskManageInfo;
+	}
 
 	@Override
 	public ExecutionConfig getExecutionConfig() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b71b01e..51c7f93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.mockito.invocation.InvocationOnMock;
@@ -193,13 +193,8 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public Configuration getTaskManagerConfiguration(){
-		return new UnmodifiableConfiguration(new Configuration());
-	}
-
-	@Override
-	public String getHostname(){
-		return "localhost";
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 1e25bab..20edc20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.ResettablePactDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -54,7 +55,9 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
 	
 	protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
 	
-	protected static final int PAGE_SIZE = 32 * 1024; 
+	protected static final int PAGE_SIZE = 32 * 1024;
+
+	private final TaskManagerRuntimeInfo taskManageInfo;
 	
 	private final IOManager ioManager;
 	
@@ -110,6 +113,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
 
 		this.executionConfig = executionConfig;
 		this.comparators = new ArrayList<TypeComparator<IN>>(2);
+
+		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
 	}
 
 	@Parameterized.Parameters
@@ -292,6 +297,11 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
 	}
 
 	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return this.taskManageInfo;
+	}
+	
+	@Override
 	public <X> MutableObjectIterator<X> getInput(int index) {
 		MutableObjectIterator<IN> in = this.input;
 		if (in == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 38d9992..7debb08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -187,7 +187,7 @@ public class HashVsSortMiniBenchmark {
 					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
 			
 			iterator.open();
 			
@@ -226,7 +226,7 @@ public class HashVsSortMiniBenchmark {
 					new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 						this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
 			
 			iterator.open();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 44013ef..9091fa7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.JobID;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -86,7 +88,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private final int bufferSize;
 
-	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
+									MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = jobConfig;
 		this.taskConfiguration = taskConfig;
 		this.inputs = new LinkedList<InputGate>();
@@ -293,13 +296,8 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public Configuration getTaskManagerConfiguration(){
-		return new UnmodifiableConfiguration(new Configuration());
-	}
-
-	@Override
-	public String getHostname(){
-		return "localhost";
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
 	}
 }