You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/06 18:52:18 UTC
[4/5] flink git commit: [FLINK-2240] [runtime] Pass flag to configure
use of bloom filters through runtime configuration.
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 7172887..d302487 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -238,7 +238,7 @@ public class ReusingReOpenableHashTableITCase {
new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
// do first join with both inputs
@@ -276,7 +276,7 @@ public class ReusingReOpenableHashTableITCase {
//
//
- private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+ private MutableObjectIterator<Record> getProbeInput(final int numKeys,
final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
@@ -289,8 +289,7 @@ public class ReusingReOpenableHashTableITCase {
}
@Test
- public void testSpillingHashJoinWithMassiveCollisions() throws IOException
- {
+ public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
// the following two values are known to have a hash-code collision on the initial level.
// we use them to make sure one partition grows over-proportionally large
final int REPEATED_VALUE_1 = 40559;
@@ -311,9 +310,6 @@ public class ReusingReOpenableHashTableITCase {
builds.add(build2);
builds.add(build3);
MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
-
-
-
// allocate the memory for the HashTable
List<MemorySegment> memSegments;
@@ -333,7 +329,7 @@ public class ReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager);
+ memSegments, ioManager, true);
for(int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
@@ -347,9 +343,8 @@ public class ReusingReOpenableHashTableITCase {
Record record;
final Record recordReuse = new Record();
- while (join.nextRecord())
- {
- int numBuildValues = 0;
+ while (join.nextRecord()) {
+ long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
@@ -369,10 +364,10 @@ public class ReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
- contained = Long.valueOf(numBuildValues);
+ contained = numBuildValues;
}
else {
- contained = Long.valueOf(contained.longValue() + numBuildValues);
+ contained = contained + numBuildValues;
}
map.put(key, contained);
@@ -449,8 +444,9 @@ public class ReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager);
- for(int probe = 0; probe < NUM_PROBES; probe++) {
+ memSegments, ioManager, true);
+
+ for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
@@ -463,7 +459,7 @@ public class ReusingReOpenableHashTableITCase {
while (join.nextRecord())
{
- int numBuildValues = 0;
+ long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
@@ -483,10 +479,10 @@ public class ReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
- contained = Long.valueOf(numBuildValues);
+ contained = numBuildValues;
}
else {
- contained = Long.valueOf(contained.longValue() + numBuildValues);
+ contained = contained + numBuildValues;
}
map.put(key, contained);
@@ -526,5 +522,4 @@ public class ReusingReOpenableHashTableITCase {
}
return copy;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 0aab5fe..642ac7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.junit.Assert;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -69,11 +70,11 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
private final List<UnilateralSortMerger<Record>> sorters;
private final AbstractInvokable owner;
-
- private final Configuration config;
-
+
private final TaskConfig taskConfig;
+ private final TaskManagerRuntimeInfo taskManageInfo;
+
protected final long perSortMem;
protected final double perSortFractionMem;
@@ -111,11 +112,9 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
this.owner = new DummyInvokable();
-
- this.config = new Configuration();
- this.taskConfig = new TaskConfig(this.config);
-
+ this.taskConfig = new TaskConfig(new Configuration());
this.executionConfig = executionConfig;
+ this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
@Parameterized.Parameters
@@ -279,7 +278,10 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
return this.taskConfig;
}
-
+ @Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return this.taskManageInfo;
+ }
@Override
public ExecutionConfig getExecutionConfig() {
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b71b01e..51c7f93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.operators.testutils;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.mockito.invocation.InvocationOnMock;
@@ -193,13 +193,8 @@ public class MockEnvironment implements Environment {
}
@Override
- public Configuration getTaskManagerConfiguration(){
- return new UnmodifiableConfiguration(new Configuration());
- }
-
- @Override
- public String getHostname(){
- return "localhost";
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 1e25bab..20edc20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.runtime.operators.ResettablePactDriver;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -54,7 +55,9 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
- protected static final int PAGE_SIZE = 32 * 1024;
+ protected static final int PAGE_SIZE = 32 * 1024;
+
+ private final TaskManagerRuntimeInfo taskManageInfo;
private final IOManager ioManager;
@@ -110,6 +113,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
this.executionConfig = executionConfig;
this.comparators = new ArrayList<TypeComparator<IN>>(2);
+
+ this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
@Parameterized.Parameters
@@ -292,6 +297,11 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
}
@Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return this.taskManageInfo;
+ }
+
+ @Override
public <X> MutableObjectIterator<X> getInput(int index) {
MutableObjectIterator<IN> in = this.input;
if (in == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 38d9992..7debb08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -187,7 +187,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
- this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+ this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
iterator.open();
@@ -226,7 +226,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
- this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+ this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
iterator.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 44013ef..9091fa7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.JobID;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -86,7 +88,8 @@ public class StreamMockEnvironment implements Environment {
private final int bufferSize;
- public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+ public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
+ MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.jobConfiguration = jobConfig;
this.taskConfiguration = taskConfig;
this.inputs = new LinkedList<InputGate>();
@@ -293,13 +296,8 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public Configuration getTaskManagerConfiguration(){
- return new UnmodifiableConfiguration(new Configuration());
- }
-
- @Override
- public String getHostname(){
- return "localhost";
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
}
}