You are viewing a plain-text version of this content; the link to the canonical archive page was lost in the plain-text conversion.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/06 18:52:15 UTC
[1/5] flink git commit: [FLINK-2492] [runtime] Rename former 'match' classes to 'join' to reflect consistent naming scheme.
Repository: flink
Updated Branches:
refs/heads/master 5a788ec23 -> 0b73b4387
[FLINK-2492] [runtime] Rename former 'match' classes to 'join' to reflect consistent naming scheme.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/685086a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/685086a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/685086a3
Branch: refs/heads/master
Commit: 685086a3dd9afcec2eec76485298bc7b3f031a3c
Parents: a5b84b2
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 6 15:18:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 16:56:36 2015 +0200
----------------------------------------------------------------------
.../plantranslate/JobGraphGenerator.java | 4 +-
.../AbstractCachedBuildSideJoinDriver.java | 204 +++++++++++++++++++
.../AbstractCachedBuildSideMatchDriver.java | 204 -------------------
.../operators/BuildFirstCachedJoinDriver.java | 27 +++
.../operators/BuildFirstCachedMatchDriver.java | 27 ---
.../operators/BuildSecondCachedJoinDriver.java | 27 +++
.../operators/BuildSecondCachedMatchDriver.java | 27 ---
.../flink/runtime/operators/DriverStrategy.java | 10 +-
.../flink/runtime/operators/JoinDriver.java | 185 +++++++++++++++++
.../flink/runtime/operators/MatchDriver.java | 191 -----------------
.../runtime/operators/CachedMatchTaskTest.java | 22 +-
.../operators/MatchTaskExternalITCase.java | 6 +-
.../flink/runtime/operators/MatchTaskTest.java | 46 ++---
.../ConnectedComponentsNepheleITCase.java | 4 +-
.../CustomCompensatableDanglingPageRank.java | 4 +-
...mpensatableDanglingPageRankWithCombiner.java | 4 +-
.../CompensatableDanglingPageRank.java | 4 +-
17 files changed, 495 insertions(+), 501 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 6fd2796..d440063 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.operators.DataSourceTask;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
-import org.apache.flink.runtime.operators.MatchDriver;
+import org.apache.flink.runtime.operators.JoinDriver;
import org.apache.flink.runtime.operators.NoOpDriver;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
@@ -336,7 +336,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
}
// adjust the driver
- if (conf.getDriver().equals(MatchDriver.class)) {
+ if (conf.getDriver().equals(JoinDriver.class)) {
conf.setDriver(inputNum == 0 ? JoinWithSolutionSetFirstDriver.class : JoinWithSolutionSetSecondDriver.class);
}
else if (conf.getDriver().equals(CoGroupDriver.class)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
new file mode 100644
index 0000000..aff8d01
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+
+ private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
+
+ private final int buildSideIndex;
+
+ private final int probeSideIndex;
+
+ private boolean objectReuseEnabled = false;
+
+
+ protected AbstractCachedBuildSideJoinDriver(int buildSideIndex, int probeSideIndex) {
+ this.buildSideIndex = buildSideIndex;
+ this.probeSideIndex = probeSideIndex;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public boolean isInputResettable(int inputNum) {
+ if (inputNum < 0 || inputNum > 1) {
+ throw new IndexOutOfBoundsException();
+ }
+ return inputNum == buildSideIndex;
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ TaskConfig config = this.taskContext.getTaskConfig();
+
+ TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
+ TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
+ TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+ TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
+ MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
+ MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
+
+ TypePairComparatorFactory<IT1, IT2> pairComparatorFactory =
+ this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
+
+ double availableMemory = config.getRelativeMemoryDriver();
+
+ ExecutionConfig executionConfig = taskContext.getExecutionConfig();
+ objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+ if (objectReuseEnabled) {
+ if (buildSideIndex == 0 && probeSideIndex == 1) {
+
+ matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+ input1, input2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator21(comparator1, comparator2),
+ this.taskContext.getMemoryManager(),
+ this.taskContext.getIOManager(),
+ this.taskContext.getOwningNepheleTask(),
+ availableMemory);
+
+
+ } else if (buildSideIndex == 1 && probeSideIndex == 0) {
+
+ matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+ input1, input2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ this.taskContext.getMemoryManager(),
+ this.taskContext.getIOManager(),
+ this.taskContext.getOwningNepheleTask(),
+ availableMemory);
+
+ } else {
+ throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
+ }
+ } else {
+ if (buildSideIndex == 0 && probeSideIndex == 1) {
+
+ matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+ input1, input2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator21(comparator1, comparator2),
+ this.taskContext.getMemoryManager(),
+ this.taskContext.getIOManager(),
+ this.taskContext.getOwningNepheleTask(),
+ availableMemory);
+
+
+ } else if (buildSideIndex == 1 && probeSideIndex == 0) {
+
+ matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+ input1, input2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ this.taskContext.getMemoryManager(),
+ this.taskContext.getIOManager(),
+ this.taskContext.getOwningNepheleTask(),
+ availableMemory);
+
+ } else {
+ throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
+ }
+ }
+
+ this.matchIterator.open();
+ }
+
+ @Override
+ public void prepare() throws Exception {
+ // nothing
+ }
+
+ @Override
+ public void run() throws Exception {
+
+ final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
+ final Collector<OT> collector = this.taskContext.getOutputCollector();
+
+ while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
+
+ }
+
+ @Override
+ public void cleanup() throws Exception {}
+
+ @Override
+ public void reset() throws Exception {
+
+ MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
+ MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
+
+ if (objectReuseEnabled) {
+ if (buildSideIndex == 0 && probeSideIndex == 1) {
+ final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+
+ matchIterator.reopenProbe(input2);
+ } else {
+ final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+ matchIterator.reopenProbe(input1);
+ }
+ } else {
+ if (buildSideIndex == 0 && probeSideIndex == 1) {
+ final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+
+ matchIterator.reopenProbe(input2);
+ } else {
+ final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+ matchIterator.reopenProbe(input1);
+ }
+ }
+ }
+
+ @Override
+ public void teardown() {
+ this.running = false;
+ if (this.matchIterator != null) {
+ this.matchIterator.close();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ this.running = false;
+ if (this.matchIterator != null) {
+ this.matchIterator.abort();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
deleted file mode 100644
index f3b2dfd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
-
- private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
-
- private final int buildSideIndex;
-
- private final int probeSideIndex;
-
- private boolean objectReuseEnabled = false;
-
-
- protected AbstractCachedBuildSideMatchDriver(int buildSideIndex, int probeSideIndex) {
- this.buildSideIndex = buildSideIndex;
- this.probeSideIndex = probeSideIndex;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean isInputResettable(int inputNum) {
- if (inputNum < 0 || inputNum > 1) {
- throw new IndexOutOfBoundsException();
- }
- return inputNum == buildSideIndex;
- }
-
- @Override
- public void initialize() throws Exception {
- TaskConfig config = this.taskContext.getTaskConfig();
-
- TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
- TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
- TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
- TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
- MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
- MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
-
- TypePairComparatorFactory<IT1, IT2> pairComparatorFactory =
- this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
-
- double availableMemory = config.getRelativeMemoryDriver();
-
- ExecutionConfig executionConfig = taskContext.getExecutionConfig();
- objectReuseEnabled = executionConfig.isObjectReuseEnabled();
-
- if (objectReuseEnabled) {
- if (buildSideIndex == 0 && probeSideIndex == 1) {
-
- matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
- input1, input2,
- serializer1, comparator1,
- serializer2, comparator2,
- pairComparatorFactory.createComparator21(comparator1, comparator2),
- this.taskContext.getMemoryManager(),
- this.taskContext.getIOManager(),
- this.taskContext.getOwningNepheleTask(),
- availableMemory);
-
-
- } else if (buildSideIndex == 1 && probeSideIndex == 0) {
-
- matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
- input1, input2,
- serializer1, comparator1,
- serializer2, comparator2,
- pairComparatorFactory.createComparator12(comparator1, comparator2),
- this.taskContext.getMemoryManager(),
- this.taskContext.getIOManager(),
- this.taskContext.getOwningNepheleTask(),
- availableMemory);
-
- } else {
- throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
- }
- } else {
- if (buildSideIndex == 0 && probeSideIndex == 1) {
-
- matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
- input1, input2,
- serializer1, comparator1,
- serializer2, comparator2,
- pairComparatorFactory.createComparator21(comparator1, comparator2),
- this.taskContext.getMemoryManager(),
- this.taskContext.getIOManager(),
- this.taskContext.getOwningNepheleTask(),
- availableMemory);
-
-
- } else if (buildSideIndex == 1 && probeSideIndex == 0) {
-
- matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
- input1, input2,
- serializer1, comparator1,
- serializer2, comparator2,
- pairComparatorFactory.createComparator12(comparator1, comparator2),
- this.taskContext.getMemoryManager(),
- this.taskContext.getIOManager(),
- this.taskContext.getOwningNepheleTask(),
- availableMemory);
-
- } else {
- throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
- }
- }
-
- this.matchIterator.open();
- }
-
- @Override
- public void prepare() throws Exception {
- // nothing
- }
-
- @Override
- public void run() throws Exception {
-
- final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
- final Collector<OT> collector = this.taskContext.getOutputCollector();
-
- while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
-
- }
-
- @Override
- public void cleanup() throws Exception {}
-
- @Override
- public void reset() throws Exception {
-
- MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
- MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
-
- if (objectReuseEnabled) {
- if (buildSideIndex == 0 && probeSideIndex == 1) {
- final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
-
- matchIterator.reopenProbe(input2);
- } else {
- final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
- matchIterator.reopenProbe(input1);
- }
- } else {
- if (buildSideIndex == 0 && probeSideIndex == 1) {
- final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
-
- matchIterator.reopenProbe(input2);
- } else {
- final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
- matchIterator.reopenProbe(input1);
- }
- }
- }
-
- @Override
- public void teardown() {
- this.running = false;
- if (this.matchIterator != null) {
- this.matchIterator.close();
- }
- }
-
- @Override
- public void cancel() {
- this.running = false;
- if (this.matchIterator != null) {
- this.matchIterator.abort();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java
new file mode 100644
index 0000000..6da221f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+public class BuildFirstCachedJoinDriver<IT1, IT2, OT> extends AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> {
+
+ public BuildFirstCachedJoinDriver() {
+ super(0, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java
deleted file mode 100644
index c141767..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-public class BuildFirstCachedMatchDriver<IT1, IT2, OT> extends AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> {
-
- public BuildFirstCachedMatchDriver() {
- super(0, 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java
new file mode 100644
index 0000000..44824c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+public class BuildSecondCachedJoinDriver<IT1, IT2, OT> extends AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> {
+
+ public BuildSecondCachedJoinDriver() {
+ super(1, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java
deleted file mode 100644
index eed03ab..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-public class BuildSecondCachedMatchDriver<IT1, IT2, OT> extends AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> {
-
- public BuildSecondCachedMatchDriver() {
- super(1, 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 4a0035c..3aadf2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -72,7 +72,7 @@ public enum DriverStrategy {
ALL_GROUP_COMBINE(AllGroupCombineDriver.class, null, PIPELINED, 0),
// both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
- MERGE(MatchDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
+ MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
// co-grouping inputs
CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, 2),
@@ -81,13 +81,13 @@ public enum DriverStrategy {
// the first input is build side, the second side is probe side of a hybrid hash table
- HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+ HYBRIDHASH_BUILD_FIRST(JoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// the second input is build side, the first side is probe side of a hybrid hash table
- HYBRIDHASH_BUILD_SECOND(MatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+ HYBRIDHASH_BUILD_SECOND(JoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// a cached variant of HYBRIDHASH_BUILD_FIRST, that can only be used inside of iterations
- HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedMatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+ HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
- HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+ HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// the second input is inner loop, the first input is outer loop and block-wise processed
NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
new file mode 100644
index 0000000..af3da55
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * The join driver implements the logic of a join operator at runtime. It instantiates either
+ * hash or sort-merge based strategies to find joining pairs of records.
+ * <p>
+ * The strategy is read from the task configuration ({@link DriverStrategy#MERGE},
+ * {@link DriverStrategy#HYBRIDHASH_BUILD_FIRST} or {@link DriverStrategy#HYBRIDHASH_BUILD_SECOND}).
+ * Depending on {@link ExecutionConfig#isObjectReuseEnabled()}, either the object-reusing or the
+ * non-reusing variant of the corresponding join iterator is used.
+ *
+ * @param <IT1> The type of the records of the first input.
+ * @param <IT2> The type of the records of the second input.
+ * @param <OT> The type of the records produced by the join function.
+ *
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
+ */
+public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(JoinDriver.class);
+
+	protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+
+	/** The iterator that does the actual join; created in {@link #prepare()}, released in {@link #cleanup()}. */
+	private volatile JoinTaskIterator<IT1, IT2, OT> joinIterator;
+
+	/** Set to false by {@link #cancel()} to stop the join loop in {@link #run()}. */
+	protected volatile boolean running;
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+		this.taskContext = context;
+		this.running = true;
+	}
+
+	@Override
+	public int getNumberOfInputs() {
+		return 2;
+	}
+
+	@Override
+	public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
+		@SuppressWarnings("unchecked")
+		final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
+		return clazz;
+	}
+
+	@Override
+	public int getNumberOfDriverComparators() {
+		return 2;
+	}
+
+	/**
+	 * Creates the join iterator for the configured driver strategy and opens it. Opening the
+	 * iterator triggers the sorting or hash-table building and blocks until the iterator is ready.
+	 *
+	 * @throws Exception If the pair comparator factory is missing, the driver strategy is not a
+	 *                   supported join strategy, or opening the iterator fails.
+	 */
+	@Override
+	public void prepare() throws Exception {
+		final TaskConfig config = this.taskContext.getTaskConfig();
+
+		// obtain task manager's memory manager and I/O manager
+		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
+		final IOManager ioManager = this.taskContext.getIOManager();
+
+		// set up memory and I/O parameters
+		final double fractionAvailableMemory = config.getRelativeMemoryDriver();
+		final int numPages = memoryManager.computeNumberOfPages(fractionAvailableMemory);
+
+		// the driver strategy selects between the sort-merge and the hybrid-hash join iterators
+		final DriverStrategy ls = config.getDriverStrategy();
+
+		final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
+		final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
+
+		// get the key positions and types
+		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
+		final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
+		final TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+		final TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
+
+		final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
+				this.taskContext.getUserCodeClassLoader());
+		if (pairComparatorFactory == null) {
+			throw new Exception("Missing pair comparator factory for join driver");
+		}
+
+		final ExecutionConfig executionConfig = this.taskContext.getExecutionConfig();
+		final boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+		LOG.debug("Join Driver object reuse: {}.", objectReuseEnabled ? "ENABLED" : "DISABLED");
+
+		// create the joining iterator according to the provided local strategy.
+		// The hash iterators receive the pair comparator with the probe-side type first:
+		// 2-1 when the first input is the build side, 1-2 when the second input is.
+		if (objectReuseEnabled) {
+			switch (ls) {
+				case MERGE:
+					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					break;
+				case HYBRIDHASH_BUILD_FIRST:
+					this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				case HYBRIDHASH_BUILD_SECOND:
+					this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				default:
+					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
+			}
+		} else {
+			switch (ls) {
+				case MERGE:
+					this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					break;
+				case HYBRIDHASH_BUILD_FIRST:
+					this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				case HYBRIDHASH_BUILD_SECOND:
+					this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				default:
+					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
+			}
+		}
+
+		// open the iterator - this triggers the sorting or hash-table building
+		// and blocks until the iterator is ready
+		this.joinIterator.open();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(this.taskContext.formatLogString("join task iterator ready."));
+		}
+	}
+
+	/**
+	 * Repeatedly calls {@link JoinTaskIterator#callWithNextKey} with the join function and the
+	 * output collector until it returns {@code false} or the driver has been canceled.
+	 */
+	@Override
+	public void run() throws Exception {
+		final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
+		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final JoinTaskIterator<IT1, IT2, OT> joinIterator = this.joinIterator;
+
+		while (this.running && joinIterator.callWithNextKey(joinStub, collector)) {
+			// all work is done inside callWithNextKey()
+		}
+	}
+
+	/** Closes the join iterator, if one was created, and drops the reference to it. */
+	@Override
+	public void cleanup() throws Exception {
+		final JoinTaskIterator<IT1, IT2, OT> iterator = this.joinIterator;
+		if (iterator != null) {
+			iterator.close();
+			this.joinIterator = null;
+		}
+	}
+
+	/** Stops the join loop and aborts the join iterator, if one was created. */
+	@Override
+	public void cancel() {
+		this.running = false;
+		// read the volatile field only once: cleanup() may null it concurrently, in which case a
+		// separate null check followed by a second read could throw a NullPointerException
+		final JoinTaskIterator<IT1, IT2, OT> iterator = this.joinIterator;
+		if (iterator != null) {
+			iterator.abort();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
deleted file mode 100644
index e54fca5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * Match task which is executed by a Task Manager. The task has two inputs and one or multiple outputs.
- * It is provided with a JoinFunction implementation.
- * <p>
- * The MatchTask matches all pairs of records that share the same key and come from different inputs. Each pair of
- * matching records is handed to the <code>match()</code> method of the JoinFunction.
- *
- * @see org.apache.flink.api.common.functions.FlatJoinFunction
- */
-public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(MatchDriver.class);
-
- protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
-
- private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator; // the iterator that does the actual matching
-
- protected volatile boolean running;
-
- private boolean objectReuseEnabled = false;
-
- // ------------------------------------------------------------------------
-
- @Override
- public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
- this.taskContext = context;
- this.running = true;
- }
-
- @Override
- public int getNumberOfInputs() {
- return 2;
- }
-
- @Override
- public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
- @SuppressWarnings("unchecked")
- final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
- return clazz;
- }
-
- @Override
- public int getNumberOfDriverComparators() {
- return 2;
- }
-
- @Override
- public void prepare() throws Exception{
- final TaskConfig config = this.taskContext.getTaskConfig();
-
- // obtain task manager's memory manager and I/O manager
- final MemoryManager memoryManager = this.taskContext.getMemoryManager();
- final IOManager ioManager = this.taskContext.getIOManager();
-
- // set up memory and I/O parameters
- final double fractionAvailableMemory = config.getRelativeMemoryDriver();
- final int numPages = memoryManager.computeNumberOfPages(fractionAvailableMemory);
-
- // test minimum memory requirements
- final DriverStrategy ls = config.getDriverStrategy();
-
- final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
- final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
-
- // get the key positions and types
- final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
- final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
- final TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
- final TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
-
- final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
- this.taskContext.getUserCodeClassLoader());
- if (pairComparatorFactory == null) {
- throw new Exception("Missing pair comparator factory for Match driver");
- }
-
- ExecutionConfig executionConfig = taskContext.getExecutionConfig();
- this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("MatchDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
- }
-
- // create and return MatchTaskIterator according to provided local strategy.
- if (this.objectReuseEnabled) {
- switch (ls) {
- case MERGE:
- this.matchIterator = new ReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-
- break;
- case HYBRIDHASH_BUILD_FIRST:
- this.matchIterator = new ReusingBuildFirstHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
- break;
- case HYBRIDHASH_BUILD_SECOND:
- this.matchIterator = new ReusingBuildSecondHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
- break;
- default:
- throw new Exception("Unsupported driver strategy for Match driver: " + ls.name());
- }
- } else {
- switch (ls) {
- case MERGE:
- this.matchIterator = new NonReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-
- break;
- case HYBRIDHASH_BUILD_FIRST:
- this.matchIterator = new NonReusingBuildFirstHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
- break;
- case HYBRIDHASH_BUILD_SECOND:
- this.matchIterator = new NonReusingBuildSecondHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
- break;
- default:
- throw new Exception("Unsupported driver strategy for Match driver: " + ls.name());
- }
- }
-
- // open MatchTaskIterator - this triggers the sorting or hash-table building
- // and blocks until the iterator is ready
- this.matchIterator.open();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(this.taskContext.formatLogString("Match task iterator ready."));
- }
- }
-
- @Override
- public void run() throws Exception {
- final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
- final Collector<OT> collector = this.taskContext.getOutputCollector();
- final JoinTaskIterator<IT1, IT2, OT> matchIterator = this.matchIterator;
-
- while (this.running && matchIterator.callWithNextKey(matchStub, collector));
- }
-
- @Override
- public void cleanup() throws Exception {
- if (this.matchIterator != null) {
- this.matchIterator.close();
- this.matchIterator = null;
- }
- }
-
- @Override
- public void cancel() {
- this.running = false;
- if (this.matchIterator != null) {
- this.matchIterator.abort();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index a57287a..c93c302 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -78,7 +78,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+ BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
try {
testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -109,7 +109,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+ BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
try {
testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -140,7 +140,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+ BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
try {
testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -171,7 +171,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+ BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
try {
testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -202,7 +202,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+ BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
try {
testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -233,7 +233,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+ BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
try {
testResettableDriver(testTask, MockFailingMatchStub.class, 3);
@@ -263,7 +263,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+ BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
try {
testResettableDriver(testTask, MockFailingMatchStub.class, 3);
@@ -294,7 +294,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- final BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+ final BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -338,7 +338,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- final BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+ final BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -382,7 +382,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- final BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+ final BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -426,7 +426,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
getTaskConfig().setRelativeMemoryDriver(1.0f);
- final BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+ final BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 29be8f8..30c1610 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -80,7 +80,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -113,7 +113,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockMatchStub.class);
@@ -144,7 +144,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockMatchStub.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 16aea69..8fbf05e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -90,7 +90,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -124,7 +124,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -160,7 +160,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -196,7 +196,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -232,7 +232,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -268,7 +268,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -304,7 +304,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
@@ -339,7 +339,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
@@ -375,7 +375,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
@@ -404,7 +404,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
@@ -455,7 +455,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
@@ -506,7 +506,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
setNumFileHandlesForSort(4);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
@@ -556,7 +556,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockMatchStub.class);
@@ -587,7 +587,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockMatchStub.class);
@@ -618,7 +618,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockMatchStub.class);
@@ -649,7 +649,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockMatchStub.class);
@@ -680,7 +680,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockMatchStub.class);
@@ -711,7 +711,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockFailingMatchStub.class);
@@ -741,7 +741,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
try {
testDriver(testTask, MockFailingMatchStub.class);
@@ -773,7 +773,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -827,7 +827,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -876,7 +876,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -920,7 +920,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
getTaskConfig().setRelativeMemoryDriver(hash_frac);
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
final AtomicBoolean success = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8e9266d..7a3639b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -47,7 +47,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -276,7 +276,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
headConfig.setIterationHeadIndexOfSyncOutput(2);
// the driver
- headConfig.setDriver(BuildSecondCachedMatchDriver.class);
+ headConfig.setDriver(BuildSecondCachedJoinDriver.class);
headConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
headConfig.setStubWrapper(
new UserCodeClassWrapper<NeighborWithComponentIDJoin>(NeighborWithComponentIDJoin.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index c29e868..a6e6b6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
import org.apache.flink.runtime.operators.CoGroupDriver;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
@@ -204,7 +204,7 @@ public class CustomCompensatableDanglingPageRank {
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
- intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
+ intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
intermediateConfig.setRelativeMemoryDriver((double) matchMemory / totalMemoryConsumption);
intermediateConfig.addInputToGroup(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index de9e0a1..0bf258f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
import org.apache.flink.runtime.operators.CoGroupDriver;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
@@ -204,7 +204,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
- intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
+ intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
intermediateConfig.addInputToGroup(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index eb2ccdc..78038b3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
import org.apache.flink.runtime.operators.CoGroupDriver;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
@@ -183,7 +183,7 @@ public class CompensatableDanglingPageRank {
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setIterationId(ITERATION_ID);
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
- intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
+ intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
intermediateConfig.addInputToGroup(0);
[3/5] flink git commit: [FLINK-2240] Add bloom filter to filter probe
records during hash join.
Posted by se...@apache.org.
[FLINK-2240] Add bloom filter to filter probe records during hash join.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61dcae39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61dcae39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61dcae39
Branch: refs/heads/master
Commit: 61dcae391cb3b45ba3aff47d4d9163889d2958a4
Parents: 685086a
Author: chengxiang li <ch...@intel.com>
Authored: Fri Jul 3 23:53:47 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 17:14:39 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 13 +-
.../operators/hash/MutableHashTable.java | 145 +++++++++-
.../runtime/operators/util/BloomFilter.java | 226 ++++++++++++++++
.../MutableHashTablePerformanceBenchmark.java | 268 +++++++++++++++++++
.../runtime/operators/util/BloomFilterTest.java | 162 +++++++++++
5 files changed, 806 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c76741b..dad2d99 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -178,7 +178,7 @@ public final class ConfigConstants {
* for hybrid hash joins.
*/
public static final String DEFAULT_SPILLING_MAX_FAN_KEY = "taskmanager.runtime.max-fan";
-
+
/**
* Key for the default spilling threshold. When more than the threshold memory of the sort buffers is full, the
* sorter will start spilling to disk.
@@ -190,6 +190,12 @@ public final class ConfigConstants {
* A value of 0 indicates infinite waiting.
*/
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
+
+ /**
+ * Whether to enable a bloom filter that filters probe records while they are spilled to disk during the
+ * probe phase, in order to minimize the number of spilled probe records.
+ */
+ public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
// ------------------------ YARN Configuration ------------------------
@@ -552,6 +558,11 @@ public final class ConfigConstants {
* The default timeout for filesystem stream opening: infinite (means max long milliseconds).
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
+
+ /**
+ * Enable the bloom filter for hash joins by default, as it improves hash join performance most of the time.
+ */
+ public static final boolean DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
// ------------------------ YARN Configuration ------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 7f07cfb..4a57986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.hash;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.iterative.io.HashPartitionIterator;
+import org.apache.flink.runtime.operators.util.BloomFilter;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
@@ -194,6 +195,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
* Constant for the bucket status, indicating that the bucket is in memory.
*/
private static final byte BUCKET_STATUS_IN_MEMORY = 0;
+
+ /**
+ * Constant for the bucket status, indicating that the bucket's memory (after the header) holds a bloom filter.
+ */
+ private static final byte BUCKET_STATUS_IN_FILTER = 1;
// ------------------------------------------------------------------------
// Members
@@ -348,6 +354,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
private boolean running = true;
+ private BloomFilter bloomFilter;
+
// ------------------------------------------------------------------------
// Construction and Teardown
// ------------------------------------------------------------------------
@@ -469,12 +477,19 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.recordComparator.setReference(next);
this.bucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
return true;
- }
- else {
- p.insertIntoProbeBuffer(next);
+ } else {
+ byte status = bucket.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+ if (status == BUCKET_STATUS_IN_FILTER) {
+ this.bloomFilter.setBitsLocation(bucket, bucketInSegmentOffset + BUCKET_HEADER_LENGTH);
+ // Use the bloom filter to drop probe records that cannot match any key in the spilled build-side bucket.
+ if (this.bloomFilter.testHash(hash)) {
+ p.insertIntoProbeBuffer(next);
+ }
+ } else {
+ p.insertIntoProbeBuffer(next);
+ }
}
}
-
// -------------- partition done ---------------
return false;
@@ -710,6 +725,27 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
p.finalizeBuildPhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
}
}
+
+ private void initBloomFilter(int numBuckets) {
+ int avgNumRecordsPerBucket = getEstimatedMaxBucketEntries(this.availableMemory.size(), this.segmentSize,
+ numBuckets, this.avgRecordLen);
+ // Give the bloom filter all of the bucket's bytes except the bucket header.
+ int byteSize = HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH;
+ this.bloomFilter = new BloomFilter(avgNumRecordsPerBucket, byteSize);
+ if (LOG.isDebugEnabled()) {
+ double fpp = BloomFilter.estimateFalsePositiveProbability(avgNumRecordsPerBucket, byteSize << 3);
+ LOG.debug(String.format("Create BloomFilter with average input entries per bucket[%d], bytes size[%d], false positive probability[%f].",
+ avgNumRecordsPerBucket, byteSize, fpp));
+ }
+ }
+
+ final private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
+ final long totalSize = ((long) bufferSize) * numBuffers;
+ final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+ final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
+ final long maxNumRecordsPerBucket = maxNumRecordsStorable / numBuckets;
+ return (int) maxNumRecordsPerBucket;
+ }
/**
* @param p
@@ -816,7 +852,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
final int hashCode = hash(btComparator.hash(rec), nextRecursionLevel);
insertIntoTable(rec, hashCode);
}
-
+
// finalize the partitions
for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
HashPartition<BT, PT> part = this.partitionsBeingBuilt.get(i);
@@ -853,6 +889,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
if (pointer != -1) {
// record was inserted into an in-memory partition. a pointer must be inserted into the buckets
insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer);
+ } else {
+ byte status = bucket.get(bucketInSegmentPos + HEADER_STATUS_OFFSET);
+ if (status == BUCKET_STATUS_IN_FILTER) {
+ // The partition has been spilled: point the bloom filter at the current bucket's bits
+ // and add this record's hash code to it.
+ this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+ this.bloomFilter.addHash(hashCode);
+ }
}
}
@@ -1047,6 +1091,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
this.buckets = table;
this.numBuckets = numBuckets;
+
+ boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+ ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
+ if (enableBloomFilter) {
+ initBloomFilter(numBuckets);
+ }
}
/**
@@ -1088,6 +1138,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
final HashPartition<BT, PT> p = partitions.get(largestPartNum);
+ boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+ ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
+ if (enableBloomFilter) {
+ buildBloomFilterForBucketsInPartition(largestPartNum, p);
+ }
+
// spill the partition
int numBuffersFreed = p.spillPartition(this.availableMemory, this.ioManager,
this.currentEnumerator.next(), this.writeBehindBuffers);
@@ -1101,6 +1157,81 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
return largestPartNum;
}
+ final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
+ // Find all the buckets that belong to this partition, and build a bloom filter for each bucket (including its overflow buckets).
+ final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
+ for (MemorySegment segment : this.buckets) {
+ for (int i = 0; i < bucketsPerSegment; i++) {
+ final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+ byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
+ if (partitionNumber == partNum) {
+ byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+ if (status == BUCKET_STATUS_IN_MEMORY) {
+ buildBloomFilterForBucket(bucketInSegmentOffset, segment, partition);
+ }
+ }
+ }
+ }
+ }
+
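+ /**
+ * Uses all of the bucket's memory except the bucket header as the bit set of a bloom filter, and adds the
+ * hash codes of the build records stored in the bucket to that bloom filter.
+ *
+ * @param bucketInSegmentPos offset of the bucket within its memory segment
+ * @param bucket the memory segment that contains the bucket
+ * @param p the partition that owns the bucket (its overflow segments are scanned as well)
+ */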
+ final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+ final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+ int[] hashCodes = new int[count];
+ // The hash codes and the bloom filter occupy the same bytes, so read all hash codes out first and then write them into the bloom filter.
+ for (int i = 0; i < count; i++) {
+ hashCodes[i] = bucket.getInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
+ }
+ this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+ for (int hashCode : hashCodes) {
+ this.bloomFilter.addHash(hashCode);
+ }
+ buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
+ }
+
+ final private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+ int totalCount = 0;
+ boolean skip = false;
+ long forwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
+ while (forwardPointer != BUCKET_FORWARD_POINTER_NOT_SET) {
+ final int overflowSegNum = (int) (forwardPointer >>> 32);
+ if (overflowSegNum < 0 || overflowSegNum >= p.numOverflowSegments) {
+ skip = true;
+ break;
+ }
+ MemorySegment overflowSegment = p.overflowSegments[overflowSegNum];
+ int bucketInOverflowSegmentOffset = (int) (forwardPointer & 0xffffffff);
+
+ final int count = overflowSegment.getShort(bucketInOverflowSegmentOffset + HEADER_COUNT_OFFSET);
+ totalCount += count;
+ // The bloom filter of each bucket has 112 * 8 bits; when more than 2048 entries are expected, the fpp would be higher than 0.9,
+ // which makes the bloom filter an overhead instead of an optimization.
+ if (totalCount > 2048) {
+ skip = true;
+ break;
+ }
+
+ for (int i = 0; i < count; i++) {
+ int hashCode = overflowSegment.getInt(bucketInOverflowSegmentOffset + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
+ this.bloomFilter.addHash(hashCode);
+ }
+
+ forwardPointer = overflowSegment.getLong(bucketInOverflowSegmentOffset + HEADER_FORWARD_OFFSET);
+
+ }
+
+ if (!skip) {
+ bucket.put(bucketInSegmentPos + HEADER_STATUS_OFFSET, BUCKET_STATUS_IN_FILTER);
+ }
+ }
+
/**
* This method makes sure that at least a certain number of memory segments is in the list of free segments.
* Free memory can be in the list of free segments, or in the return-queue where segments used to write behind are
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
new file mode 100644
index 0000000..947a56b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.core.memory.MemorySegment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * BloomFilter is a probabilistic data structure for set membership check. BloomFilters are
+ * highly space efficient when compared to using a HashSet. Because of the probabilistic nature of
+ * bloom filter false positive (element not present in bloom filter but test() says true) are
+ * possible but false negatives are not possible (if element is present then test() will never
+ * say false). The false positive probability is configurable depending on which storage requirement
+ * may increase or decrease. Lower the false positive probability greater is the space requirement.
+ * Bloom filters are sensitive to number of elements that will be inserted in the bloom filter.
+ * During the creation of bloom filter expected number of entries must be specified. If the number
+ * of insertions exceed the specified initial number of entries then false positive probability will
+ * increase accordingly.
+ * <p/>
+ * Internally, this implementation of bloom filter uses MemorySegment to store BitSet, BloomFilter and
+ * BitSet are designed to be able to switch between different MemorySegments, so that Flink can share
+ * the same BloomFilter/BitSet object instance for different bloom filters.
+ * <p/>
+ * Part of this class refers to the implementation from Apache Hive project
+ * https://github.com/apache/hive/blob/master/common/src/java/org/apache/hive/common/util/BloomFilter.java
+ */
+
+public class BloomFilter {
+
+ protected BitSet bitSet;
+ protected int expectedEntries;
+ protected int numHashFunctions;
+
+ public BloomFilter(int expectedEntries, int byteSize) {
+ checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+ this.expectedEntries = expectedEntries;
+ this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, byteSize << 3);
+ this.bitSet = new BitSet(byteSize);
+ }
+
+ public void setBitsLocation(MemorySegment memorySegment, int offset) {
+ this.bitSet.setMemorySegment(memorySegment, offset);
+ }
+
+ /**
+ * Compute optimal bits number with given input entries and expected false positive probability.
+ *
+ * @param inputEntries
+ * @param fpp
+ * @return optimal bits number
+ */
+ public static int optimalNumOfBits(long inputEntries, double fpp) {
+ int numBits = (int) (-inputEntries * Math.log(fpp) / (Math.log(2) * Math.log(2)));
+ return numBits;
+ }
+
+ /**
+ * Compute the false positive probability based on given input entries and bits size.
+ * Note: this is just the math expected value, you should not expect the fpp in real case would under the return value for certain.
+ *
+ * @param inputEntries
+ * @param bitSize
+ * @return
+ */
+ public static double estimateFalsePositiveProbability(long inputEntries, int bitSize) {
+ int numFunction = optimalNumOfHashFunctions(inputEntries, bitSize);
+ double p = Math.pow(Math.E, -(double) numFunction * inputEntries / bitSize);
+ double estimatedFPP = Math.pow(1 - p, numFunction);
+ return estimatedFPP;
+ }
+
+ /**
+ * compute the optimal hash function number with given input entries and bits size, which would
+ * make the false positive probability lowest.
+ *
+ * @param expectEntries
+ * @param bitSize
+ * @return hash function number
+ */
+ static int optimalNumOfHashFunctions(long expectEntries, long bitSize) {
+ return Math.max(1, (int) Math.round((double) bitSize / expectEntries * Math.log(2)));
+ }
+
+ public void addHash(int hash32) {
+ int hash1 = hash32;
+ int hash2 = hash32 >>> 16;
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % bitSet.bitSize();
+ bitSet.set(pos);
+ }
+ }
+
+ public boolean testHash(int hash32) {
+ int hash1 = hash32;
+ int hash2 = hash32 >>> 16;
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % bitSet.bitSize();
+ if (!bitSet.get(pos)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void reset() {
+ this.bitSet.clear();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder output = new StringBuilder();
+ output.append("BloomFilter:\n");
+ output.append("\thash function number:").append(numHashFunctions).append("\n");
+ output.append(bitSet);
+ return output.toString();
+ }
+
+ /**
+ * Bare metal bit set implementation. For performance reasons, this implementation does not check
+ * for index bounds nor expand the bit set size if the specified index is greater than the size.
+ */
+ public class BitSet {
+ private MemorySegment memorySegment;
+ // MemorySegment byte array offset.
+ private int offset;
+ // MemorySegment byte size.
+ private int length;
+ private final int LONG_POSITION_MASK = 0xffffffc0;
+
+ public BitSet(int byteSize) {
+ Preconditions.checkArgument(byteSize > 0, "bits size should be greater than 0.");
+ Preconditions.checkArgument(byteSize << 29 == 0, "bytes size should be integral multiple of long size(8 Bytes).");
+ this.length = byteSize;
+ }
+
+ public void setMemorySegment(MemorySegment memorySegment, int offset) {
+ this.memorySegment = memorySegment;
+ this.offset = offset;
+ }
+
+ /**
+ * Sets the bit at specified index.
+ *
+ * @param index - position
+ */
+ public void set(int index) {
+ int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+ long current = memorySegment.getLong(offset + longIndex);
+ current |= (1L << index);
+ memorySegment.putLong(offset + longIndex, current);
+ }
+
+ /**
+ * Returns true if the bit is set in the specified index.
+ *
+ * @param index - position
+ * @return - value at the bit position
+ */
+ public boolean get(int index) {
+ int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+ long current = memorySegment.getLong(offset + longIndex);
+ return (current & (1L << index)) != 0;
+ }
+
+ /**
+ * Number of bits
+ */
+ public int bitSize() {
+ return length << 3;
+ }
+
+ public MemorySegment getMemorySegment() {
+ return this.memorySegment;
+ }
+
+ /**
+ * Clear the bit set.
+ */
+ public void clear() {
+ long zeroValue = 0L;
+ for (int i = 0; i < (length / 8); i++) {
+ memorySegment.putLong(offset + i * 8, zeroValue);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder output = new StringBuilder();
+ output.append("BitSet:\n");
+ output.append("\tMemorySegment:").append(memorySegment.size()).append("\n");
+ output.append("\tOffset:").append(offset).append("\n");
+ output.append("\tLength:").append(length).append("\n");
+ return output.toString();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
new file mode 100644
index 0000000..452e4c1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public class MutableHashTablePerformanceBenchmark {
+ private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+
+ private MemoryManager memManager;
+ private IOManager ioManager;
+
+ private TypeSerializer<StringPair> pairBuildSideAccesssor;
+ private TypeSerializer<StringPair> pairProbeSideAccesssor;
+ private TypeComparator<StringPair> pairBuildSideComparator;
+ private TypeComparator<StringPair> pairProbeSideComparator;
+ private TypePairComparator<StringPair, StringPair> pairComparator;
+
+ private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char.";
+
+
+ @Before
+ public void setup() {
+ this.pairBuildSideAccesssor = new StringPairSerializer();
+ this.pairProbeSideAccesssor = new StringPairSerializer();
+ this.pairBuildSideComparator = new StringPairComparator();
+ this.pairProbeSideComparator = new StringPairComparator();
+ this.pairComparator = new StringPairPairComparator();
+
+ this.memManager = new DefaultMemoryManager(64 * 1024 * 1024, 1);
+ this.ioManager = new IOManagerAsync();
+ }
+
+ @After
+ public void tearDown() {
+ // shut down I/O manager and Memory Manager and verify the correct shutdown
+ this.ioManager.shutdown();
+ if (!this.ioManager.isProperlyShutDown()) {
+ fail("I/O manager was not property shut down.");
+ }
+ if (!this.memManager.verifyEmpty()) {
+ fail("Not all memory was properly released to the memory manager --> Memory Leak.");
+ }
+ }
+
+ @Test
+ public void compareMutableHashTablePerformance1() throws IOException {
+ // ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 10;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 500000;
+
+ long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+ long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+ System.out.println("HybridHashJoin2:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+ }
+
+ @Test
+ public void compareMutableHashTablePerformance2() throws IOException {
+ // ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 5;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 1000000;
+
+ long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+ long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+ System.out.println("HybridHashJoin3:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+ }
+
+ @Test
+ public void compareMutableHashTablePerformance3() throws IOException {
+ // ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 2;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 2500000;
+
+ long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+ long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+
+ System.out.println("HybridHashJoin4:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+ }
+
+ @Test
+ public void compareMutableHashTablePerformance4() throws IOException {
+ // ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 1;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = probeSize / buildStep;
+
+ long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+ long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+ System.out.println("HybridHashJoin5:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+ }
+
+ private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
+ int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
+
+ InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
+ InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
+ GlobalConfiguration.includeConfiguration(conf);
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ // 33 is minimum number of pages required to perform hash join this inputs
+ memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
+ } catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return -1;
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ long start = System.currentTimeMillis();
+ final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
+ this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+ this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+ memSegments, ioManager);
+ join.open(buildIterator, probeIterator);
+
+ final StringPair recordReuse = new StringPair();
+ int numRecordsInJoinResult = 0;
+
+ while (join.nextRecord()) {
+ MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
+ while (buildSide.next(recordReuse) != null) {
+ numRecordsInJoinResult++;
+ }
+ }
+ Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
+
+ join.close();
+ long cost = System.currentTimeMillis() - start;
+ // ----------------------------------------------------------------------------------------
+
+ this.memManager.release(join.getFreedMemory());
+ return cost;
+ }
+
+
+ static class InputIterator implements MutableObjectIterator<StringPair> {
+
+ private int numLeft;
+ private int distance;
+ private int scope;
+
+ public InputIterator(int size, int distance, int scope) {
+ this.numLeft = size;
+ this.distance = distance;
+ this.scope = scope;
+ }
+
+ @Override
+ public StringPair next(StringPair reuse) throws IOException {
+ if (this.numLeft > 0) {
+ numLeft--;
+ int currentKey = (numLeft * distance) % scope;
+ reuse.setKey(Integer.toString(currentKey));
+ reuse.setValue(COMMENT);
+ return reuse;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public StringPair next() throws IOException {
+ return next(new StringPair());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
new file mode 100644
index 0000000..cbbeca0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BloomFilterTest {
+
+ private static BloomFilter bloomFilter;
+ private static final int INPUT_SIZE = 1024;
+ private static final double FALSE_POSITIVE_PROBABILITY = 0.05;
+
+ @BeforeClass
+ public static void init() {
+ int bitsSize = BloomFilter.optimalNumOfBits(INPUT_SIZE, FALSE_POSITIVE_PROBABILITY);
+ bitsSize = bitsSize + (Long.SIZE - (bitsSize % Long.SIZE));
+ int byteSize = bitsSize >>> 3;
+ MemorySegment memorySegment = new MemorySegment(new byte[byteSize]);
+ bloomFilter = new BloomFilter(INPUT_SIZE, byteSize);
+ bloomFilter.setBitsLocation(memorySegment, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments1() {
+ new BloomFilter(-1, 128);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments2() {
+ new BloomFilter(0, 128);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments3() {
+ new BloomFilter(1024, -1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments4() {
+ new BloomFilter(1024, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments5() {
+ new BloomFilter(1024, 21);
+ }
+
+ @Test
+ public void testBloomNumBits() {
+ assertEquals(0, BloomFilter.optimalNumOfBits(0, 0));
+ assertEquals(0, BloomFilter.optimalNumOfBits(0, 1));
+ assertEquals(0, BloomFilter.optimalNumOfBits(1, 1));
+ assertEquals(7, BloomFilter.optimalNumOfBits(1, 0.03));
+ assertEquals(72, BloomFilter.optimalNumOfBits(10, 0.03));
+ assertEquals(729, BloomFilter.optimalNumOfBits(100, 0.03));
+ assertEquals(7298, BloomFilter.optimalNumOfBits(1000, 0.03));
+ assertEquals(72984, BloomFilter.optimalNumOfBits(10000, 0.03));
+ assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03));
+ assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+ assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+ }
+
+ @Test
+ public void testBloomFilterNumHashFunctions() {
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(-1, -1));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(0, 0));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 0));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 10));
+ assertEquals(7, BloomFilter.optimalNumOfHashFunctions(10, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10000, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100000, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000000, 100));
+ }
+
+ @Test
+ public void testBloomFilterFalsePositiveProbability() {
+ assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+ assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+ assertEquals(4792529, BloomFilter.optimalNumOfBits(1000000, 0.1));
+ assertEquals(3349834, BloomFilter.optimalNumOfBits(1000000, 0.2));
+ assertEquals(2505911, BloomFilter.optimalNumOfBits(1000000, 0.3));
+ assertEquals(1907139, BloomFilter.optimalNumOfBits(1000000, 0.4));
+
+ // Make sure the estimated fpp error is less than 1%.
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 7298440) - 0.03) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 6235224) - 0.05) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 4792529) - 0.1) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 3349834) - 0.2) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 2505911) - 0.3) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 1907139) - 0.4) < 0.01);
+ }
+
+ @Test
+ public void testHashcodeInput() {
+ bloomFilter.reset();
+ int val1 = "val1".hashCode();
+ int val2 = "val2".hashCode();
+ int val3 = "val3".hashCode();
+ int val4 = "val4".hashCode();
+ int val5 = "val5".hashCode();
+
+ assertFalse(bloomFilter.testHash(val1));
+ assertFalse(bloomFilter.testHash(val2));
+ assertFalse(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val1);
+ assertTrue(bloomFilter.testHash(val1));
+ assertFalse(bloomFilter.testHash(val2));
+ assertFalse(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val2);
+ assertTrue(bloomFilter.testHash(val1));
+ assertTrue(bloomFilter.testHash(val2));
+ assertFalse(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val3);
+ assertTrue(bloomFilter.testHash(val1));
+ assertTrue(bloomFilter.testHash(val2));
+ assertTrue(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val4);
+ assertTrue(bloomFilter.testHash(val1));
+ assertTrue(bloomFilter.testHash(val2));
+ assertTrue(bloomFilter.testHash(val3));
+ assertTrue(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val5);
+ assertTrue(bloomFilter.testHash(val1));
+ assertTrue(bloomFilter.testHash(val2));
+ assertTrue(bloomFilter.testHash(val3));
+ assertTrue(bloomFilter.testHash(val4));
+ assertTrue(bloomFilter.testHash(val5));
+ }
+}
\ No newline at end of file
[2/5] flink git commit: [hotfix] Increase timeout for YARN tests to
180 seconds to prevent occasional CI failures.
Posted by se...@apache.org.
[hotfix] Increase timeout for YARN tests to 180 seconds to prevent occasional CI failures.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5b84b2b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5b84b2b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5b84b2b
Branch: refs/heads/master
Commit: a5b84b2b8284bcdaa649050b5090d79d8b58344c
Parents: 5a788ec
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 6 16:55:32 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 16:56:36 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/yarn/YarnTestBase.java | 36 ++++++++++++--------
1 file changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a5b84b2b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 23b8940..2d22700 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -425,13 +425,15 @@ public abstract class YarnTestBase {
System.setErr(new PrintStream(errContent));
- final int START_TIMEOUT_SECONDS = 60;
-
+ // we wait for at most three minutes
+ final int START_TIMEOUT_SECONDS = 180;
+ final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000);
+
Runner runner = new Runner(args, type);
runner.start();
boolean expectedStringSeen = false;
- for(int second = 0; second < START_TIMEOUT_SECONDS; second++) {
+ do {
sleep(1000);
String outContentString = outContent.toString();
String errContentString = errContent.toString();
@@ -448,8 +450,7 @@ public abstract class YarnTestBase {
}
}
// check output for correct TaskManager startup.
- if(outContentString.contains(terminateAfterString)
- || errContentString.contains(terminateAfterString) ) {
+ if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) ) {
expectedStringSeen = true;
LOG.info("Found expected output in redirected streams");
// send "stop" command to command line interface
@@ -457,23 +458,28 @@ public abstract class YarnTestBase {
runner.sendStop();
// wait for the thread to stop
try {
- runner.join(10000);
- } catch (InterruptedException e) {
+ runner.join(30000);
+ }
+ catch (InterruptedException e) {
LOG.debug("Interrupted while stopping runner", e);
}
LOG.warn("RunWithArgs runner stopped.");
- break;
}
- // check if thread died
- if(!runner.isAlive()) {
- sendOutput();
- if(runner.getReturnValue() != 0) {
- Assert.fail("Runner thread died before the test was finished. Return value = " + runner.getReturnValue());
- } else {
- LOG.info("Runner stopped earlier than expected with return value = 0");
+ else {
+ // check if thread died
+ if (!runner.isAlive()) {
+ sendOutput();
+ if (runner.getReturnValue() != 0) {
+ Assert.fail("Runner thread died before the test was finished. Return value = "
+ + runner.getReturnValue());
+ } else {
+ LOG.info("Runner stopped earlier than expected with return value = 0");
+ }
}
}
}
+ while (!expectedStringSeen && System.currentTimeMillis() < deadline);
+
sendOutput();
Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
"expected string did not show up", expectedStringSeen);
[4/5] flink git commit: [FLINK-2240] [runtime] Pass flag to configure
use of bloom filters through runtime configuration.
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 7172887..d302487 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -238,7 +238,7 @@ public class ReusingReOpenableHashTableITCase {
new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
// do first join with both inputs
@@ -276,7 +276,7 @@ public class ReusingReOpenableHashTableITCase {
//
//
- private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+ private MutableObjectIterator<Record> getProbeInput(final int numKeys,
final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
@@ -289,8 +289,7 @@ public class ReusingReOpenableHashTableITCase {
}
@Test
- public void testSpillingHashJoinWithMassiveCollisions() throws IOException
- {
+ public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
// the following two values are known to have a hash-code collision on the initial level.
// we use them to make sure one partition grows over-proportionally large
final int REPEATED_VALUE_1 = 40559;
@@ -311,9 +310,6 @@ public class ReusingReOpenableHashTableITCase {
builds.add(build2);
builds.add(build3);
MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
-
-
-
// allocate the memory for the HashTable
List<MemorySegment> memSegments;
@@ -333,7 +329,7 @@ public class ReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager);
+ memSegments, ioManager, true);
for(int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
@@ -347,9 +343,8 @@ public class ReusingReOpenableHashTableITCase {
Record record;
final Record recordReuse = new Record();
- while (join.nextRecord())
- {
- int numBuildValues = 0;
+ while (join.nextRecord()) {
+ long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
@@ -369,10 +364,10 @@ public class ReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
- contained = Long.valueOf(numBuildValues);
+ contained = numBuildValues;
}
else {
- contained = Long.valueOf(contained.longValue() + numBuildValues);
+ contained = contained + numBuildValues;
}
map.put(key, contained);
@@ -449,8 +444,9 @@ public class ReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager);
- for(int probe = 0; probe < NUM_PROBES; probe++) {
+ memSegments, ioManager, true);
+
+ for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
@@ -463,7 +459,7 @@ public class ReusingReOpenableHashTableITCase {
while (join.nextRecord())
{
- int numBuildValues = 0;
+ long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
@@ -483,10 +479,10 @@ public class ReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
- contained = Long.valueOf(numBuildValues);
+ contained = numBuildValues;
}
else {
- contained = Long.valueOf(contained.longValue() + numBuildValues);
+ contained = contained + numBuildValues;
}
map.put(key, contained);
@@ -526,5 +522,4 @@ public class ReusingReOpenableHashTableITCase {
}
return copy;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 0aab5fe..642ac7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.junit.Assert;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -69,11 +70,11 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
private final List<UnilateralSortMerger<Record>> sorters;
private final AbstractInvokable owner;
-
- private final Configuration config;
-
+
private final TaskConfig taskConfig;
+ private final TaskManagerRuntimeInfo taskManageInfo;
+
protected final long perSortMem;
protected final double perSortFractionMem;
@@ -111,11 +112,9 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
this.owner = new DummyInvokable();
-
- this.config = new Configuration();
- this.taskConfig = new TaskConfig(this.config);
-
+ this.taskConfig = new TaskConfig(new Configuration());
this.executionConfig = executionConfig;
+ this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
@Parameterized.Parameters
@@ -279,7 +278,10 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
return this.taskConfig;
}
-
+ @Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return this.taskManageInfo;
+ }
@Override
public ExecutionConfig getExecutionConfig() {
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b71b01e..51c7f93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.operators.testutils;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.mockito.invocation.InvocationOnMock;
@@ -193,13 +193,8 @@ public class MockEnvironment implements Environment {
}
@Override
- public Configuration getTaskManagerConfiguration(){
- return new UnmodifiableConfiguration(new Configuration());
- }
-
- @Override
- public String getHostname(){
- return "localhost";
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 1e25bab..20edc20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.runtime.operators.ResettablePactDriver;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -54,7 +55,9 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
- protected static final int PAGE_SIZE = 32 * 1024;
+ protected static final int PAGE_SIZE = 32 * 1024;
+
+ private final TaskManagerRuntimeInfo taskManageInfo;
private final IOManager ioManager;
@@ -110,6 +113,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
this.executionConfig = executionConfig;
this.comparators = new ArrayList<TypeComparator<IN>>(2);
+
+ this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
@Parameterized.Parameters
@@ -292,6 +297,11 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
}
@Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return this.taskManageInfo;
+ }
+
+ @Override
public <X> MutableObjectIterator<X> getInput(int index) {
MutableObjectIterator<IN> in = this.input;
if (in == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 38d9992..7debb08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -187,7 +187,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
- this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+ this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
iterator.open();
@@ -226,7 +226,7 @@ public class HashVsSortMiniBenchmark {
new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
- this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+ this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
iterator.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 44013ef..9091fa7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.JobID;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -86,7 +88,8 @@ public class StreamMockEnvironment implements Environment {
private final int bufferSize;
- public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+ public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
+ MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.jobConfiguration = jobConfig;
this.taskConfiguration = taskConfig;
this.inputs = new LinkedList<InputGate>();
@@ -293,13 +296,8 @@ public class StreamMockEnvironment implements Environment {
}
@Override
- public Configuration getTaskManagerConfiguration(){
- return new UnmodifiableConfiguration(new Configuration());
- }
-
- @Override
- public String getHostname(){
- return "localhost";
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
}
}
[5/5] flink git commit: [FLINK-2240] [runtime] Pass flag to configure
use of bloom filters through runtime configuration.
Posted by se...@apache.org.
[FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.
Also make sure that most tests run with enabled bloom filters, to increase test coverage.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b73b438
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b73b438
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b73b438
Branch: refs/heads/master
Commit: 0b73b4387a855627209a4dbaef930321a5090594
Parents: 61dcae3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 6 15:49:07 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 18:13:00 2015 +0200
----------------------------------------------------------------------
docs/setup/config.md | 15 +-
.../flink/configuration/ConfigConstants.java | 26 +--
.../flink/runtime/execution/Environment.java | 12 +-
.../AbstractCachedBuildSideJoinDriver.java | 20 +-
.../flink/runtime/operators/JoinDriver.java | 65 +++++--
.../flink/runtime/operators/PactDriver.java | 10 +-
.../runtime/operators/PactTaskContext.java | 9 +-
.../runtime/operators/RegularPactTask.java | 24 ++-
.../operators/hash/HashMatchIteratorBase.java | 11 +-
.../operators/hash/MutableHashTable.java | 192 ++++---------------
.../NonReusingBuildFirstHashMatchIterator.java | 8 +-
...ngBuildFirstReOpenableHashMatchIterator.java | 26 ++-
.../NonReusingBuildSecondHashMatchIterator.java | 8 +-
...gBuildSecondReOpenableHashMatchIterator.java | 23 ++-
.../hash/ReOpenableMutableHashTable.java | 18 +-
.../ReusingBuildFirstHashMatchIterator.java | 8 +-
...ngBuildFirstReOpenableHashMatchIterator.java | 22 ++-
.../ReusingBuildSecondHashMatchIterator.java | 8 +-
...gBuildSecondReOpenableHashMatchIterator.java | 26 ++-
.../runtime/taskmanager/RuntimeEnvironment.java | 16 +-
.../operators/drivers/TestTaskContext.java | 9 +
.../MutableHashTablePerformanceBenchmark.java | 10 +-
.../hash/NonReusingHashMatchIteratorITCase.java | 12 +-
.../NonReusingReOpenableHashTableITCase.java | 33 ++--
.../hash/ReusingHashMatchIteratorITCase.java | 12 +-
.../hash/ReusingReOpenableHashTableITCase.java | 33 ++--
.../operators/testutils/DriverTestBase.java | 18 +-
.../operators/testutils/MockEnvironment.java | 11 +-
.../testutils/UnaryOperatorTestBase.java | 12 +-
.../operators/util/HashVsSortMiniBenchmark.java | 4 +-
.../runtime/tasks/StreamMockEnvironment.java | 14 +-
31 files changed, 338 insertions(+), 377 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index ba541d4..53b9ae0 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -244,11 +244,6 @@ free for objects created by user-defined functions. (DEFAULT: 0.7)
This parameter is only evaluated, if `taskmanager.memory.size` is not set.
- `jobclient.polling.interval`: The interval (in seconds) in which the client
polls the JobManager for the status of its job (DEFAULT: 2).
-- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
-fan-out for spilling hash tables. Limits the number of file handles per operator,
-but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
-- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
-when this fraction of its memory budget is full (DEFAULT: 0.8).
- `taskmanager.heartbeat-interval`: The interval in which the TaskManager sends
heartbeats to the JobManager.
- `jobmanager.max-heartbeat-delay-before-failure.msecs`: The maximum time that a
@@ -324,6 +319,16 @@ sample exceeds this value (possible because of misconfiguration of the parser),
the sampling aborts. This value can be overridden for a specific input with the
input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
+### Runtime Algorithms
+
+- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
+fan-out for spilling hash tables. Limits the number of file handles per operator,
+but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
+- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
+when this fraction of its memory budget is full (DEFAULT: 0.8).
+- `taskmanager.runtime.hashjoin-bloom-filters`: If true, the hash join uses bloom filters to pre-filter records against spilled partitions. (DEFAULT: true)
+
+
## YARN
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index dad2d99..d145eb2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -172,6 +172,8 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
+ // --------------------------- Runtime Algorithms -------------------------------
+
/**
* Parameter for the maximum fan for out-of-core algorithms.
* Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
@@ -184,18 +186,17 @@ public final class ConfigConstants {
* sorter will start spilling to disk.
*/
public static final String DEFAULT_SORT_SPILLING_THRESHOLD_KEY = "taskmanager.runtime.sort-spilling-threshold";
+
+ /**
+ * Parameter to switch hash join bloom filters for spilled partitions on and off.
+ */
+ public static final String RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY = "taskmanager.runtime.hashjoin-bloom-filters";
/**
* The config parameter defining the timeout for filesystem stream opening.
* A value of 0 indicates infinite waiting.
*/
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
-
- /**
- * While spill probe record to disk during probe phase, whether enable bloom filter to filter the probe records
- * to minimize the spilled probe records count.
- */
- public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
// ------------------------ YARN Configuration ------------------------
@@ -543,6 +544,13 @@ public final class ConfigConstants {
* The default task manager's maximum registration duration
*/
public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
+
+ // ------------------------ Runtime Algorithms ------------------------
+
+ /**
+ * Default setting for the switch for hash join bloom filters for spilled partitions.
+ */
+ public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = true;
/**
* The default value for the maximum spilling fan in/out.
@@ -558,15 +566,9 @@ public final class ConfigConstants {
* The default timeout for filesystem stream opening: infinite (means max long milliseconds).
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
-
- /**
- * Enable bloom filter for hash join as it promote hash join performance most of the time.
- */
- public static final boolean DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
// ------------------------ YARN Configuration ------------------------
-
/**
* Minimum amount of Heap memory to subtract from the requested TaskManager size.
* We came up with these values experimentally.
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index af29560..c742ce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import java.util.Map;
import java.util.concurrent.Future;
@@ -72,14 +73,11 @@ public interface Environment {
Configuration getTaskConfiguration();
/**
- * @return The task manager configuration
- */
- Configuration getTaskManagerConfiguration();
-
- /**
- * @return Hostname of the task manager
+ * Gets the task manager info, with configuration and hostname.
+ *
+ * @return The task manager info, with configuration and hostname.
*/
- String getHostname();
+ TaskManagerRuntimeInfo getTaskManagerInfo();
/**
* Returns the job-wide configuration object that was attached to the JobGraph.
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index aff8d01..4096f0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
@@ -74,7 +75,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
double availableMemory = config.getRelativeMemoryDriver();
-
+ boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
+ ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
+ ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
+
ExecutionConfig executionConfig = taskContext.getExecutionConfig();
objectReuseEnabled = executionConfig.isObjectReuseEnabled();
@@ -89,7 +93,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
- availableMemory);
+ availableMemory,
+ hashJoinUseBitMaps);
} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -102,7 +107,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
- availableMemory);
+ availableMemory,
+ hashJoinUseBitMaps);
} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -118,7 +124,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
- availableMemory);
+ availableMemory,
+ hashJoinUseBitMaps);
} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -131,7 +138,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getMemoryManager(),
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
- availableMemory);
+ availableMemory,
+ hashJoinUseBitMaps);
} else {
throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -148,12 +156,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
@Override
public void run() throws Exception {
-
final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
-
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index af3da55..5df715f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -19,25 +19,27 @@
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* The join driver implements the logic of a join operator at runtime. It instantiates either
* hash or sort-merge based strategies to find joining pairs of records.
@@ -115,19 +117,40 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
if (LOG.isDebugEnabled()) {
LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}
+
+ boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
+ ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
+ ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
// create and return joining iterator according to provided local strategy.
if (objectReuseEnabled) {
switch (ls) {
case MERGE:
- this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-
+ this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
- this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+ this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator21(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ fractionAvailableMemory,
+ hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
- this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+ this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ fractionAvailableMemory,
+ hashJoinUseBitMaps);
break;
default:
throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
@@ -135,14 +158,32 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
} else {
switch (ls) {
case MERGE:
- this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+ this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
- this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+ this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator21(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ fractionAvailableMemory,
+ hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
- this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+ this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ fractionAvailableMemory,
+ hashJoinUseBitMaps);
break;
default:
throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index a53f5bf..288f7ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -16,16 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.functions.Function;
-
/**
- * The interface to be implemented by all pact drivers that run alone (or as the primary driver) in a nephele task.
- * The driver is the code that deals with everything that specific to a certain PACT. It implements the actual
- * <i>map</i> or <i>reduce</i> specific code.
+ * The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
+ * A driver implements the actual code to perform a batch operation, like <i>map()</i>,
+ * <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
*
* @see PactTaskContext
*
@@ -37,7 +35,7 @@ public interface PactDriver<S extends Function, OT> {
void setup(PactTaskContext<S, OT> context);
/**
- * Gets the number of inputs (= Nephele Gates and Readers) that the task has.
+ * Gets the number of inputs that the task has.
*
* @return The number of inputs.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index bc23fa3..5c2ed67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -26,15 +26,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
/**
- * A runtime task is the task that is executed by the flink engine inside a task vertex.
- * It typically has a {@link PactDriver}, and optionally multiple chained drivers. In addition, it
- * deals with the runtime setup and teardown and the control-flow logic. The latter appears especially
- * in the case of iterations.
+ * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
+ * the runtime components and configuration that they can use to fulfil their task.
*
* @param <S> The UDF type.
* @param <OT> The produced data type.
@@ -44,6 +43,8 @@ import org.apache.flink.util.MutableObjectIterator;
public interface PactTaskContext<S, OT> {
TaskConfig getTaskConfig();
+
+ TaskManagerRuntimeInfo getTaskManagerInfo();
ClassLoader getUserCodeClassLoader();
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 78bf383..873d948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
@@ -660,7 +661,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected void initInputReaders() throws Exception {
final int numInputs = getNumTaskInputs();
- final MutableReader<?>[] inputReaders = new MutableReader[numInputs];
+ final MutableReader<?>[] inputReaders = new MutableReader<?>[numInputs];
int currentReaderOffset = 0;
@@ -705,7 +706,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected void initBroadcastInputReaders() throws Exception {
final int numBroadcastInputs = this.config.getNumBroadcastInputs();
- final MutableReader<?>[] broadcastInputReaders = new MutableReader[numBroadcastInputs];
+ final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];
int currentReaderOffset = config.getNumInputs();
@@ -737,8 +738,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
*/
protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
- this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
- this.inputIterators = new MutableObjectIterator[numInputs];
+ this.inputComparators = numComparators > 0 ? new TypeComparator<?>[numComparators] : null;
+ this.inputIterators = new MutableObjectIterator<?>[numInputs];
ClassLoader userCodeClassLoader = getUserCodeClassLoader();
@@ -764,7 +765,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
* Creates all the serializers and iterators for the broadcast inputs.
*/
protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
- this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
+ this.broadcastInputSerializers = new TypeSerializerFactory<?>[numBroadcastInputs];
ClassLoader userCodeClassLoader = getUserCodeClassLoader();
@@ -787,8 +788,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
final MemoryManager memMan = getMemoryManager();
final IOManager ioMan = getIOManager();
- this.localStrategies = new CloseableInputProvider[numInputs];
- this.inputs = new MutableObjectIterator[numInputs];
+ this.localStrategies = new CloseableInputProvider<?>[numInputs];
+ this.inputs = new MutableObjectIterator<?>[numInputs];
this.excludeFromReset = new boolean[numInputs];
this.inputIsCached = new boolean[numInputs];
this.inputIsAsyncMaterialized = new boolean[numInputs];
@@ -807,8 +808,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// acts as a pipeline breaker. this one should only be there, if a pipeline breaker is needed.
// the second variant spills to the side and will not read unless the result is also consumed
// in a pipelined fashion.
- this.resettableInputs = new SpillingResettableMutableObjectIterator[numInputs];
- this.tempBarriers = new TempBarrier[numInputs];
+ this.resettableInputs = new SpillingResettableMutableObjectIterator<?>[numInputs];
+ this.tempBarriers = new TempBarrier<?>[numInputs];
for (int i = 0; i < numInputs; i++) {
final int memoryPages;
@@ -1044,6 +1045,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
}
@Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return getEnvironment().getTaskManagerInfo();
+ }
+
+ @Override
public MemoryManager getMemoryManager() {
return getEnvironment().getMemoryManager();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
index 4e0112a..08b6191 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
@@ -32,6 +32,7 @@ import java.util.List;
* Common methods for all Hash Join Iterators.
*/
public class HashMatchIteratorBase {
+
public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
TypeSerializer<BT> buildSideSerializer,
TypeComparator<BT> buildSideComparator,
@@ -41,11 +42,15 @@ public class HashMatchIteratorBase {
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction) throws MemoryAllocationException {
+ double memoryFraction,
+ boolean useBloomFilters) throws MemoryAllocationException {
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
- return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+
+ return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager,
+ useBloomFilters);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 4a57986..b0042fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -24,10 +24,9 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,22 +45,16 @@ import org.apache.flink.runtime.operators.util.BloomFilter;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
-
/**
* An implementation of a Hybrid Hash Join. The join starts operating in memory and gradually starts
* spilling contents to disk, when the memory is not sufficient. It does not need to know a priori
* how large the input will be.
- * <p>
- * The design of this class follows on many parts the design presented in
- * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
- * implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning.
- *<p>
- *
- *
- * <hr>
*
- * The layout of the buckets inside a memory segment is as follows:
+ * <p>The design of this class follows on many parts the design presented in
+ * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
+ * implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning.</p>
*
+ * <p>The layout of the buckets inside a memory segment is as follows:</p>
* <pre>
* +----------------------------- Bucket x ----------------------------
* |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
@@ -189,8 +182,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
*/
private static final long BUCKET_FORWARD_POINTER_NOT_SET = ~0x0L;
-// private static final byte BUCKET_STATUS_SPILLED = 1;
-
/**
* Constant for the bucket status, indicating that the bucket is in memory.
*/
@@ -274,11 +265,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
*/
protected final int bucketsPerSegmentBits;
- /**
+ /**
* An estimate for the average record length.
*/
private final int avgRecordLen;
+ /** Flag to enable/disable bloom filters for spilled partitions */
+ private final boolean useBloomFilters;
+
// ------------------------------------------------------------------------
/**
@@ -296,8 +290,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
*/
private HashBucketIterator<BT, PT> bucketIterator;
-// private LazyHashBucketIterator<BT, PT> lazyBucketIterator;
-
/**
* Iterator over the elements from the probe side.
*/
@@ -319,6 +311,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
* of hash-codes and pointers to the elements.
*/
protected MemorySegment[] buckets;
+
+ /** The bloom filter utility used to transform hash buckets of spilled partitions into a
+ * probabilistic filter */
+ private BloomFilter bloomFilter;
/**
* The number of buckets in the current table. The bucket array is not necessarily fully
@@ -353,25 +349,35 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
protected boolean furtherPartitioning = false;
private boolean running = true;
-
- private BloomFilter bloomFilter;
// ------------------------------------------------------------------------
// Construction and Teardown
// ------------------------------------------------------------------------
+
+ public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
+ TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
+ TypePairComparator<PT, BT> comparator,
+ List<MemorySegment> memorySegments, IOManager ioManager)
+ {
+ this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
+ memorySegments, ioManager, true);
+ }
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
- TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager)
+ TypePairComparator<PT, BT> comparator,
+ List<MemorySegment> memorySegments,
+ IOManager ioManager,
+ boolean useBloomFilters)
{
this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
- memorySegments, ioManager, DEFAULT_RECORD_LEN);
+ memorySegments, ioManager, DEFAULT_RECORD_LEN, useBloomFilters);
}
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments,
- IOManager ioManager, int avgRecordLen)
+ IOManager ioManager, int avgRecordLen, boolean useBloomFilters)
{
// some sanity checks first
if (memorySegments == null) {
@@ -390,6 +396,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.recordComparator = comparator;
this.availableMemory = memorySegments;
this.ioManager = ioManager;
+ this.useBloomFilters = useBloomFilters;
this.avgRecordLen = avgRecordLen > 0 ? avgRecordLen :
buildSideSerializer.getLength() == -1 ? DEFAULT_RECORD_LEN : buildSideSerializer.getLength();
@@ -551,16 +558,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
public boolean nextRecord() throws IOException {
-
+
final boolean probeProcessing = processProbeIter();
- if(probeProcessing) {
- return true;
- }
- return prepareNextPartition();
+ return probeProcessing || prepareNextPartition();
}
- public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException
- {
+ public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException {
final TypeComparator<PT> probeAccessors = this.probeSideComparator;
final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
final int posHashCode = hash % this.numBuckets;
@@ -585,32 +588,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
}
-// public LazyHashBucketIterator<BT, PT> getLazyMatchesFor(PT record) throws IOException
-// {
-// final TypeComparator<PT> probeAccessors = this.probeSideComparator;
-// final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
-// final int posHashCode = hash % this.numBuckets;
-//
-// // get the bucket for the given hash code
-// final int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
-// final int bucketInSegmentOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
-// final MemorySegment bucket = this.buckets[bucketArrayPos];
-//
-// // get the basic characteristics of the bucket
-// final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
-// final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
-//
-// // for an in-memory partition, process set the return iterators, else spill the probe records
-// if (p.isInMemory()) {
-// this.recordComparator.setReference(record);
-// this.lazyBucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
-// return this.lazyBucketIterator;
-// }
-// else {
-// throw new IllegalStateException("Method is not applicable to partially spilled hash tables.");
-// }
-// }
-
public PT getCurrentProbeRecord() {
return this.probeIterator.getCurrent();
}
@@ -739,7 +716,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
}
- final private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
+ private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
final long totalSize = ((long) bufferSize) * numBuffers;
final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
@@ -1092,9 +1069,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.buckets = table;
this.numBuckets = numBuckets;
- boolean enableBloomFilter = GlobalConfiguration.getBoolean(
- ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
- if (enableBloomFilter) {
+ if (useBloomFilters) {
initBloomFilter(numBuckets);
}
}
@@ -1107,8 +1082,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.numBuckets = 0;
if (this.buckets != null) {
- for (int i = 0; i < this.buckets.length; i++) {
- this.availableMemory.add(this.buckets[i]);
+ for (MemorySegment bucket : this.buckets) {
+ this.availableMemory.add(bucket);
}
this.buckets = null;
}
@@ -1138,9 +1113,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
final HashPartition<BT, PT> p = partitions.get(largestPartNum);
- boolean enableBloomFilter = GlobalConfiguration.getBoolean(
- ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
- if (enableBloomFilter) {
+ if (useBloomFilters) {
buildBloomFilterForBucketsInPartition(largestPartNum, p);
}
@@ -1196,7 +1169,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
}
- final private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+ private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
int totalCount = 0;
boolean skip = false;
long forwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
@@ -1207,7 +1180,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
break;
}
MemorySegment overflowSegment = p.overflowSegments[overflowSegNum];
- int bucketInOverflowSegmentOffset = (int) (forwardPointer & 0xffffffff);
+ int bucketInOverflowSegmentOffset = (int) forwardPointer;
final int count = overflowSegment.getShort(bucketInOverflowSegmentOffset + HEADER_COUNT_OFFSET);
totalCount += count;
@@ -1587,93 +1560,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
} // end HashBucketIterator
-
-
- // ======================================================================================================
-
-// public static final class LazyHashBucketIterator<BT, PT> {
-//
-// private final TypePairComparator<PT, BT> comparator;
-//
-// private MemorySegment bucket;
-//
-// private MemorySegment[] overflowSegments;
-//
-// private HashPartition<BT, PT> partition;
-//
-// private int bucketInSegmentOffset;
-//
-// private int searchHashCode;
-//
-// private int posInSegment;
-//
-// private int countInSegment;
-//
-// private int numInSegment;
-//
-// private LazyHashBucketIterator(TypePairComparator<PT, BT> comparator) {
-// this.comparator = comparator;
-// }
-//
-//
-// void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
-// int searchHashCode, int bucketInSegmentOffset) {
-//
-// this.bucket = bucket;
-// this.overflowSegments = overflowSegments;
-// this.partition = partition;
-// this.searchHashCode = searchHashCode;
-// this.bucketInSegmentOffset = bucketInSegmentOffset;
-//
-// this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-// this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
-// this.numInSegment = 0;
-// }
-//
-// public boolean next(BT target) {
-// // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
-// while (true) {
-//
-// while (this.numInSegment < this.countInSegment) {
-//
-// final int thisCode = this.bucket.getInt(this.posInSegment);
-// this.posInSegment += HASH_CODE_LEN;
-//
-// // check if the hash code matches
-// if (thisCode == this.searchHashCode) {
-// // get the pointer to the pair
-// final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
-// BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
-// this.numInSegment++;
-//
-// // check whether it is really equal, or whether we had only a hash collision
-// LazyDeSerializable lds = (LazyDeSerializable) target;
-// lds.setDeSerializer(this.partition, this.partition.getWriteView(), pointer);
-// if (this.comparator.equalToReference(target)) {
-// return true;
-// }
-// }
-// else {
-// this.numInSegment++;
-// }
-// }
-//
-// // this segment is done. check if there is another chained bucket
-// final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
-// if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
-// return false;
-// }
-//
-// final int overflowSegNum = (int) (forwardPointer >>> 32);
-// this.bucket = this.overflowSegments[overflowSegNum];
-// this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
-// this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
-// this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-// this.numInSegment = 0;
-// }
-// }
-// }
-
// ======================================================================================================
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
index c2d7805..5000dab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
@@ -67,16 +67,16 @@ public class NonReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchI
TypePairComparator<V2, V1> pairComparator,
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
- throws MemoryAllocationException
- {
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
this.probeSideSerializer = serializer2;
this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
- pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+ pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
index ee870a6..af1626a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
@@ -48,25 +48,32 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
- throws MemoryAllocationException
- {
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
- memoryFraction);
+ memoryFraction, useBitmapFilters);
+
reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
}
@Override
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
- throws MemoryAllocationException
- {
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBloomFilters) throws MemoryAllocationException {
+ // same flag name as HashMatchIteratorBase.getHashJoin; forwarded unchanged to the re-openable table
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBloomFilters);
}
/**
@@ -76,5 +83,4 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
index 6099ac7..83952c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
@@ -66,16 +66,16 @@ public class NonReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatch
TypePairComparator<V1, V2> pairComparator,
MemoryManager memManager, IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
- throws MemoryAllocationException
- {
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
this.probeSideSerializer = serializer1;
this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
- comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+ comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
index bc7e65b..029be5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
@@ -48,11 +48,12 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
- throws MemoryAllocationException
- {
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
super(firstInput, secondInput, serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+ comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+
reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
}
@@ -62,12 +63,17 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
- throws MemoryAllocationException
- {
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBitmapFilters);
}
/**
@@ -77,5 +83,4 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index 6819924..fd5fcde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -33,17 +33,12 @@ import org.apache.flink.util.MutableObjectIterator;
public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT> {
- /**
- * Channel for the spilled partitions
- */
+ /** Channel for the spilled partitions */
private final FileIOChannel.Enumerator spilledInMemoryPartitions;
- /**
- * Stores the initial partitions and a list of the files that contain the spilled contents
- */
+ /** Stores the initial partitions and a list of the files that contain the spilled contents */
private List<HashPartition<BT, PT>> initialPartitions;
-
/**
* The values of these variables are stored here after the initial open()
* Required to restore the initial state before each additional probe phase.
@@ -58,16 +53,17 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
TypeComparator<BT> buildSideComparator,
TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> comparator,
- List<MemorySegment> memorySegments, IOManager ioManager) {
+ List<MemorySegment> memorySegments, IOManager ioManager,
+ boolean useBitmapFilters) {
+
super(buildSideSerializer, probeSideSerializer, buildSideComparator,
- probeSideComparator, comparator, memorySegments, ioManager);
+ probeSideComparator, comparator, memorySegments, ioManager, useBitmapFilters);
keepBuildSidePartitions = true;
spilledInMemoryPartitions = ioManager.createChannelEnumerator();
}
@Override
- public void open(MutableObjectIterator<BT> buildSide,
- MutableObjectIterator<PT> probeSide) throws IOException {
+ public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException {
super.open(buildSide, probeSide);
initialPartitions = new ArrayList<HashPartition<BT, PT>>( partitionsBeingBuilt );
initialPartitionFanOut = (byte) partitionsBeingBuilt.size();
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
index da76045..b4aaa95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
@@ -71,9 +71,9 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
- throws MemoryAllocationException
- {
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
@@ -83,7 +83,7 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
this.tempBuildSideRecord = serializer1.createInstance();
this.hashJoin = getHashJoin(serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+ comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
index 5501271..714a1f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
@@ -48,25 +48,32 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
+ double memoryFraction,
+ boolean useBitmapFilters)
throws MemoryAllocationException
{
super(firstInput, secondInput, serializer1, comparator1, serializer2,
comparator2, pairComparator, memManager, ioManager, ownerTask,
- memoryFraction);
+ memoryFraction, useBitmapFilters);
reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
}
@Override
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
- throws MemoryAllocationException
- {
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBitmapFilters);
}
/**
@@ -76,5 +83,4 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
index a9435ef..b7c3e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
@@ -71,9 +71,9 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
- throws MemoryAllocationException
- {
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
this.memManager = memManager;
this.firstInput = firstInput;
this.secondInput = secondInput;
@@ -83,7 +83,7 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
this.tempBuildSideRecord = serializer2.createInstance();
this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
- memManager, ioManager, ownerTask, memoryFraction);
+ memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
index 559d20a..4b4cdf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
@@ -48,24 +48,31 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
MemoryManager memManager,
IOManager ioManager,
AbstractInvokable ownerTask,
- double memoryFraction)
- throws MemoryAllocationException
- {
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
super(firstInput, secondInput, serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+ comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+
reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
}
@Override
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
- throws MemoryAllocationException
- {
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
final int numPages = memManager.computeNumberOfPages(memoryFraction);
final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBitmapFilters);
}
/**
@@ -75,5 +82,4 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
reopenHashTable.reopenProbe(probeInput);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index cd6dbd6..8cfc1c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,7 @@ public class RuntimeEnvironment implements Environment {
private final AccumulatorRegistry accumulatorRegistry;
- private final Configuration taskManagerConfiguration;
-
- private final String hostname;
+ private final TaskManagerRuntimeInfo taskManagerInfo;
// ------------------------------------------------------------------------
@@ -124,8 +122,7 @@ public class RuntimeEnvironment implements Environment {
this.writers = checkNotNull(writers);
this.inputGates = checkNotNull(inputGates);
this.jobManager = checkNotNull(jobManager);
- this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
- this.hostname = taskManagerInfo.getHostname();
+ this.taskManagerInfo = checkNotNull(taskManagerInfo);
}
// ------------------------------------------------------------------------
@@ -176,13 +173,8 @@ public class RuntimeEnvironment implements Environment {
}
@Override
- public Configuration getTaskManagerConfiguration(){
- return taskManagerConfiguration;
- }
-
- @Override
- public String getHostname(){
- return hostname;
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return taskManagerInfo;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index b1466c9..30e417b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.PactTaskContext;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
@@ -62,6 +63,8 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
private ExecutionConfig executionConfig = new ExecutionConfig();
+ private TaskManagerRuntimeInfo taskManageInfo;
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -70,6 +73,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
public TestTaskContext(long memoryInBytes) {
this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024, true);
+ this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
}
// --------------------------------------------------------------------------------------------
@@ -156,6 +160,11 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
}
@Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return this.taskManageInfo;
+ }
+
+ @Override
@SuppressWarnings("unchecked")
public <X> MutableObjectIterator<X> getInput(int index) {
switch (index) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
index 452e4c1..c0f8f59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -24,9 +24,6 @@ import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -40,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -192,10 +190,6 @@ public class MutableHashTablePerformanceBenchmark {
InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
- Configuration conf = new Configuration();
- conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
- GlobalConfiguration.includeConfiguration(conf);
-
// allocate the memory for the HashTable
List<MemorySegment> memSegments;
try {
@@ -212,7 +206,7 @@ public class MutableHashTablePerformanceBenchmark {
final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
- memSegments, ioManager);
+ memSegments, ioManager, enableBloomFilter);
join.open(buildIterator, probeIterator);
final StringPair recordReuse = new StringPair();
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
index f4d2251..0d5a26e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
@@ -155,7 +155,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -242,7 +242,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -291,7 +291,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -378,7 +378,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -425,7 +425,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0);
+ this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -472,7 +472,7 @@ public class NonReusingHashMatchIteratorITCase {
new NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0);
+ this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index f5105bb..306a370 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -239,7 +239,7 @@ public class NonReusingReOpenableHashTableITCase {
new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
// do first join with both inputs
@@ -277,7 +277,7 @@ public class NonReusingReOpenableHashTableITCase {
//
//
- private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+ private MutableObjectIterator<Record> getProbeInput(final int numKeys,
final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
@@ -334,9 +334,9 @@ public class NonReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager);
+ memSegments, ioManager, true);
- for(int probe = 0; probe < NUM_PROBES; probe++) {
+ for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
@@ -348,9 +348,8 @@ public class NonReusingReOpenableHashTableITCase {
Record record;
final Record recordReuse = new Record();
- while (join.nextRecord())
- {
- int numBuildValues = 0;
+ while (join.nextRecord()) {
+ long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
@@ -370,10 +369,10 @@ public class NonReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
- contained = Long.valueOf(numBuildValues);
+ contained = numBuildValues;
}
else {
- contained = Long.valueOf(contained.longValue() + numBuildValues);
+ contained = contained + numBuildValues;
}
map.put(key, contained);
@@ -450,11 +449,12 @@ public class NonReusingReOpenableHashTableITCase {
final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager);
- for(int probe = 0; probe < NUM_PROBES; probe++) {
+ memSegments, ioManager, true);
+
+ for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
- if(probe == 0) {
+ if (probe == 0) {
join.open(buildInput, probeInput);
} else {
join.reopenProbe(probeInput);
@@ -462,9 +462,8 @@ public class NonReusingReOpenableHashTableITCase {
Record record;
final Record recordReuse = new Record();
- while (join.nextRecord())
- {
- int numBuildValues = 0;
+ while (join.nextRecord()) {
+ long numBuildValues = 0;
final Record probeRec = join.getCurrentProbeRecord();
int key = probeRec.getField(0, IntValue.class).getValue();
@@ -484,10 +483,10 @@ public class NonReusingReOpenableHashTableITCase {
Long contained = map.get(key);
if (contained == null) {
- contained = Long.valueOf(numBuildValues);
+ contained = numBuildValues;
}
else {
- contained = Long.valueOf(contained.longValue() + numBuildValues);
+ contained = contained + numBuildValues;
}
map.put(key, contained);
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
index 18cd8d0..f770ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -155,7 +155,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -242,7 +242,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -291,7 +291,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -378,7 +378,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0);
+ this.memoryManager, ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -425,7 +425,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0);
+ this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();
@@ -472,7 +472,7 @@ public class ReusingHashMatchIteratorITCase {
new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0);
+ this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
iterator.open();