You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:15 UTC
[09/10] flink git commit: [FLINK-2107] Add hash-based strategies for
left and right outer joins.
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
new file mode 100644
index 0000000..3b940c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
+ * internally to match the records with equal key. The first input is the build side of the hash table, the second input the probe side.
+ * This implementation DOES NOT reuse objects: the probe record is copied for every join call when a key has several build-side values.
+ */
+public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+ protected final MutableHashTable<V1, V2> hashJoin;
+
+ protected final TypeSerializer<V2> probeSideSerializer;
+
+ private final MemoryManager memManager;
+
+ private final MutableObjectIterator<V1> firstInput;
+
+ private final MutableObjectIterator<V2> secondInput;
+ // if true, a probe record without any build-side value is passed to the join function with null as build side
+ private final boolean joinWithEmptyBuildSide;
+ // set to false by abort(); checked before each further join call once a key has more than two build-side values
+ private volatile boolean running = true;
+
+ // --------------------------------------------------------------------------------------------
+ /** Stores the inputs and obtains the hash table via getHashJoin(); rejects bitmap filters combined with joinWithEmptyBuildSide. */
+ public NonReusingBuildFirstHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V2, V1> pairComparator,
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ this.memManager = memManager;
+ this.firstInput = firstInput;
+ this.secondInput = secondInput;
+ this.probeSideSerializer = serializer2;
+
+ if(useBitmapFilters && joinWithEmptyBuildSide) {
+ throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+ }
+ this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+
+ this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
+ pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ /** Opens the hash join, handing it the first input (build side type V1) and the second input (probe side type V2). */
+ @Override
+ public void open() throws IOException, MemoryAllocationException, InterruptedException {
+ this.hashJoin.open(this.firstInput, this.secondInput);
+ }
+
+ /** Closes the hash join and releases the memory segments it reports as freed back to the memory manager. */
+ @Override
+ public void close() {
+ // close the join
+ this.hashJoin.close();
+
+ // free the memory
+ final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
+ this.memManager.release(segments);
+ }
+ /** Joins the next probe record with its build-side values; returns false once the hash join has no further record. */
+ @Override
+ public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+ throws Exception
+ {
+ if (this.hashJoin.nextRecord())
+ {
+ // there is a next record: fetch the iterator over the build side values for it
+ final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
+ V1 nextBuildSideRecord;
+
+ // get the first build side value
+ if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
+ V1 tmpRec;
+ final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ // check if there is another build-side value
+ if ((tmpRec = buildSideIterator.next()) != null) {
+ // more than one build-side value --> every join call gets its own copy of the probe record
+ V2 probeCopy;
+ probeCopy = this.probeSideSerializer.copy(probeRecord);
+
+ // call the join function on the first pair
+ matchFunction.join(nextBuildSideRecord, probeCopy, collector);
+
+ // call the join function on the second pair
+ probeCopy = this.probeSideSerializer.copy(probeRecord);
+ matchFunction.join(tmpRec, probeCopy, collector);
+
+ while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
+ // call the join function on the next pair
+ // copy again so that the function is handed the original value of the probe side record
+ probeCopy = this.probeSideSerializer.copy(probeRecord);
+ matchFunction.join(nextBuildSideRecord, probeCopy, collector);
+ }
+ }
+ else {
+ // exactly one build-side value: no copy is made, the probe record itself is passed on
+ matchFunction.join(nextBuildSideRecord, probeRecord, collector);
+ }
+ }
+ else if(joinWithEmptyBuildSide) {
+ // no build-side value for this probe record: join the probe record with null
+ final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ matchFunction.join(null, probeRecord, collector);
+ }
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ /** Clears the running flag read by callWithNextKey() and aborts the hash join. */
+ @Override
+ public void abort() {
+ this.running = false;
+ this.hashJoin.abort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
deleted file mode 100644
index dbdb5b2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the first input of the match.
- * This implementation DOES NOT reuse objects.
- */
-public class NonReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-
- protected final MutableHashTable<V1, V2> hashJoin;
-
- protected final TypeSerializer<V2> probeSideSerializer;
-
- private final MemoryManager memManager;
-
- private final MutableObjectIterator<V1> firstInput;
-
- private final MutableObjectIterator<V2> secondInput;
-
- private volatile boolean running = true;
-
- // --------------------------------------------------------------------------------------------
-
- public NonReusingBuildFirstHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V2, V1> pairComparator,
- MemoryManager memManager, IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- this.memManager = memManager;
- this.firstInput = firstInput;
- this.secondInput = secondInput;
- this.probeSideSerializer = serializer2;
-
- this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
- pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open() throws IOException, MemoryAllocationException, InterruptedException {
- this.hashJoin.open(this.firstInput, this.secondInput);
- }
-
-
- @Override
- public void close() {
- // close the join
- this.hashJoin.close();
-
- // free the memory
- final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
- this.memManager.release(segments);
- }
-
- @Override
- public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
- throws Exception
- {
- if (this.hashJoin.nextRecord())
- {
- // we have a next record, get the iterators to the probe and build side values
- final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
- V1 nextBuildSideRecord;
-
- // get the first build side value
- if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
- V1 tmpRec;
- final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
-
- // check if there is another build-side value
- if ((tmpRec = buildSideIterator.next()) != null) {
- // more than one build-side value --> copy the probe side
- V2 probeCopy;
- probeCopy = this.probeSideSerializer.copy(probeRecord);
-
- // call match on the first pair
- matchFunction.join(nextBuildSideRecord, probeCopy, collector);
-
- // call match on the second pair
- probeCopy = this.probeSideSerializer.copy(probeRecord);
- matchFunction.join(tmpRec, probeCopy, collector);
-
- while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
- // call match on the next pair
- // make sure we restore the value of the probe side record
- probeCopy = this.probeSideSerializer.copy(probeRecord);
- matchFunction.join(nextBuildSideRecord, probeCopy, collector);
- }
- }
- else {
- // only single pair matches
- matchFunction.join(nextBuildSideRecord, probeRecord, collector);
- }
- }
- return true;
- }
- else {
- return false;
- }
- }
-
- @Override
- public void abort() {
- this.running = false;
- this.hashJoin.abort();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..77521af
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+/** Build-first hash join iterator backed by a {@link ReOpenableMutableHashTable}, so the probe side can be given a new input via reopenProbe(). */
+public class NonReusingBuildFirstReOpenableHashJoinIterator<V1, V2, O> extends NonReusingBuildFirstHashJoinIterator<V1, V2, O> {
+
+ // the same table as the inherited hashJoin field, kept under its re-openable type
+ private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
+
+ public NonReusingBuildFirstReOpenableHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V2, V1> pairComparator,
+ MemoryManager memManager,
+ IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ super(firstInput, secondInput, serializer1, comparator1, serializer2,
+ comparator2, pairComparator, memManager, ioManager, ownerTask,
+ memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+ // hashJoin was obtained through getHashJoin(), which this class overrides to return a ReOpenableMutableHashTable
+ reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
+ }
+ /** Allocates the pages for {@code memoryFraction} from the memory manager and builds a {@link ReOpenableMutableHashTable} on them. */
+ @Override
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+ TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+ TypePairComparator<PT, BT> pairComparator,
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ final int numPages = memManager.computeNumberOfPages(memoryFraction);
+ final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBitmapFilters);
+ }
+
+ /**
+ * Sets a new input for the probe side by delegating to {@link ReOpenableMutableHashTable#reopenProbe}.
+ * @throws java.io.IOException if the underlying hash table throws it while re-opening the probe side
+ */
+ public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
+ reopenHashTable.reopenProbe(probeInput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
deleted file mode 100644
index b51c3b1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends NonReusingBuildFirstHashMatchIterator<V1, V2, O> {
-
-
- private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
-
- public NonReusingBuildFirstReOpenableHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V2, V1> pairComparator,
- MemoryManager memManager,
- IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- super(firstInput, secondInput, serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask,
- memoryFraction, useBitmapFilters);
-
- reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
- }
-
- @Override
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
- TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
- TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
- TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- final int numPages = memManager.computeNumberOfPages(memoryFraction);
- final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
- buildSideComparator, probeSideComparator, pairComparator,
- memorySegments, ioManager, useBitmapFilters);
- }
-
- /**
- * Set new input for probe side
- * @throws java.io.IOException
- */
- public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
- reopenHashTable.reopenProbe(probeInput);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
new file mode 100644
index 0000000..9ea0b74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
+ * internally to match the records with equal key. The second input is the build side of the hash table, the first input the probe side; the probe record is copied for every join call when a key has several build-side values.
+ */
+public class NonReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+ protected final MutableHashTable<V2, V1> hashJoin;
+
+ protected final TypeSerializer<V1> probeSideSerializer;
+
+ private final MemoryManager memManager;
+
+ private final MutableObjectIterator<V1> firstInput;
+
+ private final MutableObjectIterator<V2> secondInput;
+ // if true, a probe record without any build-side value is passed to the join function with null as build side
+ private final boolean joinWithEmptyBuildSide;
+ // set to false by abort(); checked before each further join call once a key has more than two build-side values
+ private volatile boolean running = true;
+
+ // --------------------------------------------------------------------------------------------
+ /** Stores the inputs and obtains the hash table via getHashJoin(); rejects bitmap filters combined with joinWithEmptyBuildSide. */
+ public NonReusingBuildSecondHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V1, V2> pairComparator,
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ this.memManager = memManager;
+ this.firstInput = firstInput;
+ this.secondInput = secondInput;
+ this.probeSideSerializer = serializer1;
+
+ if(useBitmapFilters && joinWithEmptyBuildSide) {
+ throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+ }
+ this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+
+ this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
+ comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ /** Opens the hash join, handing it the second input (build side type V2) and the first input (probe side type V1). */
+ @Override
+ public void open() throws IOException, MemoryAllocationException, InterruptedException {
+ this.hashJoin.open(this.secondInput, this.firstInput);
+ }
+ /** Closes the hash join and releases the memory segments it reports as freed back to the memory manager. */
+ @Override
+ public void close() {
+ // close the join
+ this.hashJoin.close();
+
+ // free the memory
+ final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
+ this.memManager.release(segments);
+ }
+ /** Joins the next probe record with its build-side values; returns false once the hash join has no further record. */
+ @Override
+ public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+ throws Exception
+ {
+ if (this.hashJoin.nextRecord())
+ {
+ // there is a next record: fetch the iterator over the build side values for it
+ final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
+ V2 nextBuildSideRecord;
+
+ // get the first build side value
+ if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
+ V2 tmpRec;
+ final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ // check if there is another build-side value
+ if ((tmpRec = buildSideIterator.next()) != null) {
+ // more than one build-side value --> every join call gets its own copy of the probe record
+ V1 probeCopy;
+ probeCopy = this.probeSideSerializer.copy(probeRecord);
+
+ // call the join function on the first pair
+ matchFunction.join(probeCopy, nextBuildSideRecord, collector);
+
+ // call the join function on the second pair
+ probeCopy = this.probeSideSerializer.copy(probeRecord);
+ matchFunction.join(probeCopy, tmpRec, collector);
+
+ while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
+ // call the join function on the next pair
+ // copy again so that the function is handed the original value of the probe side record
+ probeCopy = this.probeSideSerializer.copy(probeRecord);
+ matchFunction.join(probeCopy, nextBuildSideRecord, collector);
+ }
+ }
+ else {
+ // exactly one build-side value: no copy is made, the probe record itself is passed on
+ matchFunction.join(probeRecord, nextBuildSideRecord, collector);
+ }
+ }
+ else if(joinWithEmptyBuildSide) {
+ // no build-side value for this probe record: join the probe record with null
+ final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ matchFunction.join(probeRecord, null, collector);
+ }
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ /** Clears the running flag read by callWithNextKey() and aborts the hash join. */
+ @Override
+ public void abort() {
+ this.running = false;
+ this.hashJoin.abort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
deleted file mode 100644
index 26dba7b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the second input of the match.
- */
-public class NonReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-
- protected final MutableHashTable<V2, V1> hashJoin;
-
- protected final TypeSerializer<V1> probeSideSerializer;
-
- private final MemoryManager memManager;
-
- private final MutableObjectIterator<V1> firstInput;
-
- private final MutableObjectIterator<V2> secondInput;
-
- private volatile boolean running = true;
-
- // --------------------------------------------------------------------------------------------
-
- public NonReusingBuildSecondHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V1, V2> pairComparator,
- MemoryManager memManager, IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- this.memManager = memManager;
- this.firstInput = firstInput;
- this.secondInput = secondInput;
- this.probeSideSerializer = serializer1;
-
- this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
- comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open() throws IOException, MemoryAllocationException, InterruptedException {
- this.hashJoin.open(this.secondInput, this.firstInput);
- }
-
- @Override
- public void close() {
- // close the join
- this.hashJoin.close();
-
- // free the memory
- final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
- this.memManager.release(segments);
- }
-
- @Override
- public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
- throws Exception
- {
- if (this.hashJoin.nextRecord())
- {
- // we have a next record, get the iterators to the probe and build side values
- final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
- V2 nextBuildSideRecord;
-
- // get the first build side value
- if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
- V2 tmpRec;
- final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
-
- // check if there is another build-side value
- if ((tmpRec = buildSideIterator.next()) != null) {
- // more than one build-side value --> copy the probe side
- V1 probeCopy;
- probeCopy = this.probeSideSerializer.copy(probeRecord);
-
- // call match on the first pair
- matchFunction.join(probeCopy, nextBuildSideRecord, collector);
-
- // call match on the second pair
- probeCopy = this.probeSideSerializer.copy(probeRecord);
- matchFunction.join(probeCopy, tmpRec, collector);
-
- while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
- // call match on the next pair
- // make sure we restore the value of the probe side record
- probeCopy = this.probeSideSerializer.copy(probeRecord);
- matchFunction.join(probeCopy, nextBuildSideRecord, collector);
- }
- }
- else {
- // only single pair matches
- matchFunction.join(probeRecord, nextBuildSideRecord, collector);
- }
- }
- return true;
- }
- else {
- return false;
- }
- }
-
- @Override
- public void abort() {
- this.running = false;
- this.hashJoin.abort();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..c9c9165
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+
+public class NonReusingBuildSecondReOpenableHashJoinIterator<V1, V2, O> extends NonReusingBuildSecondHashJoinIterator<V1, V2, O> {
+
+ // Same object as the inherited hashJoin field, kept under its re-openable type so reopenProbe can be called on it.
+ private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
+
+ public NonReusingBuildSecondReOpenableHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V1, V2> pairComparator,
+ MemoryManager memManager,
+ IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ super(firstInput, secondInput, serializer1, comparator1, serializer2,
+ comparator2, pairComparator, memManager, ioManager, ownerTask,
+ memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+ // NOTE(review): the cast relies on hashJoin being the table built by getHashJoin below; the superclass is not visible here - confirm.
+ reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
+ }
+
+ @Override
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer,
+ TypeComparator<BT> buildSideComparator,
+ TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+ TypePairComparator<PT, BT> pairComparator,
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+ // Allocate the number of pages that memoryFraction maps to and hand them to a re-openable hash table.
+ final int numPages = memManager.computeNumberOfPages(memoryFraction);
+ final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBitmapFilters);
+ }
+
+ /**
+ * Sets a new input for the probe side by delegating to reopenProbe of the re-openable hash table.
+ * @throws java.io.IOException if the delegated reopenProbe call throws it
+ */
+ public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
+ reopenHashTable.reopenProbe(probeInput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
deleted file mode 100644
index 92b0fff..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends NonReusingBuildSecondHashMatchIterator<V1, V2, O> {
-
-
- private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
-
- public NonReusingBuildSecondReOpenableHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V1, V2> pairComparator,
- MemoryManager memManager,
- IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- super(firstInput, secondInput, serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-
- reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
- }
-
- @Override
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
- TypeSerializer<BT> buildSideSerializer,
- TypeComparator<BT> buildSideComparator,
- TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
- TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- final int numPages = memManager.computeNumberOfPages(memoryFraction);
- final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
- buildSideComparator, probeSideComparator, pairComparator,
- memorySegments, ioManager, useBitmapFilters);
- }
-
- /**
- * Set new input for probe side
- * @throws java.io.IOException
- */
- public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
- reopenHashTable.reopenProbe(probeInput);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
new file mode 100644
index 0000000..c1e601d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
+ * internally to match the records with equal key. The build side of the hash is the first input of the join. If joinWithEmptyBuildSide is set, probe records without any build-side record are joined with null.
+ */
+public class ReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+ protected final MutableHashTable<V1, V2> hashJoin;
+
+ private final V1 nextBuildSideObject;
+
+ private final V1 tempBuildSideRecord;
+
+ protected final TypeSerializer<V2> probeSideSerializer;
+
+ private final MemoryManager memManager;
+
+ private final MutableObjectIterator<V1> firstInput;
+
+ private final MutableObjectIterator<V2> secondInput;
+
+ private final boolean joinWithEmptyBuildSide;
+ // Cleared by abort(); checked between join calls in the multi-record loop of callWithNextKey.
+ private volatile boolean running = true;
+
+ // --------------------------------------------------------------------------------------------
+
+ public ReusingBuildFirstHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V2, V1> pairComparator,
+ MemoryManager memManager,
+ IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ this.memManager = memManager;
+ this.firstInput = firstInput;
+ this.secondInput = secondInput;
+ this.probeSideSerializer = serializer2;
+
+ if(useBitmapFilters && joinWithEmptyBuildSide) {
+ throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+ }
+ this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+ // Two build-side instances that callWithNextKey passes to the bucket iterator as reuse objects.
+ this.nextBuildSideObject = serializer1.createInstance();
+ this.tempBuildSideRecord = serializer1.createInstance();
+ // NOTE(review): getHashJoin is invoked during construction and is overridden by ReusingBuildFirstReOpenableHashJoinIterator.
+ this.hashJoin = getHashJoin(serializer1, comparator1, serializer2,
+ comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void open() throws IOException, MemoryAllocationException, InterruptedException {
+ this.hashJoin.open(this.firstInput, this.secondInput);
+ }
+
+
+ @Override
+ public void close() {
+ // close the hash table first
+ this.hashJoin.close();
+
+ // then hand the segments it reports as freed back to the memory manager
+ final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
+ this.memManager.release(segments);
+ }
+
+ @Override
+ public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+ throws Exception
+ {
+ if (this.hashJoin.nextRecord())
+ {
+ // there is another probe record; fetch the build-side iterator for it
+ final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
+ V1 nextBuildSideRecord = this.nextBuildSideObject;
+
+ // get the first build side value
+ if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
+ V1 tmpRec = this.tempBuildSideRecord;
+ final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ // check if there is another build-side value
+ if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
+
+ // call join on the first pair
+ matchFunction.join(nextBuildSideRecord, probeRecord, collector);
+
+ // call join on the second pair
+ matchFunction.join(tmpRec, probeRecord, collector);
+
+ while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
+ // call join on the next pair
+ // the same probeRecord reference is passed on every call; no copy of it is made here
+ matchFunction.join(nextBuildSideRecord, probeRecord, collector);
+ }
+ }
+ else {
+ // only a single build-side record, so exactly one pair
+ matchFunction.join(nextBuildSideRecord, probeRecord, collector);
+ }
+ }
+ else if(joinWithEmptyBuildSide) {
+ // the iterator returned no build-side record for this probe record, join with null
+ final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ matchFunction.join(null, probeRecord, collector);
+ }
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public void abort() {
+ this.running = false;
+ this.hashJoin.abort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
deleted file mode 100644
index 65dfd89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the first input of the match.
- */
-public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-
- protected final MutableHashTable<V1, V2> hashJoin;
-
- private final V1 nextBuildSideObject;
-
- private final V1 tempBuildSideRecord;
-
- protected final TypeSerializer<V2> probeSideSerializer;
-
- private final MemoryManager memManager;
-
- private final MutableObjectIterator<V1> firstInput;
-
- private final MutableObjectIterator<V2> secondInput;
-
- private volatile boolean running = true;
-
- // --------------------------------------------------------------------------------------------
-
- public ReusingBuildFirstHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V2, V1> pairComparator,
- MemoryManager memManager,
- IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- this.memManager = memManager;
- this.firstInput = firstInput;
- this.secondInput = secondInput;
- this.probeSideSerializer = serializer2;
-
- this.nextBuildSideObject = serializer1.createInstance();
- this.tempBuildSideRecord = serializer1.createInstance();
-
- this.hashJoin = getHashJoin(serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open() throws IOException, MemoryAllocationException, InterruptedException {
- this.hashJoin.open(this.firstInput, this.secondInput);
- }
-
-
- @Override
- public void close() {
- // close the join
- this.hashJoin.close();
-
- // free the memory
- final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
- this.memManager.release(segments);
- }
-
- @Override
- public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
- throws Exception
- {
- if (this.hashJoin.nextRecord())
- {
- // we have a next record, get the iterators to the probe and build side values
- final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
- V1 nextBuildSideRecord = this.nextBuildSideObject;
-
- // get the first build side value
- if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
- V1 tmpRec = this.tempBuildSideRecord;
- final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
-
- // check if there is another build-side value
- if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
-
- // call match on the first pair
- matchFunction.join(nextBuildSideRecord, probeRecord, collector);
-
- // call match on the second pair
- matchFunction.join(tmpRec, probeRecord, collector);
-
- while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
- // call match on the next pair
- // make sure we restore the value of the probe side record
- matchFunction.join(nextBuildSideRecord, probeRecord, collector);
- }
- }
- else {
- // only single pair matches
- matchFunction.join(nextBuildSideRecord, probeRecord, collector);
- }
- }
- return true;
- }
- else {
- return false;
- }
- }
-
- @Override
- public void abort() {
- this.running = false;
- this.hashJoin.abort();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..1cc3f91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class ReusingBuildFirstReOpenableHashJoinIterator<V1, V2, O> extends ReusingBuildFirstHashJoinIterator<V1, V2, O> {
+
+ // Same object as the inherited hashJoin field, kept under its re-openable type so reopenProbe can be called on it.
+ private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
+
+ public ReusingBuildFirstReOpenableHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V2, V1> pairComparator,
+ MemoryManager memManager,
+ IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters)
+ throws MemoryAllocationException
+ {
+ super(firstInput, secondInput, serializer1, comparator1, serializer2,
+ comparator2, pairComparator, memManager, ioManager, ownerTask,
+ memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+ reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
+ }
+
+ @Override
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+ TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+ TypePairComparator<PT, BT> pairComparator,
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+ // Allocate the number of pages that memoryFraction maps to and hand them to a re-openable hash table.
+ final int numPages = memManager.computeNumberOfPages(memoryFraction);
+ final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBitmapFilters);
+ }
+
+ /**
+ * Sets a new input for the probe side by delegating to reopenProbe of the re-openable hash table.
+ * @throws IOException if the delegated reopenProbe call throws it
+ */
+ public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
+ reopenHashTable.reopenProbe(probeInput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
deleted file mode 100644
index 5635865..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends ReusingBuildFirstHashMatchIterator<V1, V2, O> {
-
-
- private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
-
- public ReusingBuildFirstReOpenableHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V2, V1> pairComparator,
- MemoryManager memManager,
- IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters)
- throws MemoryAllocationException
- {
- super(firstInput, secondInput, serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask,
- memoryFraction, useBitmapFilters);
- reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
- }
-
- @Override
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
- TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
- TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
- TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- final int numPages = memManager.computeNumberOfPages(memoryFraction);
- final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
- buildSideComparator, probeSideComparator, pairComparator,
- memorySegments, ioManager, useBitmapFilters);
- }
-
- /**
- * Set new input for probe side
- * @throws IOException
- */
- public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
- reopenHashTable.reopenProbe(probeInput);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
new file mode 100644
index 0000000..4402665
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
+ * internally to join the records with equal key. The build side of the hash table is the second input of the join,
+ * the probe side is the first input.
+ *
+ * <p>Build-side records are fetched into two reuse instances that are created once from the build-side serializer.
+ * If {@code joinWithEmptyBuildSide} is set, a probe record without any build-side match is passed to the
+ * join function with {@code null} as the build-side value.
+ *
+ * @param <V1> type of the records of the first input (probe side)
+ * @param <V2> type of the records of the second input (build side)
+ * @param <O> element type of the collector that is passed to the join function
+ */
+public class ReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+ /** Hash table that is built from the second input and probed with the first input (see {@link #open()}). */
+ protected final MutableHashTable<V2, V1> hashJoin;
+
+ /** Reuse instance passed to the build-side iterator when fetching the first matching record. */
+ private final V2 nextBuildSideObject;
+
+ /** Second reuse instance, passed to the build-side iterator when looking ahead for a second matching record. */
+ private final V2 tempBuildSideRecord;
+
+ /** Serializer of the first (probe side) input; only stored by this class, it is not read here. */
+ protected final TypeSerializer<V1> probeSideSerializer;
+
+ /** Memory manager to which the hash table's memory segments are released in {@link #close()}. */
+ private final MemoryManager memManager;
+
+ /** First input; used as the probe side. */
+ private final MutableObjectIterator<V1> firstInput;
+
+ /** Second input; used as the build side. */
+ private final MutableObjectIterator<V2> secondInput;
+
+ /** If true, probe records without a build-side match are joined with {@code null}. */
+ private final boolean joinWithEmptyBuildSide;
+
+ /** Cleared by {@link #abort()}; checked between build-side records in the loop of {@code callWithNextKey}. */
+ private volatile boolean running = true;
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates the iterator and the hash table it uses. The inputs are only stored here; they are handed
+ * to the hash table in {@link #open()}.
+ *
+ * @param firstInput the first input, used as the probe side
+ * @param secondInput the second input, used as the build side
+ * @param serializer1 serializer for the first input
+ * @param comparator1 comparator for the first input
+ * @param serializer2 serializer for the second input; also creates the two build-side reuse instances
+ * @param comparator2 comparator for the second input
+ * @param pairComparator comparator between records of the first and the second input
+ * @param memManager memory manager; passed to {@code getHashJoin} and used to release memory in {@link #close()}
+ * @param ioManager passed through to {@code getHashJoin}
+ * @param ownerTask passed through to {@code getHashJoin}
+ * @param memoryFraction passed through to {@code getHashJoin}
+ * @param joinWithEmptyBuildSide whether probe records without a build-side match are joined with {@code null}
+ * @param useBitmapFilters passed through to {@code getHashJoin}; must not be combined with
+ * {@code joinWithEmptyBuildSide}
+ * @throws IllegalArgumentException if both {@code useBitmapFilters} and {@code joinWithEmptyBuildSide} are true
+ * @throws MemoryAllocationException declared by {@code getHashJoin}
+ */
+ public ReusingBuildSecondHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V1, V2> pairComparator,
+ MemoryManager memManager,
+ IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ this.memManager = memManager;
+ this.firstInput = firstInput;
+ this.secondInput = secondInput;
+ this.probeSideSerializer = serializer1;
+
+ // reject the unsupported combination before the hash table is created
+ if(useBitmapFilters && joinWithEmptyBuildSide) {
+ throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+ }
+ this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+
+ // two distinct reuse instances, because callWithNextKey holds two build-side records at the same time
+ this.nextBuildSideObject = serializer2.createInstance();
+ this.tempBuildSideRecord = serializer2.createInstance();
+
+ // the second input is the build side, so its serializer and comparator are passed first
+ this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
+ memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Opens the hash table with the second input as build side and the first input as probe side.
+ */
+ @Override
+ public void open() throws IOException, MemoryAllocationException, InterruptedException {
+ this.hashJoin.open(this.secondInput, this.firstInput);
+ }
+
+ /**
+ * Closes the hash table and releases the memory segments it reports as freed to the memory manager.
+ */
+ @Override
+ public void close() {
+ // close the join
+ this.hashJoin.close();
+
+ // free the memory
+ final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
+ this.memManager.release(segments);
+ }
+
+ /**
+ * Advances the hash table to its next record and calls the join function once for every build-side
+ * record returned by the build-side iterator for it. If there is no build-side record and
+ * {@code joinWithEmptyBuildSide} is set, the join function is called once with {@code null} as the
+ * build-side value.
+ *
+ * @param matchFunction the join function to call for each pair
+ * @param collector the collector passed to the join function
+ * @return true if the hash table had a next record (even if the join function was not called for it),
+ * false otherwise
+ * @throws Exception declared to cover exceptions from the hash table and the join function
+ */
+ @Override
+ public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+ throws Exception
+ {
+ if (this.hashJoin.nextRecord())
+ {
+ // we have a next record, get the iterators to the probe and build side values
+ final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
+ V2 nextBuildSideRecord = this.nextBuildSideObject;
+
+ // get the first build side value
+ if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
+ V2 tmpRec = this.tempBuildSideRecord;
+ final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ // check if there is another build-side value
+ // (fetched into the second reuse instance; the first record has not been joined yet)
+ if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
+ // call match on the first pair
+ matchFunction.join(probeRecord, nextBuildSideRecord, collector);
+
+ // call match on the second pair
+ matchFunction.join(probeRecord, tmpRec, collector);
+
+ // remaining build-side values; this is the only place where the running flag is checked
+ while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
+ // call match on the next pair
+ // NOTE(review): an earlier comment here asked to restore the probe side record,
+ // but no such step exists in this loop - confirm that none is needed
+ matchFunction.join(probeRecord, nextBuildSideRecord, collector);
+ }
+ }
+ else {
+ // only single pair matches
+ matchFunction.join(probeRecord, nextBuildSideRecord, collector);
+ }
+ }
+ else if(joinWithEmptyBuildSide) {
+ // build side is empty, join with null
+ final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+ matchFunction.join(probeRecord, null, collector);
+ }
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ /**
+ * Clears the running flag, which ends the build-side loop in {@code callWithNextKey}, and aborts the hash table.
+ */
+ @Override
+ public void abort() {
+ this.running = false;
+ this.hashJoin.abort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
deleted file mode 100644
index 156f259..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the second input of the match.
- */
-public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-
- protected final MutableHashTable<V2, V1> hashJoin;
-
- private final V2 nextBuildSideObject;
-
- private final V2 tempBuildSideRecord;
-
- protected final TypeSerializer<V1> probeSideSerializer;
-
- private final MemoryManager memManager;
-
- private final MutableObjectIterator<V1> firstInput;
-
- private final MutableObjectIterator<V2> secondInput;
-
- private volatile boolean running = true;
-
- // --------------------------------------------------------------------------------------------
-
- public ReusingBuildSecondHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V1, V2> pairComparator,
- MemoryManager memManager,
- IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- this.memManager = memManager;
- this.firstInput = firstInput;
- this.secondInput = secondInput;
- this.probeSideSerializer = serializer1;
-
- this.nextBuildSideObject = serializer2.createInstance();
- this.tempBuildSideRecord = serializer2.createInstance();
-
- this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
- memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void open() throws IOException, MemoryAllocationException, InterruptedException {
- this.hashJoin.open(this.secondInput, this.firstInput);
- }
-
- @Override
- public void close() {
- // close the join
- this.hashJoin.close();
-
- // free the memory
- final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
- this.memManager.release(segments);
- }
-
- @Override
- public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
- throws Exception
- {
- if (this.hashJoin.nextRecord())
- {
- // we have a next record, get the iterators to the probe and build side values
- final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
- V2 nextBuildSideRecord = this.nextBuildSideObject;
-
- // get the first build side value
- if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
- V2 tmpRec = this.tempBuildSideRecord;
- final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
-
- // check if there is another build-side value
- if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
- // call match on the first pair
- matchFunction.join(probeRecord, nextBuildSideRecord, collector);
-
- // call match on the second pair
- matchFunction.join(probeRecord, tmpRec, collector);
-
- while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
- // call match on the next pair
- // make sure we restore the value of the probe side record
- matchFunction.join(probeRecord, nextBuildSideRecord, collector);
- }
- }
- else {
- // only single pair matches
- matchFunction.join(probeRecord, nextBuildSideRecord, collector);
- }
- }
- return true;
- }
- else {
- return false;
- }
- }
-
- @Override
- public void abort() {
- this.running = false;
- this.hashJoin.abort();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..190398f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Variant of {@link ReusingBuildSecondHashJoinIterator} whose hash table is a {@link ReOpenableMutableHashTable}.
+ * In addition to the inherited behavior it offers {@link #reopenProbe(MutableObjectIterator)} to set a new
+ * probe-side input on that hash table.
+ *
+ * @param <V1> type of the records of the first input (probe side)
+ * @param <V2> type of the records of the second input (build side)
+ * @param <O> third type parameter of the superclass, passed through unchanged
+ */
+public class ReusingBuildSecondReOpenableHashJoinIterator<V1, V2, O> extends ReusingBuildSecondHashJoinIterator<V1, V2, O> {
+
+
+ /** The inherited {@code hashJoin}, kept under its re-openable type so that {@code reopenProbe} can be called. */
+ private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
+
+ /**
+ * Passes all arguments unchanged to the superclass constructor and then keeps a typed reference to the
+ * hash table created there.
+ *
+ * @param firstInput the first input (probe side)
+ * @param secondInput the second input (build side)
+ * @param serializer1 serializer for the first input
+ * @param comparator1 comparator for the first input
+ * @param serializer2 serializer for the second input
+ * @param comparator2 comparator for the second input
+ * @param pairComparator comparator between records of the first and the second input
+ * @param memManager memory manager from which the hash table's pages are allocated
+ * @param ioManager I/O manager handed to the hash table
+ * @param ownerTask task on whose behalf the pages are allocated
+ * @param memoryFraction fraction from which the number of pages is computed
+ * @param joinWithEmptyBuildSide passed through to the superclass
+ * @param useBitmapFilters passed through to the superclass and on to the hash table
+ * @throws MemoryAllocationException declared by the superclass constructor and by {@code getHashJoin}
+ */
+ public ReusingBuildSecondReOpenableHashJoinIterator(
+ MutableObjectIterator<V1> firstInput,
+ MutableObjectIterator<V2> secondInput,
+ TypeSerializer<V1> serializer1,
+ TypeComparator<V1> comparator1,
+ TypeSerializer<V2> serializer2,
+ TypeComparator<V2> comparator2,
+ TypePairComparator<V1, V2> pairComparator,
+ MemoryManager memManager,
+ IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean joinWithEmptyBuildSide,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ super(firstInput, secondInput, serializer1, comparator1, serializer2,
+ comparator2, pairComparator, memManager, ioManager, ownerTask,
+ memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+
+ // the superclass constructor creates hashJoin through getHashJoin, which is overridden below to
+ // return a ReOpenableMutableHashTable; this downcast relies on that
+ reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
+ }
+
+ /**
+ * Allocates the pages for the hash table from the memory manager and creates a
+ * {@link ReOpenableMutableHashTable} on them.
+ *
+ * <p>This method is invoked from the superclass constructor, i.e. before the fields of this class
+ * are assigned, so it must not read instance state of this class.
+ *
+ * @param buildSideSerializer serializer for the build side
+ * @param buildSideComparator comparator for the build side
+ * @param probeSideSerializer serializer for the probe side
+ * @param probeSideComparator comparator for the probe side
+ * @param pairComparator comparator between probe-side and build-side records
+ * @param memManager memory manager used to compute the page count and to allocate the pages
+ * @param ioManager I/O manager handed to the hash table
+ * @param ownerTask task on whose behalf the pages are allocated
+ * @param memoryFraction fraction from which the number of pages is computed
+ * @param useBitmapFilters handed to the hash table
+ * @return the newly created re-openable hash table
+ * @throws MemoryAllocationException declared; presumably raised by the page allocation - confirm against MemoryManager
+ */
+ @Override
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+ TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+ TypePairComparator<PT, BT> pairComparator,
+ MemoryManager memManager, IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBitmapFilters) throws MemoryAllocationException {
+
+ // translate the memory fraction into a page count and allocate those pages for the owner task
+ final int numPages = memManager.computeNumberOfPages(memoryFraction);
+ final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
+
+ return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager, useBitmapFilters);
+ }
+
+ /**
+ * Set new input for probe side
+ *
+ * @param probeInput the new probe-side input, handed to the re-openable hash table
+ * @throws IOException declared by the hash table's {@code reopenProbe}
+ */
+ public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
+ reopenHashTable.reopenProbe(probeInput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
deleted file mode 100644
index a0791fe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends ReusingBuildSecondHashMatchIterator<V1, V2, O> {
-
-
- private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
-
- public ReusingBuildSecondReOpenableHashMatchIterator(
- MutableObjectIterator<V1> firstInput,
- MutableObjectIterator<V2> secondInput,
- TypeSerializer<V1> serializer1,
- TypeComparator<V1> comparator1,
- TypeSerializer<V2> serializer2,
- TypeComparator<V2> comparator2,
- TypePairComparator<V1, V2> pairComparator,
- MemoryManager memManager,
- IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- super(firstInput, secondInput, serializer1, comparator1, serializer2,
- comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-
- reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
- }
-
- @Override
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
- TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
- TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
- TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager, IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBitmapFilters) throws MemoryAllocationException {
-
- final int numPages = memManager.computeNumberOfPages(memoryFraction);
- final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-
- return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
- buildSideComparator, probeSideComparator, pairComparator,
- memorySegments, ioManager, useBitmapFilters);
- }
-
- /**
- * Set new input for probe side
- * @throws IOException
- */
- public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
- reopenHashTable.reopenProbe(probeInput);
- }
-}