Posted to issues@flink.apache.org by ChengXiangLi <gi...@git.apache.org> on 2015/12/18 06:14:22 UTC

[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

GitHub user ChengXiangLi opened a pull request:

    https://github.com/apache/flink/pull/1469

    [FLINK-2871] support outer join for hash join on build side.

    1. The bucket header of `MutableHashTable` has 4 reserved bytes left. Since each bucket holds only 9 elements, this PR uses 2 of those bytes as a BitSet that marks whether each element in the bucket has been probed during the probe phase. After the probe phase, the elements that have not been probed are returned.
    2. With build-side outer join supported, we can offer more flexible strategies for left, right, and full outer joins. The newly supported join types include:
      * left outer join with `REPARTITION_HASH_FIRST`.
      * right outer join with `REPARTITION_HASH_SECOND`.
      * full outer join with `REPARTITION_HASH_FIRST` or `REPARTITION_HASH_SECOND`.
    3. Broadcast hash join still has some limitations. The following join types remain unsupported, for obvious reasons:
      * left outer join with `BROADCAST_HASH_FIRST`.
      * right outer join with `BROADCAST_HASH_SECOND`.
      * full outer join with `BROADCAST_HASH_FIRST` and `BROADCAST_HASH_SECOND`.
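
The per-bucket bitmap described in point 1 can be illustrated with a small standalone sketch. It is not the PR's code: `java.nio.ByteBuffer` stands in for Flink's `MemorySegment`, and the class name, the offset value, and the helper names are assumptions made only for illustration. It shows how 2 header bytes are enough to track 9 slots.

```java
import java.nio.ByteBuffer;

// Illustrative only: a 2-byte "probed" bitmap stored in a bucket header.
class BucketProbedBits {
    // Hypothetical position of the bitmap inside the bucket header;
    // the real offset used by MutableHashTable is not shown in this thread.
    static final int ASSUMED_BITSET_OFFSET = 12;
    // 9 slots per bucket need 9 bits, so 2 bytes (16 bits) are sufficient.
    static final int SLOTS_PER_BUCKET = 9;

    // Set the bit for the given slot of the bucket starting at bucketOffset.
    static void markProbed(ByteBuffer segment, int bucketOffset, int slot) {
        int pos = bucketOffset + ASSUMED_BITSET_OFFSET + (slot >>> 3);
        segment.put(pos, (byte) (segment.get(pos) | (1 << (slot & 7))));
    }

    // Read the bit for the given slot.
    static boolean isProbed(ByteBuffer segment, int bucketOffset, int slot) {
        int pos = bucketOffset + ASSUMED_BITSET_OFFSET + (slot >>> 3);
        return (segment.get(pos) & (1 << (slot & 7))) != 0;
    }

    public static void main(String[] args) {
        ByteBuffer segment = ByteBuffer.allocate(256);
        int bucketOffset = 128; // second bucket in this toy segment
        markProbed(segment, bucketOffset, 2);
        markProbed(segment, bucketOffset, 8);
        // After the probe phase, the unset bits identify unmatched build elements.
        for (int slot = 0; slot < SLOTS_PER_BUCKET; slot++) {
            if (!isProbed(segment, bucketOffset, slot)) {
                System.out.println("unmatched slot " + slot);
            }
        }
    }
}
```

Keeping the bitmap inside the existing header means no extra memory is needed per bucket, which is presumably why the reserved bytes were chosen.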

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChengXiangLi/flink hashFullOuterJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1469.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1469
    
----
commit a3445a0666dd9c349bc11fca2e2554d500175280
Author: chengxiang li <ch...@intel.com>
Date:   2015-12-17T03:34:41Z

    [FLINK-2871] support outer join for hash on build side.

commit 92961bcd26e2dafb70006ea673abb07a67b77c9b
Author: chengxiang li <ch...@intel.com>
Date:   2015-12-18T04:52:55Z

    fix format

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50218331
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -486,11 +518,43 @@ protected boolean processProbeIter() throws IOException{
     			}
     		}
     		// -------------- partition done ---------------
    -		
    +
     		return false;
     	}
     	
    +	protected boolean processUnmatchedBuildIter() throws IOException  {
    +		if (this.unmatchedBuildVisited) {
    +			return false;
    +		}
    +		
    +		this.probeMatchedPhase = false;
    +		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
    +			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
    +		this.unmatchedBuildIterator = unmatchedBuildIter;
    +		
    +		// There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user.
    +		if (unmatchedBuildIter.next() == null) {
    --- End diff --
    
    Yes, it would be better to have a `hasNext()` method. However, `UnmatchedBuildIterator` is quite complicated and a redesign is risky. The `back()` method would only be called once, so it does not introduce extra effort. I would just leave it as is.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656489
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
    +		public BT nextInBucket(BT reuse) {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
     
    +				while (this.numInSegment < this.countInSegment) {
    +					boolean probed = probedSet.get(numInSegment);
    +					if (!probed) {
    +						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
    +							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
    +						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
    +						try {
    +							this.partition.setReadPosition(pointer);
    +							reuse = this.accessor.deserialize(reuse, this.partition);
    +							this.numInSegment++;
    +							return reuse;
    +						} catch (IOException ioex) {
    +							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
    +								ioex.getMessage(), ioex);
    +						}
    +					} else {
    +						this.numInSegment++;
    +					}
    +				}
    +
    +				// this segment is done. check if there is another chained bucket
    +				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
    +				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
    +					return null;
    +				}
    +
    +				final int overflowSegNum = (int) (forwardPointer >>> 32);
    +				this.bucket = this.overflowSegments[overflowSegNum];
    +				this.bucketInSegmentOffset = (int) forwardPointer;
    +				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +				this.numInSegment = 0;
    +			}
    +		}
    +	
    +		public BT nextInBucket() {
    --- End diff --
    
    Change visibility to `private`?



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50511127
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    --- End diff --
    
    Sounds good! :-)



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50220144
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---
    @@ -130,11 +130,11 @@ public void testSortBothMerge() {
     			
     			final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
     					this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, 
    -					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
    +					this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
    --- End diff --
    
    Without the cast to double, the result would be 0.0, which is not the desired memory fraction.
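
The truncation mentioned above can be reproduced in isolation. The sketch below assumes both constants are integer-typed and uses made-up values, since the real definitions in `HashVsSortMiniBenchmark` are not shown in this thread.

```java
// Illustrative only: integer division versus a cast to double.
class MemoryFractionDemo {
    // Hypothetical values; only the fact that the sorter memory is smaller
    // than the total memory matters for the demonstration.
    static final int MEMORY_SIZE = 32 * 1024 * 1024;
    static final int MEMORY_FOR_SORTER = MEMORY_SIZE / 3;

    // int / int is evaluated first and truncates to 0, then widens to 0.0.
    static double truncatedFraction() {
        return MEMORY_FOR_SORTER / MEMORY_SIZE;
    }

    // Casting one operand forces floating-point division.
    static double castFraction() {
        return (double) MEMORY_FOR_SORTER / MEMORY_SIZE;
    }

    public static void main(String[] args) {
        System.out.println("without cast: " + truncatedFraction());
        System.out.println("with cast:    " + castFraction());
    }
}
```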



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1469#issuecomment-173151704
  
    I ran a simple regression test based on `HashVsSortMiniBenchmark`; the results are:
    
    Test | Before | After
    ------ | ------ | --------
    testBuildFirst | 6.63s | 6.65s
    testBuildSecond | 3.7s | 3.8s
    
    Inner join performance is not affected by this PR, which matches my expectation. There is a flag called `buildSideOuterJoin` in `MutableHashTable`, and all the extra work only happens when `buildSideOuterJoin` is true.
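
As a rough picture of why the inner join path stays unaffected, here is a toy hash join in which all bookkeeping is guarded by a flag. It uses `java.util.HashMap` instead of `MutableHashTable`, and the class name, method name, and string output format are assumptions for illustration, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: build-side outer join with flag-guarded bookkeeping.
class ToyBuildSideOuterJoin {
    static List<String> join(List<Integer> build, List<Integer> probe, boolean buildSideOuterJoin) {
        // Build phase: key -> positions of build records with that key.
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int i = 0; i < build.size(); i++) {
            table.computeIfAbsent(build.get(i), k -> new ArrayList<>()).add(i);
        }
        // The "probed" marks are only allocated when the flag is set.
        boolean[] probed = buildSideOuterJoin ? new boolean[build.size()] : null;

        // Probe phase: emit matches and, if requested, remember matched build records.
        List<String> out = new ArrayList<>();
        for (Integer p : probe) {
            List<Integer> matches = table.get(p);
            if (matches == null) {
                continue;
            }
            for (int idx : matches) {
                out.add("(" + build.get(idx) + ", " + p + ")");
                if (buildSideOuterJoin) {
                    probed[idx] = true;
                }
            }
        }

        // Extra phase: emit build records that never matched, paired with null.
        if (buildSideOuterJoin) {
            for (int i = 0; i < build.size(); i++) {
                if (!probed[i]) {
                    out.add("(" + build.get(i) + ", null)");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> build = java.util.Arrays.asList(1, 2, 3);
        List<Integer> probe = java.util.Arrays.asList(2, 2, 4);
        System.out.println(join(build, probe, false));
        System.out.println(join(build, probe, true));
    }
}
```

With the flag off, the probe loop performs only one cheap boolean check per match, which is consistent with the near-identical timings reported above.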



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656327
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -486,11 +518,43 @@ protected boolean processProbeIter() throws IOException{
     			}
     		}
     		// -------------- partition done ---------------
    -		
    +
     		return false;
     	}
     	
    +	protected boolean processUnmatchedBuildIter() throws IOException  {
    +		if (this.unmatchedBuildVisited) {
    +			return false;
    +		}
    +		
    +		this.probeMatchedPhase = false;
    +		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
    +			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
    +		this.unmatchedBuildIterator = unmatchedBuildIter;
    +		
    +		// There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user.
    +		if (unmatchedBuildIter.next() == null) {
    --- End diff --
    
    IMO, it would be nicer to implement a `hasNext()` method for `UnmatchedBuildIterator` instead of calling `next()` and `back()`.
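
One common way to obtain `hasNext()` on top of an iterator whose `next()` returns null when exhausted is to buffer a single look-ahead element. The sketch below is a generic illustration under that assumption; the class and interface names are hypothetical, and it does not model the `next(reuse)` variant or the internal state of `UnmatchedBuildIterator`.

```java
// Illustrative only: adding hasNext() to a null-terminated iterator via look-ahead.
class LookAheadIterator<T> {
    // Hypothetical minimal source: next() returns null once exhausted.
    interface NullTerminatedSource<E> {
        E next();
    }

    private final NullTerminatedSource<T> source;
    private T buffered; // element fetched by hasNext() but not yet handed out

    LookAheadIterator(NullTerminatedSource<T> source) {
        this.source = source;
    }

    // Fetches at most one element ahead; repeated calls do not advance further.
    boolean hasNext() {
        if (buffered == null) {
            buffered = source.next();
        }
        return buffered != null;
    }

    // Returns the buffered element if present, otherwise pulls from the source.
    T next() {
        if (!hasNext()) {
            return null;
        }
        T result = buffered;
        buffered = null;
        return result;
    }

    public static void main(String[] args) {
        java.util.Iterator<String> data = java.util.Arrays.asList("a", "b").iterator();
        LookAheadIterator<String> it =
            new LookAheadIterator<String>(() -> data.hasNext() ? data.next() : null);
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

With object reuse, a buffered element would need extra care because the reused instance could be overwritten before it is handed out, which may be part of why a redesign was considered risky.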



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49655529
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java ---
    @@ -144,13 +148,20 @@ public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction,
     					// only single pair matches
     					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
     				}
    -			}
    -			else if(joinWithEmptyBuildSide) {
    -				// build side is empty, join with null
    -				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
    +			} else {
    +				if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) {
    --- End diff --
    
    Can you add a few comments in this `else` branch? 
    Please do the same for the other adapted HashJoinIterators.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49657432
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
    +		public BT nextInBucket(BT reuse) {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
     
    +				while (this.numInSegment < this.countInSegment) {
    +					boolean probed = probedSet.get(numInSegment);
    +					if (!probed) {
    +						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
    +							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
    +						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
    +						try {
    +							this.partition.setReadPosition(pointer);
    +							reuse = this.accessor.deserialize(reuse, this.partition);
    +							this.numInSegment++;
    +							return reuse;
    +						} catch (IOException ioex) {
    +							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
    +								ioex.getMessage(), ioex);
    +						}
    +					} else {
    +						this.numInSegment++;
    +					}
    +				}
    +
    +				// this segment is done. check if there is another chained bucket
    +				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
    +				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
    +					return null;
    +				}
    +
    +				final int overflowSegNum = (int) (forwardPointer >>> 32);
    +				this.bucket = this.overflowSegments[overflowSegNum];
    +				this.bucketInSegmentOffset = (int) forwardPointer;
    +				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +				this.numInSegment = 0;
    +			}
    +		}
    +	
    +		public BT nextInBucket() {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
    +
    +				while (this.numInSegment < this.countInSegment) {
    +					boolean probed = probedSet.get(numInSegment);
    +					if (!probed) {
    +						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
    +							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
    +						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
    --- End diff --
    
    This comment should be removed. Keys are not compared.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50220219
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---
    @@ -184,7 +184,7 @@ public void testBuildFirst() {
     					new ReusingBuildFirstHashJoinIterator<>(
     						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
     							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
    -							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
    +							this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
    --- End diff --
    
    The parameter is a memory fraction, not a memory size. I guess the signature was changed at some point and this benchmark was not updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1469#issuecomment-171652959
  
    The tests look mostly good.
    I would change `HashTableITCase.testHashWithBuildSideOuterJoin2()` to process outer join tuples.
    Can you explain the rationale for the changes in `HashVsSortMiniBenchmark`?
    
    Thanks, Fabian



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50381597
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---
    @@ -184,7 +184,7 @@ public void testBuildFirst() {
     					new ReusingBuildFirstHashJoinIterator<>(
     						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
     							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
    -							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
    +							this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
    --- End diff --
    
    Ah, I see :-)



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49657465
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
    +		public BT nextInBucket(BT reuse) {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
     
    +				while (this.numInSegment < this.countInSegment) {
    +					boolean probed = probedSet.get(numInSegment);
    +					if (!probed) {
    +						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
    +							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
    +						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
    --- End diff --
    
    This comment should be removed. Keys are not compared.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656407
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    --- End diff --
    
    A few more inline comments would be good for this class.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656863
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    --- End diff --
    
    Rename to `setBucket()`?



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50232198
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    --- End diff --
    
    The next bucket may have been spilled to disk, so we need a loop here to make sure we move to the next on-heap bucket.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50219555
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java ---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception {
     		join.close();
     		this.memManager.release(join.getFreedMemory());
     	}
    +
    +	@Test
    +	public void testHashWithBuildSideOuterJoin1() throws Exception {
    +		final int NUM_KEYS = 20000;
    +		final int BUILD_VALS_PER_KEY = 1;
    +		final int PROBE_VALS_PER_KEY = 1;
    +
    +		// create a build input that gives 40000 pairs with 1 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +
    +		// create a probe input that gives 20000 pairs with 1 values sharing a key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is minimum number of pages required to perform hash join this inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +
    +		// ----------------------------------------------------------------------------------------
    +
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			while (buildSide.next(recordReuse) != null) {
    +				numRecordsInJoinResult++;
    +			}
    +		}
    +		Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +		join.close();
    +		this.memManager.release(join.getFreedMemory());
    +	}
    +	
    +	@Test
    +	public void testHashWithBuildSideOuterJoin2() throws Exception {
    +		final int NUM_KEYS = 40000;
    +		final int BUILD_VALS_PER_KEY = 2;
    +		final int PROBE_VALS_PER_KEY = 1;
    +		
    +		// The keys of probe and build sides are overlapped, so there would be none unmatched build elements
    +		// after probe phase.
    +		
    +		// create a build input that gives 40000 pairs with 2 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +		
    +		// create a probe input that gives 20000 pairs with 1 values sharing a key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +		
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is minimum number of pages required to perform hash join this inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +		
    +		// ----------------------------------------------------------------------------------------
    +		
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +		
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +		
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			IntPair next = buildSide.next(recordReuse);
    --- End diff --
    
    That is what this test case is designed for: it makes sure the build-side outer join works in this case and does not return `(null, null)` to the user.
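
The behaviour this test guards can be illustrated with a minimal, self-contained sketch. It is not the `MutableHashTable` code: `BuildSideOuterJoinSketch`, its `join` method, and the `probed` array are hypothetical stand-ins, with a plain `HashMap` in place of the real bucket layout. It only shows that when every build record is matched during the probe phase, the unmatched-build pass emits nothing, and that no row ever has both sides null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a build-side outer join driven by per-record "probed" flags.
class BuildSideOuterJoinSketch {

    // Returns rows as {buildKey, probeKey}; probeKey is null for unmatched build records.
    static List<Integer[]> join(int[] build, int[] probe) {
        // Build phase: map each key to the positions of the build records carrying it.
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int i = 0; i < build.length; i++) {
            table.computeIfAbsent(build[i], k -> new ArrayList<>()).add(i);
        }

        // Probe phase: emit matches and remember which build records were hit.
        boolean[] probed = new boolean[build.length];
        List<Integer[]> out = new ArrayList<>();
        for (int p : probe) {
            List<Integer> matches = table.get(p);
            if (matches == null) {
                continue; // unmatched probe records are dropped in a build-side outer join
            }
            for (int idx : matches) {
                probed[idx] = true;
                out.add(new Integer[] {build[idx], p});
            }
        }

        // Final pass: emit build records that were never probed, padded with null.
        for (int i = 0; i < build.length; i++) {
            if (!probed[i]) {
                out.add(new Integer[] {build[i], null});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Every build key also appears on the probe side, so no null-padded rows are expected.
        List<Integer[]> rows = join(new int[] {1, 1, 2, 2}, new int[] {1, 2});
        System.out.println(rows.size());
    }
}
```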



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49726468
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java ---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception {
     		join.close();
     		this.memManager.release(join.getFreedMemory());
     	}
    +
    +	@Test
    +	public void testHashWithBuildSideOuterJoin1() throws Exception {
    +		final int NUM_KEYS = 20000;
    +		final int BUILD_VALS_PER_KEY = 1;
    +		final int PROBE_VALS_PER_KEY = 1;
    +
    +		// create a build input that gives 40000 pairs with 1 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +
    +		// create a probe input that gives 20000 pairs with 1 values sharing a key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is minimum number of pages required to perform hash join this inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +
    +		// ----------------------------------------------------------------------------------------
    +
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			while (buildSide.next(recordReuse) != null) {
    +				numRecordsInJoinResult++;
    +			}
    +		}
    +		Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +		join.close();
    +		this.memManager.release(join.getFreedMemory());
    +	}
    +	
    +	@Test
    +	public void testHashWithBuildSideOuterJoin2() throws Exception {
    +		final int NUM_KEYS = 40000;
    +		final int BUILD_VALS_PER_KEY = 2;
    +		final int PROBE_VALS_PER_KEY = 1;
    +		
    +		// The keys of probe and build sides are overlapped, so there would be none unmatched build elements
    +		// after probe phase.
    +		
    +		// create a build input that gives 40000 pairs with 2 values sharing the same key
    --- End diff --
    
    The build input gives 80000 pairs with 40000 distinct key values.
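
The corrected numbers follow from `NUM_KEYS = 40000` and `BUILD_VALS_PER_KEY = 2`. A small sketch of the arithmetic, assuming the generator emits exactly `valsPerKey` pairs for each of `numKeys` distinct keys (`PairCountSketch` is a hypothetical stand-in, not `UniformIntPairGenerator`):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in that counts what a uniform pair generator would emit.
class PairCountSketch {

    // Returns {total pairs, distinct keys} for numKeys keys with valsPerKey values each.
    static int[] countPairsAndKeys(int numKeys, int valsPerKey) {
        Set<Integer> distinctKeys = new HashSet<>();
        int pairs = 0;
        for (int key = 0; key < numKeys; key++) {
            for (int val = 0; val < valsPerKey; val++) {
                distinctKeys.add(key);
                pairs++;
            }
        }
        return new int[] {pairs, distinctKeys.size()};
    }

    public static void main(String[] args) {
        int[] counts = countPairsAndKeys(40000, 2);
        System.out.println(counts[0] + " pairs, " + counts[1] + " distinct keys");
    }
}
```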



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1469#issuecomment-173547539
  
    Thanks for the update and the regression test @ChengXiangLi!
    Looks good, only minor changes to do, IMO.
    




[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50495814
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    --- End diff --
    
    I would extract the loop out of the `next` methods into `moveToNextOnHeapBucket`, which both `next` methods would then reuse; that should be more readable.
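
A rough sketch of the proposed extraction, under simplifying assumptions: buckets are modelled as plain `int` arrays and a `boolean[]` says whether each bucket's partition is still in memory. `OnHeapBucketScanSketch`, `bucketInMemory`, `bucketContents`, and `drain` are illustrative names, not Flink's; only `moveToNextBucket` and `moveToNextOnHeapBucket` come from the discussion.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the iterator under review; not the Flink class.
class OnHeapBucketScanSketch {

    private final boolean[] bucketInMemory; // hypothetical: true if the bucket's partition is in memory
    private final int[][] bucketContents;   // hypothetical: the elements stored in each bucket
    private int scanCount = -1;
    private int posInBucket;
    private boolean exhausted;

    OnHeapBucketScanSketch(boolean[] bucketInMemory, int[][] bucketContents) {
        this.bucketInMemory = bucketInMemory;
        this.bucketContents = bucketContents;
        this.exhausted = !moveToNextOnHeapBucket();
    }

    // Advances by exactly one bucket; returns false if it is spilled or past the end.
    private boolean moveToNextBucket() {
        scanCount++;
        if (scanCount >= bucketInMemory.length) {
            return false;
        }
        if (bucketInMemory[scanCount]) {
            posInBucket = 0;
            return true;
        }
        return false;
    }

    // The extracted loop: skips spilled buckets, returns false once all buckets are scanned.
    private boolean moveToNextOnHeapBucket() {
        while (!moveToNextBucket()) {
            if (scanCount >= bucketInMemory.length) {
                return false;
            }
        }
        return true;
    }

    // Returns the next element from an in-memory bucket, or null when the scan is done.
    Integer next() {
        while (!exhausted) {
            int[] bucket = bucketContents[scanCount];
            if (posInBucket < bucket.length) {
                return bucket[posInBucket++];
            }
            exhausted = !moveToNextOnHeapBucket();
        }
        return null;
    }

    // Convenience helper that collects everything the iterator returns.
    static List<Integer> drain(boolean[] inMemory, int[][] contents) {
        OnHeapBucketScanSketch it = new OnHeapBucketScanSketch(inMemory, contents);
        List<Integer> out = new ArrayList<>();
        for (Integer v = it.next(); v != null; v = it.next()) {
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        boolean[] inMemory = {false, true, false, true};
        int[][] contents = {{1}, {2, 3}, {4}, {5}};
        System.out.println(drain(inMemory, contents)); // [2, 3, 5]
    }
}
```

With the loop in one place, a second `next(reuse)` variant would call the same helper instead of repeating the `while (!moveToNextBucket())` block.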



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49655894
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -420,15 +440,23 @@ public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<P
     	//                              Life-Cycle
     	// ------------------------------------------------------------------------
     	
    +	public void open(final MutableObjectIterator<BT> buildSide, final MutableObjectIterator<PT> probeSide)
    +		throws IOException {
    +
    +		open(buildSide, probeSide, false);
    +	}
    +	
     	/**
     	 * Opens the hash join. This method reads the build-side input and constructs the initial
     	 * hash table, gradually spilling partitions that do not fit into memory. 
     	 * 
     	 * @throws IOException Thrown, if an I/O problem occurs while spilling a partition.
    --- End diff --
    
    Can you add the parameters of the `open` method to the Javadoc?



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49654923
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.optimizer.operators;
    +
    +import org.apache.flink.api.common.operators.util.FieldList;
    +import org.apache.flink.optimizer.dag.TwoInputNode;
    +import org.apache.flink.optimizer.dataproperties.LocalProperties;
    +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
    +import org.apache.flink.optimizer.plan.Channel;
    +import org.apache.flink.optimizer.plan.DualInputPlanNode;
    +import org.apache.flink.runtime.operators.DriverStrategy;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
    +	public HashFullOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
    +		boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) {
    --- End diff --
    
    The broadcast parameters can be removed.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49657059
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
    +		public BT nextInBucket(BT reuse) {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
    --- End diff --
    
    This only needs to be done when we move to the next bucket, not for every record we read.
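
A minimal sketch of the change requested here, under simplifying assumptions: `ProbeFlagsView`, `UnprobedScanSketch` and the `rebindCount` counter are hypothetical names, and a plain `byte[]` stands in for Flink's `MemorySegment`. The flag view is pointed at the bucket's flag bytes once, before the per-record loop, instead of on every record read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the PR's bit set: a view over flag bytes inside a shared array.
final class ProbeFlagsView {
    private byte[] segment;
    private int offset;
    int rebindCount; // illustration only: how often the view was re-pointed

    void setMemorySegment(byte[] segment, int offset) {
        this.segment = segment;
        this.offset = offset;
        rebindCount++;
    }

    boolean get(int index) {
        return (segment[offset + (index >>> 3)] & (1 << (index & 7))) != 0;
    }

    void set(int index) {
        segment[offset + (index >>> 3)] |= (byte) (1 << (index & 7));
    }
}

final class UnprobedScanSketch {
    // Returns the indices of all elements in one bucket segment whose probe flag is not set.
    static List<Integer> unprobed(ProbeFlagsView flags, byte[] segment, int flagsOffset, int count) {
        flags.setMemorySegment(segment, flagsOffset); // once per bucket segment, not per record
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (!flags.get(i)) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] segment = new byte[16];
        ProbeFlagsView flags = new ProbeFlagsView();
        flags.setMemorySegment(segment, 12);
        flags.set(1);
        flags.set(8);
        System.out.println(unprobed(flags, segment, 12, 9)); // prints [0, 2, 3, 4, 5, 6, 7]
    }
}
```

In the iterator from the diff, the equivalent placement would be wherever the current bucket or overflow segment changes, which is an assumption about the final code rather than something shown in this thread.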


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50384749
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -486,11 +518,43 @@ protected boolean processProbeIter() throws IOException{
     			}
     		}
     		// -------------- partition done ---------------
    -		
    +
     		return false;
     	}
     	
    +	protected boolean processUnmatchedBuildIter() throws IOException  {
    +		if (this.unmatchedBuildVisited) {
    +			return false;
    +		}
    +		
    +		this.probeMatchedPhase = false;
    +		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
    +			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
    +		this.unmatchedBuildIterator = unmatchedBuildIter;
    +		
    +		// There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user.
    +		if (unmatchedBuildIter.next() == null) {
    +			this.unmatchedBuildVisited = true;
    +			return false;
    +		}
    +		
    +		unmatchedBuildIter.back();
    +		
    +		// While visit the unmatched build elements, the probe element is null, and the unmatchedBuildIterator
    +		// would iterate all the unmatched build elements, so we return false during the second calling of this method.
    +		if (!this.unmatchedBuildVisited) {
    --- End diff --
    
    But you leave the method at line 538 with `return false`, and the check at line 526 prevents you from reaching line 545 again once `unmatchedBuildVisited == true`.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50391192
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -486,11 +518,43 @@ protected boolean processProbeIter() throws IOException{
     			}
     		}
     		// -------------- partition done ---------------
    -		
    +
     		return false;
     	}
     	
    +	protected boolean processUnmatchedBuildIter() throws IOException  {
    +		if (this.unmatchedBuildVisited) {
    +			return false;
    +		}
    +		
    +		this.probeMatchedPhase = false;
    +		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
    +			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
    +		this.unmatchedBuildIterator = unmatchedBuildIter;
    +		
    +		// There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user.
    +		if (unmatchedBuildIter.next() == null) {
    --- End diff --
    
    OK, sounds good.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50390858
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java ---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception {
     		join.close();
     		this.memManager.release(join.getFreedMemory());
     	}
    +
    +	@Test
    +	public void testHashWithBuildSideOuterJoin1() throws Exception {
    +		final int NUM_KEYS = 20000;
    +		final int BUILD_VALS_PER_KEY = 1;
    +		final int PROBE_VALS_PER_KEY = 1;
    +
    +		// create a build input that gives 40000 pairs with 1 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +
    +		// create a probe input that gives 20000 pairs with 1 values sharing a key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is minimum number of pages required to perform hash join this inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +
    +		// ----------------------------------------------------------------------------------------
    +
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			while (buildSide.next(recordReuse) != null) {
    +				numRecordsInJoinResult++;
    +			}
    +		}
    +		Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +		join.close();
    +		this.memManager.release(join.getFreedMemory());
    +	}
    +	
    +	@Test
    +	public void testHashWithBuildSideOuterJoin2() throws Exception {
    +		final int NUM_KEYS = 40000;
    +		final int BUILD_VALS_PER_KEY = 2;
    +		final int PROBE_VALS_PER_KEY = 1;
    +		
    +		// The keys of probe and build sides are overlapped, so there would be none unmatched build elements
    +		// after probe phase.
    +		
    +		// create a build input that gives 40000 pairs with 2 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +		
    +		// create a probe input that gives 20000 pairs with 1 values sharing a key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +		
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is minimum number of pages required to perform hash join this inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +		
    +		// ----------------------------------------------------------------------------------------
    +		
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +		
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +		
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			IntPair next = buildSide.next(recordReuse);
    --- End diff --
    
    OK, maybe add a brief comment to make clear which behavior is tested.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1469



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656172
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -486,11 +518,43 @@ protected boolean processProbeIter() throws IOException{
     			}
     		}
     		// -------------- partition done ---------------
    -		
    +
     		return false;
     	}
     	
    +	protected boolean processUnmatchedBuildIter() throws IOException  {
    +		if (this.unmatchedBuildVisited) {
    +			return false;
    +		}
    +		
    +		this.probeMatchedPhase = false;
    +		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
    +			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
    +		this.unmatchedBuildIterator = unmatchedBuildIter;
    +		
    +		// There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user.
    +		if (unmatchedBuildIter.next() == null) {
    +			this.unmatchedBuildVisited = true;
    +			return false;
    +		}
    +		
    +		unmatchedBuildIter.back();
    +		
    +		// While visit the unmatched build elements, the probe element is null, and the unmatchedBuildIterator
    +		// would iterate all the unmatched build elements, so we return false during the second calling of this method.
    +		if (!this.unmatchedBuildVisited) {
    --- End diff --
    
    I think this condition is not necessary. `unmatchedBuildVisited` must be `false` at this point, no?
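
A simplified sketch of the control flow under discussion, with the redundant check dropped. `UnmatchedPhaseSketch` and its `hasUnmatched` field are hypothetical; the assumption is that the guarded block in the diff only sets the flag and returns `true`, which the quoted hunk does not show.

```java
final class UnmatchedPhaseSketch {
    private boolean unmatchedBuildVisited;
    private final boolean hasUnmatched; // stands in for "unmatchedBuildIter.next() != null"

    UnmatchedPhaseSketch(boolean hasUnmatched) {
        this.hasUnmatched = hasUnmatched;
    }

    boolean processUnmatchedBuildIter() {
        if (unmatchedBuildVisited) {
            return false; // second and later calls end here
        }
        if (!hasUnmatched) {
            unmatchedBuildVisited = true;
            return false; // nothing to emit, avoid handing (null, null) to the caller
        }
        // The flag is necessarily false here, so no further check is needed.
        unmatchedBuildVisited = true;
        return true;
    }

    public static void main(String[] args) {
        UnmatchedPhaseSketch phase = new UnmatchedPhaseSketch(true);
        System.out.println(phase.processUnmatchedBuildIter()); // prints true
        System.out.println(phase.processUnmatchedBuildIter()); // prints false
    }
}
```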



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1469#issuecomment-174946482
  
    Thanks for the update @ChengXiangLi! 
    
    PR is good to merge, IMO.
    Do you want to do it yourself?



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49655995
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -420,15 +440,23 @@ public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<P
     	//                              Life-Cycle
     	// ------------------------------------------------------------------------
     	
    +	public void open(final MutableObjectIterator<BT> buildSide, final MutableObjectIterator<PT> probeSide)
    --- End diff --
    
    Please copy the JavaDocs of the other `open` method.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50385614
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    --- End diff --
    
    Yes, but you could also move the loop inside the `moveToNextBucket` method and rename the method to something like `moveToNextOnHeapBucket`, no?
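
One way the suggested refactoring could look, as a sketch only: `BucketScanSketch` is a hypothetical class, and a `boolean[]` replaces the lookup of each bucket's partition and its `isInMemory()` state. With the skip loop inside the method, callers no longer need their own `while (!moveToNextBucket())` loops with a bounds check.

```java
final class BucketScanSketch {
    private final boolean[] bucketInMemory; // true if the bucket's partition is in memory
    private int scanCount = -1;

    BucketScanSketch(boolean[] bucketInMemory) {
        this.bucketInMemory = bucketInMemory;
    }

    // Advances to the next bucket whose partition is in memory.
    // Returns false once all buckets have been scanned.
    boolean moveToNextOnHeapBucket() {
        while (scanCount < bucketInMemory.length - 1) {
            scanCount++;
            if (bucketInMemory[scanCount]) {
                return true;
            }
        }
        return false;
    }

    int currentBucket() {
        return scanCount;
    }

    public static void main(String[] args) {
        BucketScanSketch scan = new BucketScanSketch(new boolean[] {false, true, false, false, true, false});
        while (scan.moveToNextOnHeapBucket()) {
            System.out.println(scan.currentBucket()); // prints 1, then 4
        }
    }
}
```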



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49654822
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.optimizer.operators;
    +
    +import org.apache.flink.api.common.operators.util.FieldList;
    +import org.apache.flink.optimizer.dag.TwoInputNode;
    +import org.apache.flink.optimizer.dataproperties.LocalProperties;
    +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
    +import org.apache.flink.optimizer.plan.Channel;
    +import org.apache.flink.optimizer.plan.DualInputPlanNode;
    +import org.apache.flink.runtime.operators.DriverStrategy;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {
    +
    +	public HashFullOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
    +		boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) {
    --- End diff --
    
    Broadcast-Forward shipping strategies are not valid for full outer joins.
    Hence, the broadcast parameters can be removed.
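
A toy illustration of why a broadcast outer side gives wrong results, offered as a sketch rather than as Flink code: `BroadcastOuterJoinSketch` and its method are hypothetical, and each list models the records that one parallel instance sees. Every instance holds the full broadcast build side but only its own share of the probe side, so a build record that matches on one instance still looks unmatched on the others.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BroadcastOuterJoinSketch {
    // One parallel instance: returns build keys with no match among the locally received probe keys.
    static List<Integer> unmatchedBuildKeys(List<Integer> buildKeys, List<Integer> localProbeKeys) {
        List<Integer> unmatched = new ArrayList<>();
        for (Integer key : buildKeys) {
            if (!localProbeKeys.contains(key)) {
                unmatched.add(key);
            }
        }
        return unmatched;
    }

    public static void main(String[] args) {
        List<Integer> build = Arrays.asList(1, 2); // broadcast: every instance sees both keys
        List<Integer> probe0 = Arrays.asList(1);   // instance 0 receives probe key 1
        List<Integer> probe1 = Arrays.asList(2);   // instance 1 receives probe key 2
        System.out.println(unmatchedBuildKeys(build, probe0)); // prints [2]
        System.out.println(unmatchedBuildKeys(build, probe1)); // prints [1]
    }
}
```

Both build keys have a match globally, yet each is reported as unmatched by one instance. In a full outer join both inputs are outer sides, so whichever side is broadcast runs into this.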



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49655692
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -57,7 +58,7 @@
      * <pre>
      * +----------------------------- Bucket x ----------------------------
      * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
    - * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
    + * | next-bucket-in-chain-pointer (8 bytes) | bitSet (2 bytes) | reserved (2 bytes) |
    --- End diff --
    
    Can you change `bitSet` to something that describes its purpose, for example `probeFlags`?
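
To make the header layout in the diagram concrete, here is a sketch that derives byte offsets from the field sizes shown above and sets one probe flag. The class and constant names are hypothetical (they are not taken from `MutableHashTable`), and `java.nio.ByteBuffer` stands in for Flink's `MemorySegment`.

```java
import java.nio.ByteBuffer;

final class BucketHeaderSketch {
    // Offsets follow from the sizes in the diagram: 1 + 1 + 2 + 8 + 2 + 2 = 16 bytes.
    static final int PARTITION_OFFSET = 0;
    static final int STATUS_OFFSET = 1;
    static final int COUNT_OFFSET = 2;
    static final int FORWARD_POINTER_OFFSET = 4;
    static final int PROBE_FLAGS_OFFSET = 12;
    static final int RESERVED_OFFSET = 14;
    static final int HEADER_LENGTH = 16;

    // Marks element elementIndex (0..15) of the bucket starting at bucketOffset as probed.
    static void markProbed(ByteBuffer bucket, int bucketOffset, int elementIndex) {
        int pos = bucketOffset + PROBE_FLAGS_OFFSET;
        short flags = bucket.getShort(pos);
        bucket.putShort(pos, (short) (flags | (1 << elementIndex)));
    }

    static boolean isProbed(ByteBuffer bucket, int bucketOffset, int elementIndex) {
        return (bucket.getShort(bucketOffset + PROBE_FLAGS_OFFSET) & (1 << elementIndex)) != 0;
    }

    public static void main(String[] args) {
        ByteBuffer buckets = ByteBuffer.allocate(128);
        markProbed(buckets, 32, 8);
        System.out.println(isProbed(buckets, 32, 8)); // prints true
        System.out.println(isProbed(buckets, 32, 0)); // prints false
    }
}
```

Two bytes give 16 flags, which covers the 9 elements per bucket mentioned in the PR description and leaves the remaining 2 reserved bytes untouched.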



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1469#issuecomment-165740727
  
    Thanks for the PR! I will shepherd it.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49657379
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
    +		public BT nextInBucket(BT reuse) {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
     
    +				while (this.numInSegment < this.countInSegment) {
    +					boolean probed = probedSet.get(numInSegment);
    +					if (!probed) {
    +						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
    +							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
    +						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
    +						try {
    +							this.partition.setReadPosition(pointer);
    +							reuse = this.accessor.deserialize(reuse, this.partition);
    +							this.numInSegment++;
    +							return reuse;
    +						} catch (IOException ioex) {
    +							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
    +								ioex.getMessage(), ioex);
    +						}
    +					} else {
    +						this.numInSegment++;
    +					}
    +				}
    +
    +				// this segment is done. check if there is another chained bucket
    +				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
    +				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
    +					return null;
    +				}
    +
    +				final int overflowSegNum = (int) (forwardPointer >>> 32);
    +				this.bucket = this.overflowSegments[overflowSegNum];
    +				this.bucketInSegmentOffset = (int) forwardPointer;
    +				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +				this.numInSegment = 0;
    +			}
    +		}
    +	
    +		public BT nextInBucket() {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
    --- End diff --
    
    This only needs to be done when we move to the next part of the bucket or to an overflow part of a bucket, not for every record we read.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49728566
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---
    @@ -184,7 +184,7 @@ public void testBuildFirst() {
     					new ReusingBuildFirstHashJoinIterator<>(
     						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
     							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
    -							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
    +							this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
    --- End diff --
    
    Why did you set the memory to `1`?



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656801
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
    +		public BT nextInBucket(BT reuse) {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
     
    +				while (this.numInSegment < this.countInSegment) {
    +					boolean probed = probedSet.get(numInSegment);
    +					if (!probed) {
    +						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
    +							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
    +						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
    +						try {
    +							this.partition.setReadPosition(pointer);
    +							reuse = this.accessor.deserialize(reuse, this.partition);
    +							this.numInSegment++;
    +							return reuse;
    +						} catch (IOException ioex) {
    +							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
    +								ioex.getMessage(), ioex);
    +						}
    +					} else {
    +						this.numInSegment++;
    +					}
    +				}
    +
    +				// this segment is done. check if there is another chained bucket
    +				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
    +				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
    +					return null;
    +				}
    +
    +				final int overflowSegNum = (int) (forwardPointer >>> 32);
    +				this.bucket = this.overflowSegments[overflowSegNum];
    +				this.bucketInSegmentOffset = (int) forwardPointer;
    +				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +				this.numInSegment = 0;
    +			}
    +		}
    +	
    +		public BT nextInBucket() {
    +			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
    +			while (true) {
    +				probedSet.setMemorySegment(bucket, this.bucketInSegmentOffset + HEADER_BITSET_OFFSET);
    +
    +				while (this.numInSegment < this.countInSegment) {
    +					boolean probed = probedSet.get(numInSegment);
    +					if (!probed) {
    +						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset +
    +							BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
    +						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
    +						try {
    +							this.partition.setReadPosition(pointer);
    +							BT result = this.accessor.deserialize(this.partition);
    +							this.numInSegment++;
    +							return result;
    +						} catch (IOException ioex) {
    +							throw new RuntimeException("Error deserializing key or value from the hashtable: " +
    +								ioex.getMessage(), ioex);
    +						}
    +					} else {
    +						this.numInSegment++;
    +					}
    +				}
    +	
    +				// this segment is done. check if there is another chained bucket
    +				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
    +				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
    +					return null;
    +				}
    +
    +				final int overflowSegNum = (int) (forwardPointer >>> 32);
    +				this.bucket = this.overflowSegments[overflowSegNum];
    +				this.bucketInSegmentOffset = (int) forwardPointer;
    +				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +				this.numInSegment = 0;
    +			}
    +		}
    +		
    +		public void back() {
    --- End diff --
    
    Can you replace this method with a `hasNext()` method (see previous comment in `processUnmatchedBuildIter()`)?
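
One way to read this suggestion, as a minimal sketch and not the code that ended up in the PR: buffer a single looked-ahead element so that `hasNext()` can answer without consuming anything, which makes `back()` unnecessary. `NullTerminatedSource` and `LookaheadIterator` are hypothetical names used only for illustration. The only assumption carried over from the diff is that `next()` returns `null` once the iterator is exhausted.

```java
/** Hypothetical stand-in for an iterator whose next() returns null when exhausted. */
interface NullTerminatedSource<T> {
    T next();
}

/** Buffers one element so that callers can ask hasNext() without consuming it. */
final class LookaheadIterator<T> {

    private final NullTerminatedSource<T> source;

    // Element already fetched by hasNext() but not yet handed out by next().
    private T buffered;

    // Set once the source has returned null, so it is never polled again.
    private boolean exhausted;

    LookaheadIterator(NullTerminatedSource<T> source) {
        this.source = source;
    }

    boolean hasNext() {
        if (buffered == null && !exhausted) {
            buffered = source.next();
            exhausted = (buffered == null);
        }
        return buffered != null;
    }

    /** Returns the next element, or null if there is none (same contract as the source). */
    T next() {
        if (!hasNext()) {
            return null;
        }
        T result = buffered;
        buffered = null;
        return result;
    }
}
```

With such a wrapper, the caller can check `hasNext()` to decide whether there are any unmatched build records at all, and only then start consuming them, instead of calling `next()` and stepping back.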



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656918
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    --- End diff --
    
    Rename to `bucketSegment`?



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50385880
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.optimizer.operators;
    +
    +import org.apache.flink.api.common.operators.util.FieldList;
    +import org.apache.flink.optimizer.dag.TwoInputNode;
    +import org.apache.flink.optimizer.dataproperties.LocalProperties;
    +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
    +import org.apache.flink.optimizer.plan.Channel;
    +import org.apache.flink.optimizer.plan.DualInputPlanNode;
    +import org.apache.flink.runtime.operators.DriverStrategy;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {
    +
    +	public HashFullOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
    --- End diff --
    
    The `repartitionAllowed` parameter can be removed as well, since repartitioning is the only possible strategy.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49728408
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java ---
    @@ -130,11 +130,11 @@ public void testSortBothMerge() {
     			
     			final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
     					this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, 
    -					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
    +					this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
    --- End diff --
    
    Why did you change this line?



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50390580
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java ---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception {
     		join.close();
     		this.memManager.release(join.getFreedMemory());
     	}
    +
    +	@Test
    +	public void testHashWithBuildSideOuterJoin1() throws Exception {
    +		final int NUM_KEYS = 20000;
    +		final int BUILD_VALS_PER_KEY = 1;
    +		final int PROBE_VALS_PER_KEY = 1;
    +
    +		// create a build input that gives 40000 pairs with 1 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +
    +		// create a probe input that gives 20000 pairs with 1 values sharing a key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is minimum number of pages required to perform hash join this inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +
    +		// ----------------------------------------------------------------------------------------
    +
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			while (buildSide.next(recordReuse) != null) {
    +				numRecordsInJoinResult++;
    +			}
    +		}
    +		Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +		join.close();
    +		this.memManager.release(join.getFreedMemory());
    +	}
    +	
    +	@Test
    +	public void testHashWithBuildSideOuterJoin2() throws Exception {
    +		final int NUM_KEYS = 40000;
    +		final int BUILD_VALS_PER_KEY = 2;
    +		final int PROBE_VALS_PER_KEY = 1;
    +		
    +		// The keys of probe and build sides are overlapped, so there would be none unmatched build elements
    +		// after probe phase.
    +		
    +		// create a build input that gives 80000 pairs with 2 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +		
    +		// create a probe input that gives 20000 pairs with 1 values sharing a key
    --- End diff --
    
    This should be 40000 pairs (`NUM_KEYS` * `PROBE_VALS_PER_KEY` = 40000 * 1).



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656659
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    --- End diff --
    
    If you implement `moveToNextBucket()` such that it forwards to the next in-memory bucket, you do not need the `while (!moveToNextBucket())` loops in several places.
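
A minimal sketch of that idea, under simplifying assumptions: the bucket table is reduced to a hypothetical `boolean[] inMemory` array (one flag per bucket, standing in for the `p.isInMemory()` check in the diff), and `BucketScanner` is an illustrative name, not a class from the PR. The skipping loop lives inside `moveToNextBucket()`, so a `false` return can only mean that all buckets have been scanned.

```java
/** Illustrative scanner that skips buckets whose partition is not in memory. */
final class BucketScanner {

    // Hypothetical simplification: inMemory[i] is true if bucket i belongs to an in-memory partition.
    private final boolean[] inMemory;

    // Index of the bucket the scanner is currently positioned on; -1 before the first move.
    private int scanCount = -1;

    BucketScanner(boolean[] inMemory) {
        this.inMemory = inMemory;
    }

    /**
     * Advances to the next in-memory bucket.
     *
     * @return true if positioned on an in-memory bucket, false once all buckets are scanned
     */
    boolean moveToNextBucket() {
        while (scanCount < inMemory.length - 1) {
            scanCount++;
            if (inMemory[scanCount]) {
                return true;
            }
        }
        return false;
    }

    /** Only meaningful after moveToNextBucket() has returned true. */
    int currentBucket() {
        return scanCount;
    }
}
```

Callers such as `init()` and `next()` could then use a single `if (!moveToNextBucket()) { return null; }` check and drop the extra `scanCount >= totalBucketNumber` test.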



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1469#issuecomment-171452040
  
    Hi @ChengXiangLi, very nice PR! Sorry for not reviewing it earlier.
    I had only a few minor comments. I have not checked the tests yet, but I hope to do so in the next few days.
    
    Although I do not expect major performance implications for inner joins, it would be good to verify that to be on the safe side.
    Have you run any performance regression tests?
    
    Thanks, Fabian




[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50495621
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -486,11 +518,43 @@ protected boolean processProbeIter() throws IOException{
     			}
     		}
     		// -------------- partition done ---------------
    -		
    +
     		return false;
     	}
     	
    +	protected boolean processUnmatchedBuildIter() throws IOException  {
    +		if (this.unmatchedBuildVisited) {
    +			return false;
    +		}
    +		
    +		this.probeMatchedPhase = false;
    +		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
    +			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
    +		this.unmatchedBuildIterator = unmatchedBuildIter;
    +		
    +		// There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user.
    +		if (unmatchedBuildIter.next() == null) {
    +			this.unmatchedBuildVisited = true;
    +			return false;
    +		}
    +		
    +		unmatchedBuildIter.back();
    +		
    +		// While visit the unmatched build elements, the probe element is null, and the unmatchedBuildIterator
    +		// would iterate all the unmatched build elements, so we return false during the second calling of this method.
    +		if (!this.unmatchedBuildVisited) {
    --- End diff --
    
    Yes, you are right.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49655245
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java ---
    @@ -73,6 +73,8 @@
     	LEFT_OUTER_MERGE(LeftOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
     	RIGHT_OUTER_MERGE(RightOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
     	FULL_OUTER_MERGE(FullOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
    +	FULL_OUTER_HYBRIDHASH_BUILD_FIRST(FullOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
    --- End diff --
    
    The strategies in this file are ordered by algorithm type, not by logical operator type. 
    These hash join strategies should be put next to the other hash join strategies.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50217624
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -486,11 +518,43 @@ protected boolean processProbeIter() throws IOException{
     			}
     		}
     		// -------------- partition done ---------------
    -		
    +
     		return false;
     	}
     	
    +	protected boolean processUnmatchedBuildIter() throws IOException  {
    +		if (this.unmatchedBuildVisited) {
    +			return false;
    +		}
    +		
    +		this.probeMatchedPhase = false;
    +		UnmatchedBuildIterator<BT, PT> unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets,
    +			this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet);
    +		this.unmatchedBuildIterator = unmatchedBuildIter;
    +		
    +		// There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user.
    +		if (unmatchedBuildIter.next() == null) {
    +			this.unmatchedBuildVisited = true;
    +			return false;
    +		}
    +		
    +		unmatchedBuildIter.back();
    +		
    +		// While visit the unmatched build elements, the probe element is null, and the unmatchedBuildIterator
    +		// would iterate all the unmatched build elements, so we return false during the second calling of this method.
    +		if (!this.unmatchedBuildVisited) {
    --- End diff --
    
    It could be true if there are no unmatched build records, as shown at line 536.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r50385924
  
    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.optimizer.operators;
    +
    +import org.apache.flink.api.common.operators.util.FieldList;
    +import org.apache.flink.optimizer.dag.TwoInputNode;
    +import org.apache.flink.optimizer.dataproperties.LocalProperties;
    +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
    +import org.apache.flink.optimizer.plan.Channel;
    +import org.apache.flink.optimizer.plan.DualInputPlanNode;
    +import org.apache.flink.runtime.operators.DriverStrategy;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
    +	public HashFullOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
    --- End diff --
    
    same here.



[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49656473
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---
    @@ -1558,7 +1641,209 @@ public void reset() {
     		}
     
     	} // end HashBucketIterator
    +	
    +	/**
    +	 * Iterate all the elements in memory which has not been probed during probe phase.
    +	 */
    +	public static class UnmatchedBuildIterator<BT, PT> implements MutableObjectIterator<BT> {
    +	
    +		private final TypeSerializer<BT> accessor;
    +	
    +		private final long totalBucketNumber;
    +		
    +		private final int bucketsPerSegmentBits;
    +		
    +		private final int bucketsPerSegmentMask;
    +		
    +		private final MemorySegment[] buckets;
    +		
    +		private final ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt;
    +		
    +		private final BitSet probedSet;
    +		
    +		private MemorySegment bucket;
    +		
    +		private MemorySegment[] overflowSegments;
    +		
    +		private HashPartition<BT, PT> partition;
    +		
    +		private int scanCount;
    +		
    +		private int bucketInSegmentOffset;
    +		
    +		private int countInSegment;
    +		
    +		private int numInSegment;
    +		
    +		UnmatchedBuildIterator(
    +			TypeSerializer<BT> accessor,
    +			long totalBucketNumber,
    +			int bucketsPerSegmentBits,
    +			int bucketsPerSegmentMask,
    +			MemorySegment[] buckets,
    +			ArrayList<HashPartition<BT, PT>> partitionsBeingBuilt,
    +			BitSet probedSet) {
    +			
    +			this.accessor = accessor;
    +			this.totalBucketNumber = totalBucketNumber;
    +			this.bucketsPerSegmentBits = bucketsPerSegmentBits;
    +			this.bucketsPerSegmentMask = bucketsPerSegmentMask;
    +			this.buckets = buckets;
    +			this.partitionsBeingBuilt = partitionsBeingBuilt;
    +			this.probedSet = probedSet;
    +			init();
    +		}
    +		
    +		private void init() {
    +			scanCount = -1;
    +			while (!moveToNextBucket()) {
    +				if (scanCount >= totalBucketNumber) {
    +					break;
    +				}
    +			}
    +		}
    +		
    +		public BT next(BT reuse) {
    +			while (true) {
    +				BT result = nextInBucket(reuse);
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +		
    +		public BT next() {
    +			while (true) {
    +				BT result = nextInBucket();
    +				if (result == null) {
    +					while (!moveToNextBucket()) {
    +						if (scanCount >= totalBucketNumber) {
    +							return null;
    +						}
    +					}
    +				} else {
    +					return result;
    +				}
    +			}
    +		}
    +	
    +		private boolean moveToNextBucket() {
    +			scanCount++;
    +			if (scanCount > totalBucketNumber - 1) {
    +				return false;
    +			}
    +			final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits;
    +			final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
    +			MemorySegment currentBucket = this.buckets[bucketArrayPos];
    +			final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET);
    +			final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
    +			if (p.isInMemory()) {
    +				set(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset);
    +				return true;
    +			} else {
    +				return false;
    +			}
    +		}
    +	
    +		private void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
    +			int bucketInSegmentOffset) {
    +			this.bucket = bucket;
    +			this.overflowSegments = overflowSegments;
    +			this.partition = partition;
    +			this.bucketInSegmentOffset = bucketInSegmentOffset;
    +			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
    +			this.numInSegment = 0;
    +		}
    +	
    +		public BT nextInBucket(BT reuse) {
    --- End diff --
    
    Change visibility to `private`?

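For readers following the quoted diff, the scan pattern can be shown outside Flink. The following is only a minimal sketch under stated assumptions: plain Java lists stand in for the `MemorySegment` buckets, a `java.util.BitSet` per bucket stands in for the probed flags, and the class name `UnmatchedBuildScanSketch` and its `demo()` helper are hypothetical. It keeps the per-bucket helper `private`, as the comment above suggests, and exposes only `next()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

/** Hypothetical stand-in for the bucket scan; these are not Flink's actual classes. */
final class UnmatchedBuildScanSketch {
    private final List<List<String>> buckets; // each bucket holds build-side records
    private final List<BitSet> probedFlags;   // one flag per record in each bucket
    private int bucketIndex = -1;             // -1 means the scan has not started
    private int posInBucket = 0;

    UnmatchedBuildScanSketch(List<List<String>> buckets, List<BitSet> probedFlags) {
        this.buckets = buckets;
        this.probedFlags = probedFlags;
    }

    /** Returns the next build record that was never probed, or null when the scan is done. */
    String next() {
        while (bucketIndex < buckets.size()) {
            if (bucketIndex >= 0) {
                String result = nextInBucket();
                if (result != null) {
                    return result;
                }
            }
            // Current bucket is exhausted (or the scan has not started): advance.
            bucketIndex++;
            posInBucket = 0;
        }
        return null;
    }

    /** Internal helper, deliberately private: callers only need next(). */
    private String nextInBucket() {
        List<String> bucket = buckets.get(bucketIndex);
        BitSet probed = probedFlags.get(bucketIndex);
        while (posInBucket < bucket.size()) {
            int i = posInBucket++;
            if (!probed.get(i)) {
                return bucket.get(i);
            }
        }
        return null;
    }

    /** Small demo: two buckets, with only the first record of the first bucket probed. */
    static List<String> demo() {
        List<List<String>> buckets = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        BitSet firstBucketFlags = new BitSet();
        firstBucketFlags.set(0); // record "a" was matched during the probe phase
        List<BitSet> flags = Arrays.asList(firstBucketFlags, new BitSet());

        UnmatchedBuildScanSketch scan = new UnmatchedBuildScanSketch(buckets, flags);
        List<String> unmatched = new ArrayList<>();
        for (String r = scan.next(); r != null; r = scan.next()) {
            unmatched.add(r);
        }
        return unmatched;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the helper private, the only way to consume the scan is through `next()`, which keeps the bucket bookkeeping in one place.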


[GitHub] flink pull request: [FLINK-2871] support outer join for hash join ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1469#discussion_r49726803
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java ---
    @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception {
     		join.close();
     		this.memManager.release(join.getFreedMemory());
     	}
    +
    +	@Test
    +	public void testHashWithBuildSideOuterJoin1() throws Exception {
    +		final int NUM_KEYS = 20000;
    +		final int BUILD_VALS_PER_KEY = 1;
    +		final int PROBE_VALS_PER_KEY = 1;
    +
    +		// create a build input that gives 40000 pairs with 1 value per key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +
    +		// create a probe input that gives 20000 pairs with 1 value per key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is the minimum number of pages required to perform the hash join on these inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +
    +		// ----------------------------------------------------------------------------------------
    +
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			while (buildSide.next(recordReuse) != null) {
    +				numRecordsInJoinResult++;
    +			}
    +		}
    +		Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
    +
    +		join.close();
    +		this.memManager.release(join.getFreedMemory());
    +	}
    +	
    +	@Test
    +	public void testHashWithBuildSideOuterJoin2() throws Exception {
    +		final int NUM_KEYS = 40000;
    +		final int BUILD_VALS_PER_KEY = 2;
    +		final int PROBE_VALS_PER_KEY = 1;
    +		
    +		// The keys of the probe and build sides overlap completely, so there are no unmatched
    +		// build elements after the probe phase.
    +		
    +		// create a build input that gives 80000 pairs with 2 values sharing the same key
    +		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
    +		
    +		// create a probe input that gives 40000 pairs with 1 value per key
    +		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
    +		
    +		// allocate the memory for the HashTable
    +		List<MemorySegment> memSegments;
    +		try {
    +			// 33 is the minimum number of pages required to perform the hash join on these inputs
    +			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
    +		}
    +		catch (MemoryAllocationException maex) {
    +			fail("Memory for the Join could not be provided.");
    +			return;
    +		}
    +		
    +		// ----------------------------------------------------------------------------------------
    +		
    +		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
    +			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
    +			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
    +			memSegments, ioManager);
    +		join.open(buildInput, probeInput, true);
    +		
    +		final IntPair recordReuse = new IntPair();
    +		int numRecordsInJoinResult = 0;
    +		
    +		while (join.nextRecord()) {
    +			MutableObjectIterator<IntPair> buildSide = join.getBuildSideIterator();
    +			IntPair next = buildSide.next(recordReuse);
    --- End diff --
    
    This test does not really exercise the added outer join feature:
    all build keys are also present on the probe side, so no unmatched build records remain.

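To make the point about key overlap concrete, here is a minimal sketch that counts matched and unmatched build records for a given key layout. It is an illustration only: it uses a `java.util.HashMap` rather than `MutableHashTable`, assumes keys `0..n-1` with one value per key on both sides, and the class and method names (`BuildSideOuterJoinCountSketch`, `joinCounts`) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of build-side outer join counts; not Flink code. */
final class BuildSideOuterJoinCountSketch {

    /**
     * Joins build keys 0..buildKeys-1 with probe keys 0..probeKeys-1 (one value per key)
     * and returns {matchedPairs, unmatchedBuildRecords}.
     */
    static int[] joinCounts(int buildKeys, int probeKeys) {
        // Build phase: every build key starts out as "not probed".
        Map<Integer, Boolean> probedByBuildKey = new HashMap<>();
        for (int k = 0; k < buildKeys; k++) {
            probedByBuildKey.put(k, false);
        }

        // Probe phase: flag each build key that finds a probe-side partner.
        int matched = 0;
        for (int k = 0; k < probeKeys; k++) {
            if (probedByBuildKey.containsKey(k)) {
                probedByBuildKey.put(k, true);
                matched++;
            }
        }

        // Outer phase: build records that were never flagged are emitted unmatched.
        int unmatched = 0;
        for (boolean probed : probedByBuildKey.values()) {
            if (!probed) {
                unmatched++;
            }
        }
        return new int[] {matched, unmatched};
    }

    public static void main(String[] args) {
        int[] partial = joinCounts(40000, 20000);
        int[] full = joinCounts(40000, 40000);
        System.out.println("partial overlap: matched=" + partial[0] + ", unmatched=" + partial[1]);
        System.out.println("full overlap:    matched=" + full[0] + ", unmatched=" + full[1]);
    }
}
```

Under these assumptions, only the layout where the build side has keys missing from the probe side yields a non-zero unmatched count, which is the case an outer join test needs to cover.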
