You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pg...@apache.org on 2022/02/10 18:14:35 UTC
[hive] branch master updated: HIVE-25583: Support parallel load for HashTables - Interfaces (#2999) (Panagiotis Garefalakis reviewed by Ramesh Kumar)
This is an automated email from the ASF dual-hosted git repository.
pgaref pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a96c697 HIVE-25583: Support parallel load for HashTables - Interfaces (#2999) (Panagiotis Garefalakis reviewed by Ramesh Kumar)
a96c697 is described below
commit a96c6978063e91a493f07a57d88b2df6dfa56073
Author: Panagiotis Garefalakis <pg...@apache.org>
AuthorDate: Thu Feb 10 20:14:12 2022 +0200
HIVE-25583: Support parallel load for HashTables - Interfaces (#2999) (Panagiotis Garefalakis reviewed by Ramesh Kumar)
* Introduce the VectorMapJoinFastHashTableContainerBase class, which implements VectorMapJoinHashTable
* Each container class (e.g. VectorMapJoinFastStringHashMapContainer) is a single object wrapping an array of one or more hash tables
* VectorMapJoinFastTableContainer now initializes VectorMapJoinFastHashTableContainers instead of creating hash tables directly
Change-Id: I14f71dab3859b75d822914966151548793998c39
---
.../persistence/MapJoinTableContainerSerDe.java | 2 +-
.../fast/VectorMapJoinFastBytesHashMap.java | 8 +-
.../fast/VectorMapJoinFastBytesHashMultiSet.java | 6 +-
.../fast/VectorMapJoinFastBytesHashSet.java | 6 +-
.../fast/VectorMapJoinFastBytesHashTable.java | 9 +-
.../mapjoin/fast/VectorMapJoinFastHashTable.java | 16 +-
.../VectorMapJoinFastHashTableContainerBase.java | 45 +++++
.../fast/VectorMapJoinFastHashTableLoader.java | 2 +-
.../mapjoin/fast/VectorMapJoinFastLongHashMap.java | 14 +-
.../VectorMapJoinFastLongHashMapContainer.java | 222 +++++++++++++++++++++
.../fast/VectorMapJoinFastLongHashMultiSet.java | 8 +-
...VectorMapJoinFastLongHashMultiSetContainer.java | 170 ++++++++++++++++
.../mapjoin/fast/VectorMapJoinFastLongHashSet.java | 7 +-
.../VectorMapJoinFastLongHashSetContainer.java | 169 ++++++++++++++++
.../fast/VectorMapJoinFastLongHashTable.java | 29 +--
.../fast/VectorMapJoinFastMultiKeyHashMap.java | 4 +-
.../VectorMapJoinFastMultiKeyHashMapContainer.java | 193 ++++++++++++++++++
.../VectorMapJoinFastMultiKeyHashMultiSet.java | 4 +-
...orMapJoinFastMultiKeyHashMultiSetContainer.java | 124 ++++++++++++
.../fast/VectorMapJoinFastMultiKeyHashSet.java | 4 +-
.../VectorMapJoinFastMultiKeyHashSetContainer.java | 122 +++++++++++
.../fast/VectorMapJoinFastStringCommon.java | 27 ++-
.../fast/VectorMapJoinFastStringHashMap.java | 5 +-
.../VectorMapJoinFastStringHashMapContainer.java | 211 ++++++++++++++++++++
.../fast/VectorMapJoinFastStringHashMultiSet.java | 5 +-
...ctorMapJoinFastStringHashMultiSetContainer.java | 143 +++++++++++++
.../fast/VectorMapJoinFastStringHashSet.java | 4 +-
.../VectorMapJoinFastStringHashSetContainer.java | 142 +++++++++++++
.../fast/VectorMapJoinFastTableContainer.java | 138 +++++++------
.../mapjoin/hashtable/VectorMapJoinHashTable.java | 11 +-
.../optimized/VectorMapJoinOptimizedHashTable.java | 11 +-
.../exec/tez/TestVectorMapJoinFastHashTable.java | 3 +-
.../ql/exec/vector/mapjoin/MapJoinTestConfig.java | 2 +-
.../vector/mapjoin/fast/CheckFastRowHashMap.java | 10 +-
.../fast/TestVectorMapJoinFastRowHashMap.java | 154 +++++++-------
35 files changed, 1787 insertions(+), 243 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 5fff1e3..79c31a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -216,7 +216,7 @@ public class MapJoinTableContainerSerDe {
FileSystem fs, Path folder, Configuration hconf) throws HiveException {
try {
VectorMapJoinFastTableContainer tableContainer =
- new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1);
+ new VectorMapJoinFastTableContainer(mapJoinDesc, hconf, -1, 1);
tableContainer.setSerde(keyContext, valueContext);
if (fs.exists(folder)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index d314b6a..de50e69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -48,7 +48,7 @@ public abstract class VectorMapJoinFastBytesHashMap
private long fullOuterNullKeyRefWord;
- private static class NonMatchedBytesHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+ public static class NonMatchedBytesHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
private VectorMapJoinFastBytesHashMap hashMap;
@@ -152,15 +152,13 @@ public abstract class VectorMapJoinFastBytesHashMap
return new NonMatchedBytesHashMapIterator(matchTracker, this);
}
- public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue) {
+ public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue, long hashCode) {
if (checkResize()) {
expandAndRehash();
}
- long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
- int intHashCode = (int) hashCode;
- int slot = (intHashCode & logicalHashBucketMask);
+ int slot = ((int) hashCode & logicalHashBucketMask);
long probeSlot = slot;
int i = 0;
boolean isNewKey;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index c384c09..1b81db5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -49,15 +49,13 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
return new VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult();
}
- public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue) {
+ public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue, long hashCode) {
if (checkResize()) {
expandAndRehash();
}
- long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
- int intHashCode = (int) hashCode;
- int slot = (intHashCode & logicalHashBucketMask);
+ int slot = ((int) hashCode & logicalHashBucketMask);
long probeSlot = slot;
int i = 0;
boolean isNewKey;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 19b5791..32adbc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -44,15 +44,13 @@ public abstract class VectorMapJoinFastBytesHashSet
return new VectorMapJoinFastBytesHashSetStore.HashSetResult();
}
- public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue) {
+ public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue, long hashCode) {
if (checkResize()) {
expandAndRehash();
}
- long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
- int intHashCode = (int) hashCode;
- int slot = (intHashCode & logicalHashBucketMask);
+ int slot = ((int) hashCode & logicalHashBucketMask);
long probeSlot = slot;
int i = 0;
boolean isNewKey;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index 80f4546..160969c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.io.BytesWritable;
-import com.google.common.annotations.VisibleForTesting;
-
/*
* An single byte array value hash map optimized for vector map join.
*/
@@ -46,11 +44,12 @@ public abstract class VectorMapJoinFastBytesHashTable
protected BytesWritable testKeyBytesWritable;
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException {
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+ throws HiveException, IOException {
// No deserialization of key(s) here -- just get reference to bytes.
byte[] keyBytes = currentKey.getBytes();
int keyLength = currentKey.getLength();
- add(keyBytes, 0, keyLength, currentValue);
+ // hashCode is supplied by the caller; add() no longer derives it from the key bytes.
+ add(keyBytes, 0, keyLength, currentValue, hashCode);
}
@Override
@@ -60,7 +59,7 @@ public abstract class VectorMapJoinFastBytesHashTable
}
public abstract void add(byte[] keyBytes, int keyStart, int keyLength,
- BytesWritable currentValue);
+ BytesWritable currentValue, long hashCode);
protected void expandAndRehash() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index f4541f7..84095c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -64,7 +64,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
private static void validateCapacity(long capacity) {
if (Long.bitCount(capacity) != 1) {
- throw new AssertionError("Capacity must be a power of two");
+ throw new AssertionError("Capacity must be a power of two " + capacity);
}
if (capacity <= 0) {
throw new AssertionError("Invalid capacity " + capacity);
@@ -72,7 +72,12 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
}
private static int nextHighestPowerOfTwo(int v) {
- return Integer.highestOneBit(v) << 1;
+  // Returns the next power of two strictly greater than v. If v's highest set bit is
+  // already HIGHEST_INT_POWER_OF_2 (defined elsewhere in this class; presumably 1 << 30,
+  // where doubling would overflow an int -- confirm), that bit is returned unchanged.
+  int value = Integer.highestOneBit(v);
+  if (value == HIGHEST_INT_POWER_OF_2) {
+    LOG.warn("Reached highest 2 power: {}", HIGHEST_INT_POWER_OF_2);
+    return value;
+  }
+  return value << 1;
}
public VectorMapJoinFastHashTable(
@@ -81,14 +86,13 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
this.isFullOuter = isFullOuter;
- initialCapacity = (Long.bitCount(initialCapacity) == 1)
+ this.logicalHashBucketCount = (Long.bitCount(initialCapacity) == 1)
? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
+ LOG.info("Initial Capacity {} Recomputed Capacity {}", initialCapacity, logicalHashBucketCount);
- validateCapacity(initialCapacity);
+ validateCapacity(logicalHashBucketCount);
this.estimatedKeyCount = estimatedKeyCount;
-
- logicalHashBucketCount = initialCapacity;
logicalHashBucketMask = logicalHashBucketCount - 1;
resizeThreshold = (int)(logicalHashBucketCount * loadFactor);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableContainerBase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableContainerBase.java
new file mode 100644
index 0000000..8568725
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableContainerBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+
+/**
+ * Abstract base for the multi-hash-table ("MultiHT") containers used by vector map join.
+ *
+ * <p>Subclasses supply key hashing ({@link #getHashCode}), insertion of a row under a
+ * precomputed hash ({@link #putRow}), and size / memory accounting.
+ */
+public abstract class VectorMapJoinFastHashTableContainerBase implements VectorMapJoinHashTable {
+
+  /** Returns the number of entries held by this container. */
+  public abstract int size();
+
+  /** Returns the estimated memory footprint of this container, in bytes. */
+  public abstract long getEstimatedMemorySize();
+
+  /**
+   * Computes the hash code for a serialized key.
+   *
+   * @param currentKey the serialized key
+   * @return the hash code to pass to {@link #putRow}
+   */
+  public abstract long getHashCode(BytesWritable currentKey) throws HiveException, IOException;
+
+  /**
+   * Inserts a row whose key hash has already been computed.
+   *
+   * @param hashCode hash of {@code currentKey}
+   * @param currentKey the serialized key
+   * @param currentValue the serialized value
+   */
+  public abstract void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException;
+
+  /**
+   * Not supported by containers; always throws.
+   * The original author marked this method for removal.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public boolean containsLongKey(long currentKey) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 49a08f3..e0d8e8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -137,7 +137,7 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
long keyCount = Math.max(estKeyCount, inputRecords);
VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
- new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+ new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 1);
LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
"estKeyCount : {} keyCount : {}", inputName, cacheKey,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
index bfc829f..2a805d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
@@ -22,9 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.hive.common.MemoryEstimate;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-// import org.slf4j.Logger;
-// import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
@@ -54,7 +51,7 @@ public class VectorMapJoinFastLongHashMap
private long fullOuterNullKeyValueRef;
- private static class NonMatchedLongHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+ public static class NonMatchedLongHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
private VectorMapJoinFastLongHashMap hashMap;
@@ -67,7 +64,7 @@ public class VectorMapJoinFastLongHashMap
private VectorMapJoinFastValueStore.HashMapResult nonMatchedHashMapResult;
- NonMatchedLongHashMapIterator(MatchTracker matchTracker,
+ public NonMatchedLongHashMapIterator(MatchTracker matchTracker,
VectorMapJoinFastLongHashMap hashMap) {
super(matchTracker);
this.hashMap = hashMap;
@@ -142,10 +139,10 @@ public class VectorMapJoinFastLongHashMap
}
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue)
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
throws HiveException, IOException {
- if (!adaptPutRow(currentKey, currentValue)) {
+ if (!adaptPutRow(hashCode, currentKey, currentValue)) {
// Ignore NULL keys, except for FULL OUTER.
if (isFullOuter) {
@@ -170,7 +167,8 @@ public class VectorMapJoinFastLongHashMap
testValueBytesWritable = new BytesWritable();
}
testValueBytesWritable.set(currentValue, 0, currentValue.length);
- add(currentKey, testValueBytesWritable);
+ long hashCode = HashCodeUtil.calculateLongHashCode(currentKey);
+ add(hashCode, currentKey, testValueBytesWritable);
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMapContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMapContainer.java
new file mode 100644
index 0000000..266fdd8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMapContainer.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Single LONG key hash map optimized for vector map join, backed by one or more
+ * {@link VectorMapJoinFastLongHashMap} sub-maps.
+ *
+ * <p>Each key is assigned to a sub-map with {@code hashCode & (numHTs - 1)}, so the number of
+ * sub-maps must be a positive power of two for every sub-map to be reachable.
+ */
+public class VectorMapJoinFastLongHashMapContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinLongHashMap, MemoryEstimate {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMapContainer.class);
+
+  private final VectorMapJoinFastLongHashMap[] vectorMapJoinFastLongHashMaps;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  // Number of sub-maps in vectorMapJoinFastLongHashMaps.
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  /**
+   * Creates {@code numHTs} sub-maps that all share the same sizing parameters.
+   *
+   * @param numHTs number of sub-maps; must be a positive power of two
+   * @throws IllegalArgumentException if {@code numHTs} is not a positive power of two
+   */
+  public VectorMapJoinFastLongHashMapContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numHTs) {
+    // Routing masks the hash with (numHTs - 1): a non-power-of-two would leave some sub-maps
+    // unreachable, and 0 would make every put/lookup index out of bounds.
+    if (numHTs <= 0 || Integer.bitCount(numHTs) != 1) {
+      throw new IllegalArgumentException("Number of hash tables must be a positive power of two: " + numHTs);
+    }
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashMaps = new VectorMapJoinFastLongHashMap[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int i = 0; i < numHTs; ++i) {
+      vectorMapJoinFastLongHashMaps[i] =
+          new VectorMapJoinFastLongHashMap(isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity, loadFactor,
+              writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numHTs;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  /** Returns the sub-map that owns keys with the given hash code. */
+  private VectorMapJoinFastLongHashMap hashMapFor(long hashCode) {
+    return vectorMapJoinFastLongHashMaps[(int) ((numThreads - 1) & hashCode)];
+  }
+
+  @Override
+  public boolean useMinMax() {
+    return minMaxEnabled;
+  }
+
+  /** Returns the smallest {@code min()} reported by any sub-map. */
+  @Override
+  public long min() {
+    long min = Long.MAX_VALUE;
+    for (VectorMapJoinFastLongHashMap hashMap : vectorMapJoinFastLongHashMaps) {
+      min = Math.min(min, hashMap.min());
+    }
+    return min;
+  }
+
+  /** Returns the largest {@code max()} reported by any sub-map. */
+  @Override
+  public long max() {
+    long max = Long.MIN_VALUE;
+    for (VectorMapJoinFastLongHashMap hashMap : vectorMapJoinFastLongHashMaps) {
+      max = Math.max(max, hashMap.max());
+    }
+    return max;
+  }
+
+  /**
+   * Non-matched iterator that drains the per-sub-map iterators one after another.
+   *
+   * <p>NOTE(review): every sub-map iterator is created with the same {@link MatchTracker}.
+   * Unless the sub-maps offset their slot indices, marks from different sub-maps may collide
+   * when numHTs &gt; 1 -- verify before enabling more than one sub-map.
+   */
+  public static class NonMatchedLongHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+
+    private final VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator[] hashMapIterators;
+    // Index of the sub-map iterator currently being consumed.
+    private int index;
+    private final int numThreads;
+
+    private NonMatchedLongHashMapIterator(MatchTracker matchTracker,
+        VectorMapJoinFastLongHashMap[] vectorMapJoinFastLongHashMaps, int numThreads) {
+      super(matchTracker);
+      hashMapIterators = new VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator[numThreads];
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i] = new VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator(matchTracker,
+            vectorMapJoinFastLongHashMaps[i]);
+      }
+      index = 0;
+      this.numThreads = numThreads;
+    }
+
+    /** Initializes every sub-map iterator and rewinds to the first one. */
+    public void init() {
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i].init();
+      }
+      index = 0;
+    }
+
+    /**
+     * Advances to the next non-matched entry, moving on to the next sub-map iterator when the
+     * current one is exhausted.
+     *
+     * @return false once all sub-map iterators are exhausted
+     */
+    public boolean findNextNonMatched() {
+      for (; index < numThreads; ++index) {
+        if (hashMapIterators[index].findNextNonMatched()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    // The accessors below delegate to the current sub-map iterator; they are only valid after
+    // findNextNonMatched() has returned true.
+    public boolean readNonMatchedLongKey() {
+      return hashMapIterators[index].readNonMatchedLongKey();
+    }
+
+    public long getNonMatchedLongKey() {
+      return hashMapIterators[index].getNonMatchedLongKey();
+    }
+
+    public VectorMapJoinHashMapResult getNonMatchedHashMapResult() {
+      return hashMapIterators[index].getNonMatchedHashMapResult();
+    }
+  }
+
+  public VectorMapJoinHashMapResult createHashMapResult() {
+    return new VectorMapJoinFastValueStore.HashMapResult();
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Deserializes the single long key from its BinarySortable bytes and returns its hash.
+   * When {@code readNextField()} reports no value (presumably a NULL key), returns 0.
+   *
+   * @throws HiveException if the key bytes cannot be deserialized
+   */
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("DeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+    }
+    long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+    return HashCodeUtil.calculateLongHashCode(key);
+  }
+
+  /** Inserts the row into the sub-map selected by {@code hashCode}. */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    hashMapFor(hashCode).putRow(hashCode, currentKey, currentValue);
+  }
+
+  // Lookups hash the key the same way getHashCode() does, so they reach the sub-map that holds it.
+  public JoinUtil.JoinResult lookup(long key, VectorMapJoinHashMapResult hashMapResult) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return hashMapFor(hashCode).lookup(key, hashMapResult);
+  }
+
+  public JoinUtil.JoinResult lookup(long key, VectorMapJoinHashMapResult hashMapResult,
+      MatchTracker matchTracker) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return hashMapFor(hashCode).lookup(key, hashMapResult, matchTracker);
+  }
+
+  /** Returns the sum of the sub-maps' estimated memory sizes. */
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastLongHashMaps[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  /** Returns the total entry count across all sub-maps. */
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      size += vectorMapJoinFastLongHashMaps[i].size();
+    }
+    return size;
+  }
+
+  /** Creates a tracker sized to the combined logical bucket count of all sub-maps. */
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      count += vectorMapJoinFastLongHashMaps[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    return new NonMatchedLongHashMapIterator(matchTracker, vectorMapJoinFastLongHashMaps, numThreads);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
index 55f038b..8701e4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHash
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMultiSet;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.common.util.HashCodeUtil;
@@ -51,10 +50,10 @@ public class VectorMapJoinFastLongHashMultiSet
}
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue)
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
throws HiveException, IOException {
- if (!adaptPutRow(currentKey, currentValue)) {
+ if (!adaptPutRow(hashCode, currentKey, currentValue)) {
// Ignore NULL keys, except for FULL OUTER.
if (isFullOuter) {
@@ -75,7 +74,8 @@ public class VectorMapJoinFastLongHashMultiSet
*/
@VisibleForTesting
public void testPutRow(long currentKey) throws HiveException, IOException {
- add(currentKey, null);
+ // add() now takes the hash explicitly, so compute it from the long key first.
+ long hashCode = HashCodeUtil.calculateLongHashCode(currentKey);
+ add(hashCode, currentKey, null);
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSetContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSetContainer.java
new file mode 100644
index 0000000..c7184d7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSetContainer.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Single LONG key hash multi-set optimized for vector map join, backed by one or more
+ * {@link VectorMapJoinFastLongHashMultiSet} sub-sets.
+ *
+ * <p>Each key is assigned to a sub-set with {@code hashCode & (numHTs - 1)}, so the number of
+ * sub-sets must be a positive power of two for every sub-set to be reachable.
+ */
+public class VectorMapJoinFastLongHashMultiSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinLongHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastLongHashMultiSet[] vectorMapJoinFastLongHashMultiSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  // Number of sub-sets in vectorMapJoinFastLongHashMultiSets.
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  /**
+   * Creates {@code numHTs} sub-sets that all share the same sizing parameters.
+   *
+   * @param numHTs number of sub-sets; must be a positive power of two
+   * @throws IllegalArgumentException if {@code numHTs} is not a positive power of two
+   */
+  public VectorMapJoinFastLongHashMultiSetContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numHTs) {
+    // Routing masks the hash with (numHTs - 1): a non-power-of-two would leave some sub-sets
+    // unreachable, and 0 would make every put/lookup index out of bounds.
+    if (numHTs <= 0 || Integer.bitCount(numHTs) != 1) {
+      throw new IllegalArgumentException("Number of hash tables must be a positive power of two: " + numHTs);
+    }
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashMultiSets = new VectorMapJoinFastLongHashMultiSet[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int i = 0; i < numHTs; ++i) {
+      vectorMapJoinFastLongHashMultiSets[i] = new VectorMapJoinFastLongHashMultiSet(isFullOuter,
+          minMaxEnabled, hashTableKeyType,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numHTs;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  /** Returns the sub-set that owns keys with the given hash code. */
+  private VectorMapJoinFastLongHashMultiSet multiSetFor(long hashCode) {
+    return vectorMapJoinFastLongHashMultiSets[(int) ((numThreads - 1) & hashCode)];
+  }
+
+  public VectorMapJoinHashMultiSetResult createHashMultiSetResult() {
+    return new VectorMapJoinFastHashMultiSet.HashMultiSetResult();
+  }
+
+  /**
+   * Deserializes the single long key from its BinarySortable bytes and returns its hash.
+   * When {@code readNextField()} reports no value (presumably a NULL key), returns 0.
+   *
+   * @throws HiveException if the key bytes cannot be deserialized
+   */
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("DeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+    }
+    long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+    return HashCodeUtil.calculateLongHashCode(key);
+  }
+
+  /** Inserts the row into the sub-set selected by {@code hashCode}. */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    multiSetFor(hashCode).putRow(hashCode, currentKey, currentValue);
+  }
+
+  // Hashes the key the same way getHashCode() does, so the lookup reaches the owning sub-set.
+  public JoinUtil.JoinResult contains(long key, VectorMapJoinHashMultiSetResult hashMultiSetResult) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return multiSetFor(hashCode).contains(key, hashMultiSetResult);
+  }
+
+  /** Returns the sum of the sub-sets' estimated memory sizes. */
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastLongHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  /** Returns the total entry count across all sub-sets. */
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      size += vectorMapJoinFastLongHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  /** Creates a tracker sized to the combined logical bucket count of all sub-sets. */
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      count += vectorMapJoinFastLongHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean useMinMax() {
+    return minMaxEnabled;
+  }
+
+  /** Returns the smallest {@code min()} reported by any sub-set. */
+  @Override
+  public long min() {
+    long min = Long.MAX_VALUE;
+    for (VectorMapJoinFastLongHashMultiSet multiSet : vectorMapJoinFastLongHashMultiSets) {
+      min = Math.min(min, multiSet.min());
+    }
+    return min;
+  }
+
+  /** Returns the largest {@code max()} reported by any sub-set. */
+  @Override
+  public long max() {
+    long max = Long.MIN_VALUE;
+    for (VectorMapJoinFastLongHashMultiSet multiSet : vectorMapJoinFastLongHashMultiSets) {
+      max = Math.max(max, multiSet.max());
+    }
+    return max;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
index 3fb9941..21e9b52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
@@ -49,11 +49,11 @@ public class VectorMapJoinFastLongHashSet
}
+ // Inserts one row; the caller-computed hashCode is forwarded unchanged to adaptPutRow, which passes it to add().
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue)
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
throws HiveException, IOException {
// Ignore NULL keys (HashSet not used for FULL OUTER).
- adaptPutRow(currentKey, currentValue);
+ adaptPutRow(hashCode, currentKey, currentValue);
}
@Override
@@ -67,7 +67,8 @@ public class VectorMapJoinFastLongHashSet
*/
@VisibleForTesting
public void testPutRow(long currentKey) throws HiveException, IOException {
- add(currentKey, null);
+ // Mirror the load path: long keys are hashed with HashCodeUtil.calculateLongHashCode before add().
+ long hashCode = HashCodeUtil.calculateLongHashCode(currentKey);
+ add(hashCode, currentKey, null);
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSetContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSetContainer.java
new file mode 100644
index 0000000..1690739
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSetContainer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashSet;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/*
+ * Single LONG key hash set optimized for vector map join, split across numHTs
+ * VectorMapJoinFastLongHashSet sub-tables.
+ *
+ * Both loading (putRow) and lookup (contains) select the sub-table with
+ * ((numThreads - 1) & hashCode), so every sub-table is reachable only when numHTs is a power of two.
+ */
+public class VectorMapJoinFastLongHashSetContainer extends VectorMapJoinFastHashTableContainerBase
+    implements VectorMapJoinLongHashSet {
+
+  public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashSetContainer.class);
+
+  private final VectorMapJoinFastLongHashSet[] vectorMapJoinFastLongHashSets;
+  // NOTE(review): a single deserializer instance that getHashCode() mutates via set(); confirm
+  // getHashCode() is never called concurrently from several threads.
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashSetContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numHTs) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashSets = new VectorMapJoinFastLongHashSet[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int i = 0; i < numHTs; ++i) {
+      vectorMapJoinFastLongHashSets[i] =
+          new VectorMapJoinFastLongHashSet(isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity,
+              loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numHTs;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  /** Returns the sub-table responsible for keys with the given hash code. */
+  private VectorMapJoinFastLongHashSet hashSetFor(long hashCode) {
+    return vectorMapJoinFastLongHashSets[(int) ((numThreads - 1) & hashCode)];
+  }
+
+  @Override
+  public VectorMapJoinHashSetResult createHashSetResult() {
+    return new VectorMapJoinFastHashSet.HashSetResult();
+  }
+
+  /**
+   * Deserializes the single long key held in currentKey and returns
+   * HashCodeUtil.calculateLongHashCode of it. It returns 0 when readNextField() yields no field
+   * (presumably a NULL key).
+   *
+   * @throws HiveException wrapping any deserialization failure, with read-position details
+   */
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("DeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+    }
+    long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+    return HashCodeUtil.calculateLongHashCode(key);
+  }
+
+  /**
+   * Routes the row to its sub-table. hashCode is expected to come from getHashCode(currentKey),
+   * so that contains() later routes lookups for the same key to the same sub-table.
+   */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    hashSetFor(hashCode).putRow(hashCode, currentKey, currentValue);
+  }
+
+  /** Looks up key in the sub-table chosen by the same long hash used during loading. */
+  @Override
+  public JoinResult contains(long key, VectorMapJoinHashSetResult hashSetResult) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return hashSetFor(hashCode).contains(key, hashSetResult);
+  }
+
+  /** Sum of the sub-tables' estimated memory sizes. */
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastLongHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  /** Sum of the sub-tables' sizes. */
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      size += vectorMapJoinFastLongHashSets[i].size();
+    }
+    return size;
+  }
+
+  /**
+   * Creates a tracker sized to the sum of the sub-tables' logical bucket counts.
+   * createNonMatchedIterator() is unsupported here, and contains() takes no tracker.
+   */
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      count += vectorMapJoinFastLongHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  /** Unsupported for this container; always throws. */
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported for this container; always throws. */
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean useMinMax() {
+    return minMaxEnabled;
+  }
+
+  /** Smallest min() across all sub-tables (Long.MAX_VALUE if there are none). */
+  @Override
+  public long min() {
+    long min = Long.MAX_VALUE;
+    for (int i = 0; i < numThreads; ++i) {
+      long currentMin = vectorMapJoinFastLongHashSets[i].min();
+      if (currentMin < min) {
+        min = currentMin;
+      }
+    }
+    return min;
+  }
+
+  /** Largest max() across all sub-tables (Long.MIN_VALUE if there are none). */
+  @Override
+  public long max() {
+    long max = Long.MIN_VALUE;
+    for (int i = 0; i < numThreads; ++i) {
+      long currentMax = vectorMapJoinFastLongHashSets[i].max();
+      if (currentMax > max) {
+        max = currentMax;
+      }
+    }
+    return max;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index d275a3e..c838f29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -24,20 +24,13 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.JoinUtil;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMap;
-import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashTable;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.common.util.HashCodeUtil;
-import org.apache.tez.runtime.library.api.KeyValueReader;
-
-import com.google.common.annotations.VisibleForTesting;
/*
* An single long value map optimized for vector map join.
@@ -71,7 +64,8 @@ public abstract class VectorMapJoinFastLongHashTable
return max;
}
- public boolean adaptPutRow(BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException {
+ public boolean adaptPutRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+ throws HiveException, IOException {
byte[] keyBytes = currentKey.getBytes();
int keyLength = currentKey.getLength();
keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
@@ -80,30 +74,23 @@ public abstract class VectorMapJoinFastLongHashTable
return false;
}
} catch (Exception e) {
- throw new HiveException(
- "\nDeserializeRead details: " +
- keyBinarySortableDeserializeRead.getDetailedReadPositionString() +
- "\nException: " + e.toString());
+ throw new HiveException("DeserializeRead details: " +
+ keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
}
-
- long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(
- keyBinarySortableDeserializeRead, hashTableKeyType);
-
- add(key, currentValue);
+ long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+ add(hashCode, key, currentValue);
return true;
}
+ // Subclass hook that stores key (and currentValue, when the table keeps values) into bucket slot.
+ // NOTE(review): isNewKey presumably reports whether the key was absent before this insert; confirm in subclasses.
protected abstract void assignSlot(int slot, long key, boolean isNewKey, BytesWritable currentValue);
- public void add(long key, BytesWritable currentValue) {
+ public void add(long hashCode, long key, BytesWritable currentValue) {
if (checkResize()) {
expandAndRehash();
}
- long hashCode = HashCodeUtil.calculateLongHashCode(key);
- int intHashCode = (int) hashCode;
- int slot = (intHashCode & logicalHashBucketMask);
+ int slot = ((int) hashCode & logicalHashBucketMask);
long probeSlot = slot;
int i = 0;
boolean isNewKey;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
index 4ab1601..fb83807 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.BytesWritable;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An multi-key value hash map optimized for vector map join.
@@ -45,7 +46,8 @@ public class VectorMapJoinFastMultiKeyHashMap
}
testKeyBytesWritable.set(currentKey, 0, currentKey.length);
testValueBytesWritable.set(currentValue, 0, currentValue.length);
- putRow(testKeyBytesWritable, testValueBytesWritable);
+ long hashCode = HashCodeUtil.murmurHash(currentKey, 0, currentKey.length);
+ putRow(hashCode, testKeyBytesWritable, testValueBytesWritable);
}
public VectorMapJoinFastMultiKeyHashMap(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMapContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMapContainer.java
new file mode 100644
index 0000000..bb3bba5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMapContainer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Multi-key value hash map optimized for vector map join, partitioned into numHTs
+ * VectorMapJoinFastMultiKeyHashMap sub-tables.
+ *
+ * The key is stored as the provided bytes (uninterpreted). Loading and lookup both select the
+ * sub-table with ((numThreads - 1) & murmurHash(key)), so every sub-table is reachable only when
+ * numHTs is a power of two.
+ */
+public class VectorMapJoinFastMultiKeyHashMapContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashMap {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashMapContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashMap[] vectorMapJoinFastMultiKeyHashMaps;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashMapContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numHTs) {
+    this.vectorMapJoinFastMultiKeyHashMaps = new VectorMapJoinFastMultiKeyHashMap[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int i = 0; i < numHTs; ++i) {
+      vectorMapJoinFastMultiKeyHashMaps[i] =
+          new VectorMapJoinFastMultiKeyHashMap(isFullOuter, initialCapacity, loadFactor, writeBuffersSize,
+              estimatedKeyCount);
+    }
+    this.numThreads = numHTs;
+  }
+
+  /**
+   * Walks the non-matched keys of every sub-table in turn, delegating to one
+   * NonMatchedBytesHashMapIterator per sub-table.
+   *
+   * The accessor methods read from the sub-iterator at the current index, so they are only
+   * meaningful after findNextNonMatched() has returned true.
+   */
+  public static class NonMatchedBytesHashMapParallelIterator extends VectorMapJoinFastNonMatchedIterator {
+
+    private final VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[] hashMapIterators;
+    private final int numThreads;
+    // Index of the sub-iterator currently being drained.
+    private int index;
+
+    NonMatchedBytesHashMapParallelIterator(MatchTracker matchTracker,
+        VectorMapJoinFastBytesHashMap[] hashMaps, int numThreads) {
+      super(matchTracker);
+      // One sub-iterator per hash table. A fixed-length array would overflow (or leave null
+      // entries) whenever numThreads differs from that length.
+      hashMapIterators = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[numThreads];
+      for (int i = 0; i < numThreads; ++i) {
+        // NOTE(review): every sub-iterator shares the same matchTracker; see createMatchTracker().
+        hashMapIterators[i] = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator(matchTracker,
+            hashMaps[i]);
+      }
+      index = 0;
+      this.numThreads = numThreads;
+    }
+
+    @Override
+    public void init() {
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i].init();
+      }
+      index = 0;
+    }
+
+    /** Advances to the next non-matched key, moving on to the next sub-table when one is exhausted. */
+    @Override
+    public boolean findNextNonMatched() {
+      for (; index < numThreads; ++index) {
+        if (hashMapIterators[index].findNextNonMatched()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean readNonMatchedBytesKey() throws HiveException {
+      return hashMapIterators[index].readNonMatchedBytesKey();
+    }
+
+    @Override
+    public byte[] getNonMatchedBytes() {
+      return hashMapIterators[index].getNonMatchedBytes();
+    }
+
+    @Override
+    public int getNonMatchedBytesOffset() {
+      return hashMapIterators[index].getNonMatchedBytesOffset();
+    }
+
+    @Override
+    public int getNonMatchedBytesLength() {
+      return hashMapIterators[index].getNonMatchedBytesLength();
+    }
+
+    @Override
+    public VectorMapJoinHashMapResult getNonMatchedHashMapResult() {
+      return hashMapIterators[index].getNonMatchedHashMapResult();
+    }
+  }
+
+  /** Returns the sub-table that owns keys with the given hash code. */
+  private VectorMapJoinFastMultiKeyHashMap hashMapFor(long hashCode) {
+    return vectorMapJoinFastMultiKeyHashMaps[(int) ((numThreads - 1) & hashCode)];
+  }
+
+  /** Murmur hash over the raw serialized key bytes; the same hash the lookup() methods use. */
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
+  }
+
+  /** Routes the row to its sub-table; hashCode is expected to come from getHashCode(currentKey). */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    hashMapFor(hashCode).putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastMultiKeyHashMaps[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      size += vectorMapJoinFastMultiKeyHashMaps[i].size();
+    }
+    return size;
+  }
+
+  /**
+   * Creates a tracker sized to the sum of the sub-tables' logical bucket counts.
+   *
+   * NOTE(review): the same tracker instance is passed to each sub-table's lookup() and to every
+   * sub-iterator of NonMatchedBytesHashMapParallelIterator. If the sub-tables record matches by
+   * their own local (presumably 0-based) slot numbers, a match in one sub-table can hide a
+   * non-match in another when numThreads > 1. Verify before using this container for FULL OUTER.
+   */
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      count += vectorMapJoinFastMultiKeyHashMaps[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    return new NonMatchedBytesHashMapParallelIterator(matchTracker, vectorMapJoinFastMultiKeyHashMaps, numThreads);
+  }
+
+  /** Unsupported for this container; always throws. */
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public JoinUtil.JoinResult lookup(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashMapResult hashMapResult) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return hashMapFor(hashCode).lookup(keyBytes, keyStart, keyLength, hashMapResult);
+  }
+
+  @Override
+  public JoinUtil.JoinResult lookup(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashMapResult hashMapResult, MatchTracker matchTracker) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return hashMapFor(hashCode).lookup(keyBytes, keyStart, keyLength, hashMapResult, matchTracker);
+  }
+
+  @Override
+  public VectorMapJoinHashMapResult createHashMapResult() {
+    return new VectorMapJoinFastBytesHashMapStore.HashMapResult();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
index 960115f..b7e5fa2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.BytesWritable;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An multi-key hash multi-set optimized for vector map join.
@@ -43,7 +44,8 @@ public class VectorMapJoinFastMultiKeyHashMultiSet
testKeyBytesWritable = new BytesWritable();
}
testKeyBytesWritable.set(currentKey, 0, currentKey.length);
- putRow(testKeyBytesWritable, null);
+ long hashCode = HashCodeUtil.murmurHash(currentKey, 0, currentKey.length);
+ putRow(hashCode, testKeyBytesWritable, null);
}
public VectorMapJoinFastMultiKeyHashMultiSet(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSetContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSetContainer.java
new file mode 100644
index 0000000..2a0427b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSetContainer.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Multi-key hash multi-set optimized for vector map join, partitioned into numHTs
+ * VectorMapJoinFastMultiKeyHashMultiSet sub-tables.
+ *
+ * The key is stored as the provided bytes (uninterpreted). Loading and lookup both pick the
+ * sub-table by masking the key's murmur hash with (numThreads - 1).
+ */
+public class VectorMapJoinFastMultiKeyHashMultiSetContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashMultiSet[] vectorMapJoinFastMultiKeyHashMultiSets;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashMultiSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numHTs) {
+    VectorMapJoinFastMultiKeyHashMultiSet[] subTables = new VectorMapJoinFastMultiKeyHashMultiSet[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int t = 0; t < subTables.length; t++) {
+      subTables[t] = new VectorMapJoinFastMultiKeyHashMultiSet(
+          isFullOuter, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
+    }
+    this.vectorMapJoinFastMultiKeyHashMultiSets = subTables;
+    this.numThreads = numHTs;
+  }
+
+  /** Selects the sub-table that owns keys with this hash code. */
+  private VectorMapJoinFastMultiKeyHashMultiSet multiSetFor(long hashCode) {
+    int partition = (int) (hashCode & (numThreads - 1));
+    return vectorMapJoinFastMultiKeyHashMultiSets[partition];
+  }
+
+  /** Murmur hash of the raw key bytes; contains() hashes lookup keys the same way. */
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    return HashCodeUtil.murmurHash(currentKey.getBytes(), 0, currentKey.getLength());
+  }
+
+  /** Hands the row to the sub-table chosen by hashCode. */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    multiSetFor(hashCode).putRow(hashCode, currentKey, currentValue);
+  }
+
+  /** Total estimated memory across sub-tables. */
+  @Override
+  public long getEstimatedMemorySize() {
+    long total = 0L;
+    for (VectorMapJoinFastMultiKeyHashMultiSet subTable : vectorMapJoinFastMultiKeyHashMultiSets) {
+      total += subTable.getEstimatedMemorySize();
+    }
+    return total;
+  }
+
+  /** Total entry count across sub-tables. */
+  @Override
+  public int size() {
+    int total = 0;
+    for (VectorMapJoinFastMultiKeyHashMultiSet subTable : vectorMapJoinFastMultiKeyHashMultiSets) {
+      total += subTable.size();
+    }
+    return total;
+  }
+
+  /** Tracker sized to the combined logical bucket count of all sub-tables. */
+  @Override
+  public MatchTracker createMatchTracker() {
+    int buckets = 0;
+    for (VectorMapJoinFastMultiKeyHashMultiSet subTable : vectorMapJoinFastMultiKeyHashMultiSets) {
+      buckets += subTable.logicalHashBucketCount;
+    }
+    return MatchTracker.create(buckets);
+  }
+
+  /** Unsupported for this container; always throws. */
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported for this container; always throws. */
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Probes the sub-table that the same murmur hash routed the key to during loading. */
+  @Override
+  public JoinUtil.JoinResult contains(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashMultiSetResult hashMultiSetResult) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return multiSetFor(hashCode).contains(keyBytes, keyStart, keyLength, hashMultiSetResult);
+  }
+
+  @Override
+  public VectorMapJoinHashMultiSetResult createHashMultiSetResult() {
+    return new VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
index 9c1183b..fed192a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.BytesWritable;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hive.common.util.HashCodeUtil;
/*
* An multi-key hash set optimized for vector map join.
@@ -43,7 +44,8 @@ public class VectorMapJoinFastMultiKeyHashSet
testKeyBytesWritable = new BytesWritable();
}
testKeyBytesWritable.set(currentKey, 0, currentKey.length);
- putRow(testKeyBytesWritable, null);
+ long hashCode = HashCodeUtil.murmurHash(currentKey, 0, currentKey.length);
+ putRow(hashCode, testKeyBytesWritable, null);
}
public VectorMapJoinFastMultiKeyHashSet(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSetContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSetContainer.java
new file mode 100644
index 0000000..2a9b91c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSetContainer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Multi-key hash set optimized for vector map join, partitioned into numHTs
+ * VectorMapJoinFastMultiKeyHashSet sub-tables.
+ *
+ * The key is stored as the provided bytes (uninterpreted). Loading and lookup both pick the
+ * sub-table by masking the key's murmur hash with (numThreads - 1).
+ */
+public class VectorMapJoinFastMultiKeyHashSetContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashSetContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashSet[] vectorMapJoinFastMultiKeyHashSets;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numHTs) {
+    VectorMapJoinFastMultiKeyHashSet[] subTables = new VectorMapJoinFastMultiKeyHashSet[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int t = 0; t < subTables.length; t++) {
+      subTables[t] = new VectorMapJoinFastMultiKeyHashSet(
+          isFullOuter, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
+    }
+    this.vectorMapJoinFastMultiKeyHashSets = subTables;
+    this.numThreads = numHTs;
+  }
+
+  /** Selects the sub-table that owns keys with this hash code. */
+  private VectorMapJoinFastMultiKeyHashSet hashSetFor(long hashCode) {
+    int partition = (int) (hashCode & (numThreads - 1));
+    return vectorMapJoinFastMultiKeyHashSets[partition];
+  }
+
+  /** Murmur hash of the raw key bytes; contains() hashes lookup keys the same way. */
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    return HashCodeUtil.murmurHash(currentKey.getBytes(), 0, currentKey.getLength());
+  }
+
+  /** Hands the row to the sub-table chosen by hashCode. */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    hashSetFor(hashCode).putRow(hashCode, currentKey, currentValue);
+  }
+
+  /** Total estimated memory across sub-tables. */
+  @Override
+  public long getEstimatedMemorySize() {
+    long total = 0L;
+    for (VectorMapJoinFastMultiKeyHashSet subTable : vectorMapJoinFastMultiKeyHashSets) {
+      total += subTable.getEstimatedMemorySize();
+    }
+    return total;
+  }
+
+  /** Total entry count across sub-tables. */
+  @Override
+  public int size() {
+    int total = 0;
+    for (VectorMapJoinFastMultiKeyHashSet subTable : vectorMapJoinFastMultiKeyHashSets) {
+      total += subTable.size();
+    }
+    return total;
+  }
+
+  /** Tracker sized to the combined logical bucket count of all sub-tables. */
+  @Override
+  public MatchTracker createMatchTracker() {
+    int buckets = 0;
+    for (VectorMapJoinFastMultiKeyHashSet subTable : vectorMapJoinFastMultiKeyHashSets) {
+      buckets += subTable.logicalHashBucketCount;
+    }
+    return MatchTracker.create(buckets);
+  }
+
+  /** Unsupported for this container; always throws. */
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Unsupported for this container; always throws. */
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Probes the sub-table that the same murmur hash routed the key to during loading. */
+  @Override
+  public JoinUtil.JoinResult contains(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashSetResult hashSetResult) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return hashSetFor(hashCode).contains(keyBytes, keyStart, keyLength, hashSetResult);
+  }
+
+  @Override
+  public VectorMapJoinHashSetResult createHashSetResult() {
+    return new VectorMapJoinFastBytesHashSetStore.HashSetResult();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
index bc0f303..afc7c68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeseriali
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ public class VectorMapJoinFastStringCommon {
private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
public boolean adaptPutRow(VectorMapJoinFastBytesHashTable hashTable,
- BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException {
+ BytesWritable currentKey, BytesWritable currentValue, long hashCode) throws HiveException, IOException {
byte[] keyBytes = currentKey.getBytes();
int keyLength = currentKey.getLength();
@@ -49,20 +50,34 @@ public class VectorMapJoinFastStringCommon {
return false;
}
} catch (Exception e) {
- throw new HiveException(
- "\nDeserializeRead details: " +
- keyBinarySortableDeserializeRead.getDetailedReadPositionString() +
- "\nException: " + e.toString());
+ throw new HiveException("DeserializeRead details: " +
+ keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
}
hashTable.add(
keyBinarySortableDeserializeRead.currentBytes,
keyBinarySortableDeserializeRead.currentBytesStart,
keyBinarySortableDeserializeRead.currentBytesLength,
- currentValue);
+ currentValue, hashCode);
return true;
}
+  /**
+   * Deserializes the single STRING key in {@code currentKey} and returns the murmur hash of its
+   * bytes.
+   *
+   * @param currentKey BinarySortable-serialized key
+   * @return murmur hash of the key bytes, or 0 when readNextField() reports no value
+   *     (presumably a NULL key)
+   * @throws HiveException if the key cannot be deserialized; the message carries the read position
+   */
+  public long calculateLongHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    keyBinarySortableDeserializeRead.set(currentKey.getBytes(), 0, currentKey.getLength());
+    final boolean keyPresent;
+    try {
+      keyPresent = keyBinarySortableDeserializeRead.readNextField();
+    } catch (Exception e) {
+      throw new HiveException("DeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+    }
+    if (!keyPresent) {
+      return 0;
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
public VectorMapJoinFastStringCommon(TableDesc tableDesc) {
PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
keyBinarySortableDeserializeRead = BinarySortableDeserializeRead.with(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
index f5c5e42..182ca2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
@@ -34,8 +34,9 @@ public class VectorMapJoinFastStringHashMap extends VectorMapJoinFastBytesHashMa
private VectorMapJoinFastStringCommon stringCommon;
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException {
- if (!stringCommon.adaptPutRow(this, currentKey, currentValue)) {
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+ throws HiveException, IOException {
+ if (!stringCommon.adaptPutRow(this, currentKey, currentValue, hashCode)) {
// Ignore NULL keys, except for FULL OUTER.
if (isFullOuter) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMapContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMapContainer.java
new file mode 100644
index 0000000..ab7e3bf
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMapContainer.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
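+/*
+ * Container of one or more single STRING key hash maps optimized for vector map join.
+ * Each row is routed to one hash map by the murmur hash of its key bytes.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */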
+public class VectorMapJoinFastStringHashMapContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashMap {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashMapContainer.class);
+
+  // One hash map per partition; a key lives in the partition chosen by hashTableIndex().
+  private final VectorMapJoinFastStringHashMap[] vectorMapJoinFastStringHashMaps;
+  // Reused by getHashCode() to extract the STRING key bytes from a serialized key.
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  // Number of partitions (equal to the constructor's numHTs).
+  private final int numThreads;
+
+  /**
+   * Creates {@code numHTs} independent single-STRING-key hash maps that share the same sizing
+   * parameters.
+   *
+   * @param isFullOuter whether the hash maps serve a FULL OUTER join
+   * @param initialCapacity initial capacity passed to every partition
+   * @param loadFactor load factor passed to every partition
+   * @param writeBuffersSize write buffer size passed to every partition
+   * @param estimatedKeyCount estimated key count passed to every partition
+   * @param tableDesc key table descriptor; its properties configure the key deserializer
+   * @param numHTs number of partitions. Keys are routed with {@code (numHTs - 1) & hash}. If
+   *     numHTs is not a power of two, some partitions are never selected
+   *     (NOTE(review): not validated here).
+   */
+  public VectorMapJoinFastStringHashMapContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numHTs) {
+    this.vectorMapJoinFastStringHashMaps = new VectorMapJoinFastStringHashMap[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int i = 0; i < numHTs; ++i) {
+      vectorMapJoinFastStringHashMaps[i] = new VectorMapJoinFastStringHashMap(isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numHTs;
+  }
+
+  /**
+   * Non-matched iterator that drains the per-partition iterators one after another, in
+   * partition order.
+   */
+  private static class NonMatchedBytesHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+
+    private final VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[] hashMapIterators;
+    private final int numThreads;
+    // Partition whose iterator is currently being drained.
+    private int index;
+
+    NonMatchedBytesHashMapIterator(MatchTracker matchTracker,
+        VectorMapJoinFastStringHashMap[] hashMaps, int numThreads) {
+      super(matchTracker);
+      // Size by numThreads: the loop below fills numThreads slots, so a fixed-size array throws
+      // ArrayIndexOutOfBoundsException as soon as numThreads exceeds its length.
+      this.hashMapIterators = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[numThreads];
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i] = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator(matchTracker,
+            hashMaps[i]);
+      }
+      this.index = 0;
+      this.numThreads = numThreads;
+    }
+
+    /** Initializes every partition iterator and rewinds to the first partition. */
+    @Override
+    public void init() {
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i].init();
+      }
+      index = 0;
+    }
+
+    /**
+     * Advances to the next non-matched entry. When the current partition is exhausted, moves on
+     * to the following partitions.
+     *
+     * @return false once every partition is exhausted
+     */
+    @Override
+    public boolean findNextNonMatched() {
+      for (; index < numThreads; ++index) {
+        if (hashMapIterators[index].findNextNonMatched()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    // The accessors below delegate to the current partition's iterator. They are only valid
+    // after findNextNonMatched() has returned true.
+
+    @Override
+    public boolean readNonMatchedBytesKey() throws HiveException {
+      return hashMapIterators[index].readNonMatchedBytesKey();
+    }
+
+    @Override
+    public byte[] getNonMatchedBytes() {
+      return hashMapIterators[index].getNonMatchedBytes();
+    }
+
+    @Override
+    public int getNonMatchedBytesOffset() {
+      return hashMapIterators[index].getNonMatchedBytesOffset();
+    }
+
+    @Override
+    public int getNonMatchedBytesLength() {
+      return hashMapIterators[index].getNonMatchedBytesLength();
+    }
+
+    @Override
+    public VectorMapJoinHashMapResult getNonMatchedHashMapResult() {
+      return hashMapIterators[index].getNonMatchedHashMapResult();
+    }
+  }
+
+  /** Maps a key hash to its owning partition by masking the hash with {@code numThreads - 1}. */
+  private int hashTableIndex(long hashCode) {
+    return (int) ((numThreads - 1) & hashCode);
+  }
+
+  /** Inserts the row into the partition that owns {@code hashCode}. */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashMaps[hashTableIndex(hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  /**
+   * Deserializes the STRING key and returns the murmur hash of its bytes.
+   *
+   * @return the key hash, or 0 when readNextField() reports no value (such rows route to
+   *     partition 0)
+   * @throws HiveException if the key cannot be deserialized
+   */
+  // NOTE(review): this reuses the single keyBinarySortableDeserializeRead instance, so concurrent
+  // calls from several loader threads would race on it. Confirm that callers serialize access.
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("DeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  /** Sums the estimated memory size of all partitions. */
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashMaps[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  /** Sums the entry count of all partitions. */
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      size += vectorMapJoinFastStringHashMaps[i].size();
+    }
+    return size;
+  }
+
+  /** Creates a match tracker sized to the summed logical hash bucket count of all partitions. */
+  // NOTE(review): the same tracker is passed to every partition (see lookup/createNonMatchedIterator).
+  // If partitions record matches by their own local slot numbers, slots from different partitions
+  // alias each other. Confirm the per-partition offsets are handled.
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      count += vectorMapJoinFastStringHashMaps[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  /** Returns an iterator over non-matched entries across all partitions. */
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    return new NonMatchedBytesHashMapIterator(matchTracker, vectorMapJoinFastStringHashMaps, numThreads);
+  }
+
+  /** Not supported by this container; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Looks up the key in the partition selected by the murmur hash of its bytes. */
+  @Override
+  public JoinUtil.JoinResult lookup(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashMapResult hashMapResult) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return vectorMapJoinFastStringHashMaps[hashTableIndex(hashCode)].lookup(keyBytes, keyStart, keyLength,
+        hashMapResult);
+  }
+
+  /** Looks up the key in its owning partition, recording matches in {@code matchTracker}. */
+  @Override
+  public JoinUtil.JoinResult lookup(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashMapResult hashMapResult, MatchTracker matchTracker) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return vectorMapJoinFastStringHashMaps[hashTableIndex(hashCode)].lookup(keyBytes, keyStart, keyLength,
+        hashMapResult, matchTracker);
+  }
+
+  /** Creates a fresh VectorMapJoinFastBytesHashMapStore.HashMapResult holder. */
+  @Override
+  public VectorMapJoinHashMapResult createHashMapResult() {
+    return new VectorMapJoinFastBytesHashMapStore.HashMapResult();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
index da37369..7586c60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
@@ -36,8 +36,9 @@ public class VectorMapJoinFastStringHashMultiSet extends VectorMapJoinFastBytesH
private long fullOuterNullKeyValueCount;
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException {
- if (!stringCommon.adaptPutRow(this, currentKey, currentValue)) {
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+ throws HiveException, IOException {
+ if (!stringCommon.adaptPutRow(this, currentKey, currentValue, hashCode)) {
// Ignore NULL keys, except for FULL OUTER.
if (isFullOuter) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSetContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSetContainer.java
new file mode 100644
index 0000000..df4ac1e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSetContainer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
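+/*
+ * Container of one or more single STRING key hash multi-sets optimized for vector map join.
+ * Each row is routed to one multi-set by the murmur hash of its key bytes.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */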
+public class VectorMapJoinFastStringHashMultiSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashMultiSetContainer.class);
+
+  // One hash multi-set per partition; a key lives in the partition chosen by hashTableIndex().
+  private final VectorMapJoinFastStringHashMultiSet[] vectorMapJoinFastStringHashMultiSets;
+  // Reused by getHashCode() to extract the STRING key bytes from a serialized key.
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  // Number of partitions (equal to the constructor's numHTs).
+  private final int numThreads;
+
+  /**
+   * Creates {@code numHTs} independent single-STRING-key hash multi-sets that share the same
+   * sizing parameters.
+   *
+   * @param isFullOuter whether the multi-sets serve a FULL OUTER join
+   * @param initialCapacity initial capacity passed to every partition
+   * @param loadFactor load factor passed to every partition
+   * @param writeBuffersSize write buffer size passed to every partition
+   * @param estimatedKeyCount estimated key count passed to every partition
+   * @param tableDesc key table descriptor; its properties configure the key deserializer
+   * @param numHTs number of partitions. Keys are routed with {@code (numHTs - 1) & hash}. If
+   *     numHTs is not a power of two, some partitions are never selected
+   *     (NOTE(review): not validated here).
+   */
+  public VectorMapJoinFastStringHashMultiSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numHTs) {
+    // Size by numHTs: the loop below fills numHTs slots, so a fixed-size array throws
+    // ArrayIndexOutOfBoundsException as soon as numHTs exceeds its length.
+    this.vectorMapJoinFastStringHashMultiSets = new VectorMapJoinFastStringHashMultiSet[numHTs];
+    LOG.info("Initializing {} HT Containers ", numHTs);
+    for (int i = 0; i < numHTs; ++i) {
+      vectorMapJoinFastStringHashMultiSets[i] = new VectorMapJoinFastStringHashMultiSet(
+          isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numHTs;
+  }
+
+  /** Maps a key hash to its owning partition by masking the hash with {@code numThreads - 1}. */
+  private int hashTableIndex(long hashCode) {
+    return (int) ((numThreads - 1) & hashCode);
+  }
+
+  /** Inserts the row into the partition that owns {@code hashCode}. */
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashMultiSets[hashTableIndex(hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  /**
+   * Deserializes the STRING key and returns the murmur hash of its bytes.
+   *
+   * @return the key hash, or 0 when readNextField() reports no value (such rows route to
+   *     partition 0)
+   * @throws HiveException if the key cannot be deserialized
+   */
+  // NOTE(review): this reuses the single keyBinarySortableDeserializeRead instance, so concurrent
+  // calls from several loader threads would race on it. Confirm that callers serialize access.
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("DeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  /** Sums the estimated memory size of all partitions. */
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  /** Sums the entry count of all partitions. */
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      size += vectorMapJoinFastStringHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  /** Creates a match tracker sized to the summed logical hash bucket count of all partitions. */
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      count += vectorMapJoinFastStringHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  /** Not supported by this container; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not supported by this container; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public int spillPartitionId() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Probes the partition selected by the murmur hash of the key bytes. */
+  @Override
+  public JoinUtil.JoinResult contains(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashMultiSetResult hashMultiSetResult) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return vectorMapJoinFastStringHashMultiSets[hashTableIndex(hashCode)].contains(keyBytes, keyStart, keyLength,
+        hashMultiSetResult);
+  }
+
+  /** Creates a fresh VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult holder. */
+  @Override
+  public VectorMapJoinHashMultiSetResult createHashMultiSetResult() {
+    return new VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
index 4ece0d1..3a96d9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
@@ -34,11 +34,11 @@ public class VectorMapJoinFastStringHashSet extends VectorMapJoinFastBytesHashSe
private VectorMapJoinFastStringCommon stringCommon;
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue)
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
throws HiveException, IOException {
// Ignore NULL keys (HashSet not used for FULL OUTER).
- stringCommon.adaptPutRow(this, currentKey, currentValue);
+ stringCommon.adaptPutRow(this, currentKey, currentValue, hashCode);
}
public VectorMapJoinFastStringHashSet(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSetContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSetContainer.java
new file mode 100644
index 0000000..85b71e8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSetContainer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
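+/*
+ * Container of one or more single STRING key hash sets optimized for vector map join.
+ * Each row is routed to one hash set by the murmur hash of its key bytes.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */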
+public class VectorMapJoinFastStringHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+ VectorMapJoinBytesHashSet {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashSetContainer.class);
+
+ private final VectorMapJoinFastStringHashSet[] vectorMapJoinFastStringHashSets;
+ private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+ private final int numThreads;
+
+ public VectorMapJoinFastStringHashSetContainer(
+ boolean isFullOuter,
+ int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+ int numHTs) {
+ this.vectorMapJoinFastStringHashSets = new VectorMapJoinFastStringHashSet[numHTs];
+ LOG.info("Initializing {} HT Containers ", numHTs);
+ for (int i = 0; i < numHTs; ++i) {
+ vectorMapJoinFastStringHashSets[i] = new VectorMapJoinFastStringHashSet(
+ isFullOuter,
+ initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+ }
+ PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+ this.keyBinarySortableDeserializeRead =
+ BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+ this.numThreads = numHTs;
+ }
+
+ @Override
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+ throws HiveException, IOException {
+ vectorMapJoinFastStringHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+ }
+
+ @Override
+ public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+ byte[] keyBytes = currentKey.getBytes();
+ int keyLength = currentKey.getLength();
+ keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+ try {
+ if (!keyBinarySortableDeserializeRead.readNextField()) {
+ return 0;
+ }
+ } catch (Exception e) {
+ throw new HiveException("DeserializeRead details: " +
+ keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
+ }
+ return HashCodeUtil.murmurHash(
+ keyBinarySortableDeserializeRead.currentBytes,
+ keyBinarySortableDeserializeRead.currentBytesStart,
+ keyBinarySortableDeserializeRead.currentBytesLength);
+ }
+
+ @Override
+ public long getEstimatedMemorySize() {
+ long estimatedMemorySize = 0;
+ for (int i = 0; i < numThreads; ++i) {
+ estimatedMemorySize += vectorMapJoinFastStringHashSets[i].getEstimatedMemorySize();
+ }
+ return estimatedMemorySize;
+ }
+
+ @Override
+ public int size() {
+ int size = 0;
+ for (int i = 0; i < numThreads; ++i) {
+ size += vectorMapJoinFastStringHashSets[i].size();
+ }
+ return size;
+ }
+
+ @Override
+ public MatchTracker createMatchTracker() {
+ int count = 0;
+ for (int i = 0; i < numThreads; ++i) {
+ count += vectorMapJoinFastStringHashSets[i].logicalHashBucketCount;
+ }
+ return MatchTracker.create(count);
+ }
+
+ @Override
+ public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int spillPartitionId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JoinUtil.JoinResult contains(byte[] keyBytes, int keyStart, int keyLength,
+ VectorMapJoinHashSetResult hashSetResult) throws IOException {
+ long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+ return vectorMapJoinFastStringHashSets[(int) ((numThreads - 1) & hashCode)].contains(keyBytes, keyStart, keyLength, hashSetResult);
+ }
+
+ @Override
+ public VectorMapJoinHashSetResult createHashSetResult() {
+ return new VectorMapJoinFastBytesHashSetStore.HashSetResult();
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index ffc9fbf..1b281ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -19,22 +19,18 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
import java.io.IOException;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.NonMatchedSmallTableIterator;
import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -45,9 +41,7 @@ import org.apache.hadoop.io.Writable;
* HashTableLoader for Tez constructs the hashtable from records read from
* a broadcast edge.
*/
-public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer {
-
- private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName());
+public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer, VectorMapJoinHashTable {
private final MapJoinDesc desc;
private final Configuration hconf;
@@ -55,16 +49,15 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
private final float keyCountAdj;
private final int threshold;
private final float loadFactor;
- private final int wbSize;
+ private final VectorMapJoinFastHashTableContainerBase INSTANCE;
private final long estimatedKeyCount;
-
- private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable;
private String key;
+ private int numHTs;
public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf,
- long estimatedKeyCount) throws SerDeException {
+ long estimatedKeys, int numHTs) throws SerDeException {
this.desc = desc;
this.hconf = hconf;
@@ -72,21 +65,19 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
keyCountAdj = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT);
threshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
loadFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
- wbSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE);
- this.estimatedKeyCount = estimatedKeyCount;
+ this.numHTs = numHTs;
+ this.estimatedKeyCount = estimatedKeys > numHTs ? (estimatedKeys/ numHTs) : estimatedKeys;
- int newThreshold = HashMapWrapper.calculateTableSize(
- keyCountAdj, threshold, loadFactor, estimatedKeyCount);
+ int newThreshold = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, estimatedKeyCount);
// LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold);
-
- vectorMapJoinFastHashTable = createHashTable(newThreshold);
+ this.INSTANCE = createHashTables(newThreshold);
}
@Override
public VectorMapJoinHashTable vectorMapJoinHashTable() {
- return vectorMapJoinFastHashTable;
+ return this.INSTANCE;
}
@Override
@@ -99,10 +90,9 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
return key;
}
- private VectorMapJoinFastHashTable createHashTable(int newThreshold) {
+ private VectorMapJoinFastHashTableContainerBase createHashTables(int newThreshold) {
VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc();
- HashTableImplementationType hashTableImplementationType = vectorDesc.getHashTableImplementationType();
HashTableKind hashTableKind = vectorDesc.getHashTableKind();
HashTableKeyType hashTableKeyType = vectorDesc.getHashTableKeyType();
boolean isFullOuter = vectorDesc.getIsFullOuter();
@@ -110,8 +100,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
int writeBufferSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE);
- VectorMapJoinFastHashTable hashTable = null;
-
+ VectorMapJoinFastHashTableContainerBase htWrapper = null;
switch (hashTableKeyType) {
case BOOLEAN:
case BYTE:
@@ -121,25 +110,16 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
case LONG:
switch (hashTableKind) {
case HASH_MAP:
- hashTable = new VectorMapJoinFastLongHashMap(
- isFullOuter,
- minMaxEnabled,
- hashTableKeyType,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc());
+ htWrapper = new VectorMapJoinFastLongHashMapContainer(isFullOuter, minMaxEnabled,
+ hashTableKeyType, newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc(), numHTs);
break;
case HASH_MULTISET:
- hashTable = new VectorMapJoinFastLongHashMultiSet(
- isFullOuter,
- minMaxEnabled,
- hashTableKeyType,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc());
+ htWrapper = new VectorMapJoinFastLongHashMultiSetContainer(isFullOuter, minMaxEnabled,
+ hashTableKeyType, newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc(), numHTs);
break;
case HASH_SET:
- hashTable = new VectorMapJoinFastLongHashSet(
- isFullOuter,
- minMaxEnabled,
- hashTableKeyType,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc());
+ htWrapper = new VectorMapJoinFastLongHashSetContainer(isFullOuter, minMaxEnabled,
+ hashTableKeyType, newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc(), numHTs);
break;
}
break;
@@ -147,19 +127,16 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
case STRING:
switch (hashTableKind) {
case HASH_MAP:
- hashTable = new VectorMapJoinFastStringHashMap(
- isFullOuter,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc());
+ htWrapper = new VectorMapJoinFastStringHashMapContainer(isFullOuter, newThreshold,
+ loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc(), numHTs);
break;
case HASH_MULTISET:
- hashTable = new VectorMapJoinFastStringHashMultiSet(
- isFullOuter,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc());
+ htWrapper = new VectorMapJoinFastStringHashMultiSetContainer(isFullOuter, newThreshold,
+ loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc(), numHTs);
break;
case HASH_SET:
- hashTable = new VectorMapJoinFastStringHashSet(
- isFullOuter,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc());
+ htWrapper = new VectorMapJoinFastStringHashSetContainer(isFullOuter, newThreshold,
+ loadFactor, writeBufferSize, estimatedKeyCount, desc.getKeyTblDesc(), numHTs);
break;
}
break;
@@ -167,34 +144,40 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
case MULTI_KEY:
switch (hashTableKind) {
case HASH_MAP:
- hashTable = new VectorMapJoinFastMultiKeyHashMap(
- isFullOuter,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount);
+ htWrapper = new VectorMapJoinFastMultiKeyHashMapContainer(isFullOuter, newThreshold,
+ loadFactor, writeBufferSize, estimatedKeyCount, numHTs);
break;
case HASH_MULTISET:
- hashTable = new VectorMapJoinFastMultiKeyHashMultiSet(
- isFullOuter,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount);
+ htWrapper = new VectorMapJoinFastMultiKeyHashMultiSetContainer(isFullOuter, newThreshold,
+ loadFactor, writeBufferSize, estimatedKeyCount, numHTs);
break;
case HASH_SET:
- hashTable = new VectorMapJoinFastMultiKeyHashSet(
- isFullOuter,
- newThreshold, loadFactor, writeBufferSize, estimatedKeyCount);
+ htWrapper = new VectorMapJoinFastMultiKeyHashSetContainer(isFullOuter, newThreshold,
+ loadFactor, writeBufferSize, estimatedKeyCount, numHTs);
break;
}
break;
}
+ return htWrapper;
+ }
- return hashTable;
+ /**
+ * Computes the hash code of a serialized key, delegating to the wrapped container.
+ * The result is what {@link #putRow(long, BytesWritable, BytesWritable)} expects as its
+ * {@code hashCode} argument.
+ */
+ public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+ return INSTANCE.getHashCode(currentKey);
}
@Override
public MapJoinKey putRow(Writable currentKey, Writable currentValue)
throws SerDeException, HiveException, IOException {
// We are not using the key and value contexts, nor do we support a MapJoinKey.
+ // Route through the hash-code overload so there is a single insertion path.
+ BytesWritable keyBytes = (BytesWritable) currentKey;
+ putRow(getHashCode(keyBytes), keyBytes, (BytesWritable) currentValue);
+ return null;
+ }
+
+ /**
+ * Inserts a serialized key/value row using a hash code already computed by the caller
+ * (normally via {@link #getHashCode(BytesWritable)}), forwarding it unchanged to the
+ * wrapped container.
+ */
+ @Override
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+ throws SerDeException, HiveException, IOException {
- vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
- return null;
+ INSTANCE.putRow(hashCode, currentKey, currentValue);
}
@Override
@@ -208,9 +191,8 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
}
@Override
- public NonMatchedSmallTableIterator createNonMatchedSmallTableIterator(
- MatchTracker matchTracker) {
- throw new RuntimeException("Not applicable");
+ public NonMatchedSmallTableIterator createNonMatchedSmallTableIterator(MatchTracker matchTracker) {
+ // Not supported by this container; keep the original diagnostic message.
+ throw new UnsupportedOperationException("Not applicable");
}
@Override
@@ -220,7 +202,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
@Override
public MapJoinKey getAnyKey() {
- throw new RuntimeException("Not applicable");
+ // Not supported by this container; keep the original diagnostic message.
+ throw new UnsupportedOperationException("Not applicable");
}
@Override
@@ -234,19 +216,33 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
}
@Override
+ public boolean containsLongKey(long currentKey) {
+ return INSTANCE.containsLongKey(currentKey);
+ }
+
+ @Override
public int size() {
- return vectorMapJoinFastHashTable.size();
+ return INSTANCE.size();
+ }
+
+ // Match tracking, non-matched iteration and spilling are not supported by this
+ // container; fail fast with a descriptive message rather than a bare exception.
+ @Override
+ public MatchTracker createMatchTracker() {
+ throw new UnsupportedOperationException("Not applicable");
+ }
+
+ @Override
+ public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+ throw new UnsupportedOperationException("Not applicable");
+ }
+
+ @Override
+ public int spillPartitionId() {
+ throw new UnsupportedOperationException("Not applicable");
}
@Override
public long getEstimatedMemorySize() {
- JavaDataModel jdm = JavaDataModel.get();
- long size = 0;
- size += vectorMapJoinFastHashTable.getEstimatedMemorySize();
- size += (4 * jdm.primitive1());
- size += (2 * jdm.object());
- size += (jdm.primitive2());
- return size;
+ // NOTE(review): the previous version also added this wrapper's own field overhead;
+ // confirm INSTANCE's estimate covers it if that overhead matters.
+ return INSTANCE.getEstimatedMemorySize();
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
index 93e8440..40ec3b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
@@ -32,13 +32,12 @@ import org.apache.hadoop.io.BytesWritable;
*/
public interface VectorMapJoinHashTable extends MemoryEstimate {
- /*
- * @param currentKey
- * The current key.
- * @param currentValue
- * The current value.
+ /**
+ * Puts a serialized key/value row into this hash table.
+ *
+ * @param hashCode hash code of {@code currentKey}, precomputed by the caller to avoid re-computation
+ * @param currentKey The current Key in bytes
+ * @param currentValue The current Value in bytes
+ * @throws SerDeException if the row cannot be deserialized
+ * @throws HiveException on hash table errors
+ * @throws IOException on I/O errors while storing the row
*/
- void putRow(BytesWritable currentKey, BytesWritable currentValue)
+ void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
throws SerDeException, HiveException, IOException;
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
index 13d24f7..13c1cce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public abstract class VectorMapJoinOptimizedHashTable
@Override
public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
- throw new RuntimeException("Not implemented");
+ // Not supported by OPTIMIZED hash tables; keep the original diagnostic message.
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
@@ -69,16 +70,16 @@ public abstract class VectorMapJoinOptimizedHashTable
}
@Override
- public void putRow(BytesWritable currentKey, BytesWritable currentValue)
+ public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
throws SerDeException, HiveException, IOException {
-
- putRowInternal(currentKey, currentValue);
+ // Method only supported by FAST HashTable implementations.
+ // UnsupportedOperationException (not java.io.UnsupportedEncodingException, which is an
+ // IOException about character encodings) so callers catching IOException do not
+ // mistake this for an I/O failure.
+ // NOTE(review): putRowInternal is still present; confirm no OPTIMIZED loader path calls putRow.
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
public boolean containsLongKey(long currentKey) {
// Method only supported by FAST HashTable implementations
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Not implemented");
}
protected void putRowInternal(BytesWritable key, BytesWritable value)
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java
index cb8ac38..d8a076a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestVectorMapJoinFastHashTable.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
public class TestVectorMapJoinFastHashTable {
+ // TODO HIVE-25145
long keyCount = 15_000_000;
private static final Logger LOG = LoggerFactory.getLogger(TestVectorMapJoinFastHashTable.class.getName());
@@ -72,7 +73,7 @@ public class TestVectorMapJoinFastHashTable {
keyTblDesc.setProperties(new Properties());
desc.setKeyTblDesc(keyTblDesc);
Configuration hconf = new HiveConf();
- VectorMapJoinFastTableContainer container = new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+ VectorMapJoinFastTableContainer container = new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 1);
container.setSerde(null, null);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
index 68f5338..e4674d8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
@@ -925,7 +925,7 @@ public class MapJoinTestConfig {
case FAST:
mapJoinTableContainer =
new VectorMapJoinFastTableContainer(
- mapJoinDesc, testDesc.hiveConf, testData.smallTableKeyHashMap.size());
+ mapJoinDesc, testDesc.hiveConf, testData.smallTableKeyHashMap.size(), 1);
break;
default:
throw new RuntimeException("Unexpected hash table implementation type " + vectorDesc.getHashTableImplementationType());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
index 476ecd3..5a9f180 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
@@ -313,7 +313,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable {
return array[index].getValues();
}
- public void verify(VectorMapJoinFastHashTable map,
+ public void verify(VectorMapJoinFastHashTableContainerBase map,
HashTableKeyType hashTableKeyType,
TypeInfo[] valueTypeInfos, boolean doClipping,
boolean useExactBytes, Random random) throws IOException {
@@ -338,7 +338,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable {
{
Object[] keyRow = element.getKeyRow();
Object keyObject = keyRow[0];
- VectorMapJoinFastLongHashMap longHashMap = (VectorMapJoinFastLongHashMap) map;
+ VectorMapJoinFastLongHashMapContainer longHashMap = (VectorMapJoinFastLongHashMapContainer) map;
hashMapResult = longHashMap.createHashMapResult();
long longKey;
switch (hashTableKeyType) {
@@ -361,7 +361,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable {
throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name());
}
joinResult = longHashMap.lookup(longKey, hashMapResult);
- if (joinResult != JoinUtil.JoinResult.MATCH || !longHashMap.containsLongKey(longKey)) {
+ if (joinResult != JoinUtil.JoinResult.MATCH) {
assertTrue(false);
}
}
@@ -370,7 +370,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable {
{
Object[] keyRow = element.getKeyRow();
Object keyObject = keyRow[0];
- VectorMapJoinFastStringHashMap stringHashMap = (VectorMapJoinFastStringHashMap) map;
+ VectorMapJoinFastStringHashMapContainer stringHashMap = (VectorMapJoinFastStringHashMapContainer) map;
hashMapResult = stringHashMap.createHashMapResult();
Text text = (Text) keyObject;
byte[] bytes = text.getBytes();
@@ -384,7 +384,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable {
case MULTI_KEY:
{
byte[] keyBytes = element.getKey();
- VectorMapJoinFastMultiKeyHashMap stringHashMap = (VectorMapJoinFastMultiKeyHashMap) map;
+ VectorMapJoinFastMultiKeyHashMapContainer stringHashMap = (VectorMapJoinFastMultiKeyHashMapContainer) map;
hashMapResult = stringHashMap.createHashMapResult();
joinResult = stringHashMap.lookup(keyBytes, 0, keyBytes.length, hashMapResult);
if (joinResult != JoinUtil.JoinResult.MATCH) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java
index e4fb98f..f5eb68c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastRowHashMap.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.CheckFastRowHashMap.Ve
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
@@ -46,7 +45,7 @@ import org.junit.Before;
import org.junit.Test;
/*
- * An multi-key value hash map optimized for vector map join.
+ * Multi-key value hash map optimized for vector map join.
*
* The key is uninterpreted bytes.
*/
@@ -61,10 +60,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
}
private void addAndVerifyRows(VectorRandomRowSource valueSource, Object[][] rows,
- VectorMapJoinFastHashTable map, HashTableKeyType hashTableKeyType,
+ VectorMapJoinFastHashTableContainerBase map, HashTableKeyType hashTableKeyType,
VerifyFastRowHashMap verifyTable, String[] keyTypeNames,
boolean doClipping, boolean useExactBytes)
- throws HiveException, IOException, SerDeException {
+ throws HiveException, IOException {
final int keyCount = keyTypeNames.length;
PrimitiveTypeInfo[] keyPrimitiveTypeInfos = new PrimitiveTypeInfo[keyCount];
@@ -136,7 +135,8 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
// Serialize keyRow into key bytes.
BytesWritable keyWritable = new BytesWritable(key);
BytesWritable valueWritable = new BytesWritable(value);
- map.putRow(keyWritable, valueWritable);
+ long hashcode = map.getHashCode(keyWritable);
+ map.putRow(hashcode, keyWritable, valueWritable);
// verifyTable.verify(map);
}
verifyTable.verify(map, hashTableKeyType, valueTypeInfos,
@@ -148,10 +148,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(927337);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.LONG,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -175,10 +175,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(927337);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.INT,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -202,10 +202,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(927337);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastStringHashMap map =
- new VectorMapJoinFastStringHashMap(
+ VectorMapJoinFastStringHashMapContainer map =
+ new VectorMapJoinFastStringHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -229,10 +229,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(833);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -256,10 +256,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(833099);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -283,10 +283,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(833099);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -310,10 +310,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(326232);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.LONG,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -337,10 +337,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(326232);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.INT,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -364,10 +364,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(326232);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastStringHashMap map =
- new VectorMapJoinFastStringHashMap(
+ VectorMapJoinFastStringHashMapContainer map =
+ new VectorMapJoinFastStringHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -391,10 +391,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(2331);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -418,10 +418,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(7403);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -445,10 +445,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(99);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -473,10 +473,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(27722);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.LONG,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -500,10 +500,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(8238383);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.INT,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -527,10 +527,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(8235);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastStringHashMap map =
- new VectorMapJoinFastStringHashMap(
+ VectorMapJoinFastStringHashMapContainer map =
+ new VectorMapJoinFastStringHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -554,10 +554,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(8235);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -581,10 +581,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(8235);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -608,10 +608,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(8235);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -635,10 +635,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(2122);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.LONG,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -662,10 +662,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(7520);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastLongHashMap map =
- new VectorMapJoinFastLongHashMap(
+ VectorMapJoinFastLongHashMapContainer map =
+ new VectorMapJoinFastLongHashMapContainer(
false, false, HashTableKeyType.INT,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -689,10 +689,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(7539);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastStringHashMap map =
- new VectorMapJoinFastStringHashMap(
+ VectorMapJoinFastStringHashMapContainer map =
+ new VectorMapJoinFastStringHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, tableDesc, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -716,10 +716,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(13);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -743,10 +743,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(12);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();
@@ -770,10 +770,10 @@ public class TestVectorMapJoinFastRowHashMap extends CommonFastHashTable {
random = new Random(7);
// Use a large capacity that doesn't require expansion, yet.
- VectorMapJoinFastMultiKeyHashMap map =
- new VectorMapJoinFastMultiKeyHashMap(
+ VectorMapJoinFastMultiKeyHashMapContainer map =
+ new VectorMapJoinFastMultiKeyHashMapContainer(
false,
- LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1);
+ LARGE_CAPACITY, LOAD_FACTOR, LARGE_WB_SIZE, -1, 4);
VerifyFastRowHashMap verifyTable = new VerifyFastRowHashMap();