Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/02/22 20:37:31 UTC

[GitHub] [hive] ramesh0201 opened a new pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

ramesh0201 opened a new pull request #2004:
URL: https://github.com/apache/hive/pull/2004




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] github-actions[bot] closed pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #2004:
URL: https://github.com/apache/hive/pull/2004


   



[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631130583



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerDirectAccess.java
##########
@@ -21,11 +21,15 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
 public interface MapJoinTableContainerDirectAccess {
 
   void put(Writable currentKey, Writable currentValue) throws SerDeException, IOException;
 
+  long calculateLongHashCode(BytesWritable currentKey) throws HiveException, IOException, SerDeException;

Review comment:
       Nit: shall we simplify this to something like `getHashCode(BytesWritable currentKey)`?






[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r583062970



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerDirectAccess.java
##########
@@ -21,11 +21,15 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
 public interface MapJoinTableContainerDirectAccess {
 
   void put(Writable currentKey, Writable currentValue) throws SerDeException, IOException;
 
+  long calculateLongHashCode(BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException, SerDeException;

Review comment:
       Let's use only the key here.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
##########
@@ -667,6 +673,8 @@ private void setUpHashTable() {
         VectorMapJoinTableContainer vectorMapJoinTableContainer =
                 (VectorMapJoinTableContainer) mapJoinTables[posSingleVectorMapJoinSmallTable];
         vectorMapJoinHashTable = vectorMapJoinTableContainer.vectorMapJoinHashTable();
+        vectorMapJoinFastHashTableWrapper = ((VectorMapJoinFastHashTableParallel)vectorMapJoinTableContainer.

Review comment:
       Can we just keep it as a `vectorMapJoinHashTable`? What is the reason for the extra interface (`VectorMapJoinFastHashTableWrapper`)?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
##########
@@ -48,7 +48,7 @@
 
   private long fullOuterNullKeyRefWord;
 
-  private static class NonMatchedBytesHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+  public static class NonMatchedBytesHashMapIterator extends VectorMapJoinFastNonMatchedIterator {

Review comment:
       Maybe move this along with NonMatchedLongHashMapIterator to their own class? 

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
##########
@@ -64,15 +64,19 @@ public void throwExpandError(int limit, String dataTypeName) {
 
   private static void validateCapacity(long capacity) {
     if (Long.bitCount(capacity) != 1) {
-      throw new AssertionError("Capacity must be a power of two");
+      throw new AssertionError("Capacity must be a power of two " + capacity);

Review comment:
       Nit: a better way to check whether a number is a power of two is
   `(capacity & (capacity - 1)) == 0` (parentheses matter: in Java `==` binds
   tighter than `&`, so the unparenthesized form does not compile).
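
   For reference, a minimal standalone sketch of the two equivalent checks
   (class and method names here are illustrative only):

   ```java
   public class PowerOfTwoCheck {
     // Both checks agree for positive inputs: a power of two has exactly one
     // bit set, and subtracting 1 from it sets every lower bit instead.
     static boolean isPowerOfTwoBitCount(long capacity) {
       return Long.bitCount(capacity) == 1;
     }

     static boolean isPowerOfTwoMask(long capacity) {
       return capacity > 0 && (capacity & (capacity - 1)) == 0;
     }

     public static void main(String[] args) {
       for (long c : new long[] {1, 2, 3, 1024, 1025}) {
         System.out.println(c + " -> " + isPowerOfTwoMask(c)
             + " / " + isPowerOfTwoBitCount(c));
       }
     }
   }
   ```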

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -71,6 +196,79 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+    this.numEntries = 0;
+    totalEntries = new AtomicLong(0);
+  }
+
+  public void drainQueueAndLoad(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, int partitionId,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Draining thread " + partitionId + " started");
+    long entries = 0;
+    BlockingQueue<QueueElementBatch>[] partitionQ = sharedQ;

Review comment:
       is this assignment needed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -71,6 +196,79 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+    this.numEntries = 0;
+    totalEntries = new AtomicLong(0);
+  }
+
+  public void drainQueueAndLoad(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, int partitionId,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Draining thread " + partitionId + " started");
+    long entries = 0;
+    BlockingQueue<QueueElementBatch>[] partitionQ = sharedQ;
+    while(true) {
+      QueueElementBatch batch = partitionQ[partitionId].take();
+      if(batch == sentinel) {
+        break;
+      }
+      LOG.info("Draining thread " + partitionId + " got size "+ batch.getSize());
+      for (int i = 0; i < batch.getSize(); i++) {
+        try {
+          HashTableElement h = batch.getBatch(i);
+          vectorMapJoinFastTableContainer
+              .putRow(h.getKey(), h.getValue(), h.getHashCode(), h.getDeserializeKey());
+        }
+        catch(Exception e) {
+          LOG.info("Exception in draining thread put row: ",e);
+          throw new HiveException(e);
+        }
+        ++entries;
+      }
+      LOG.info("Draining thread added entries : "+ entries);
+      totalEntries.set(totalEntries.get() + entries);

Review comment:
       Maybe use `getAndAdd` instead? The separate get() and set() are not atomic, so concurrent drain threads can lose updates.
   Also, the `entries` variable seems unnecessary -- we always increment by the batch size, right?
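
   A self-contained sketch of the difference (the counter name mirrors the one
   in this diff; the racy variant is what the current code does):

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   public class AtomicAddSketch {
     static final AtomicLong totalEntries = new AtomicLong(0);

     // Racy: another drain thread can update totalEntries between get() and
     // set(), and that thread's contribution is then silently overwritten.
     static void racyAdd(long entries) {
       totalEntries.set(totalEntries.get() + entries);
     }

     // Atomic: the read-modify-write happens as one indivisible step.
     static long atomicAdd(long batchSize) {
       return totalEntries.addAndGet(batchSize); // or getAndAdd(batchSize)
     }
   }
   ```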

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMapParallel.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+// import org.slf4j.Logger;

Review comment:
       leftovers

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMapParallel.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+// import org.slf4j.Logger;
+// import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
+import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.security.jgss.HttpCaller;
+
+/*
+ * An single LONG key hash map optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashMapParallel extends VectorMapJoinFastHashTableWrapper implements
+    VectorMapJoinLongHashMap, MemoryEstimate {
+
+  public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMapParallel.class);
+  private VectorMapJoinFastLongHashMap[] vectorMapJoinFastLongHashMaps;
+
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+
+  HashTableKeyType hashTableKeyType;
+
+  @Override public boolean useMinMax() {
+    boolean useMinMax = false;
+    /*for (int i=0; i<4; ++i) {

Review comment:
       leftovers

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
##########
@@ -34,7 +34,7 @@
 
   protected final boolean isFullOuter;
 
-  protected int logicalHashBucketCount;
+  public int logicalHashBucketCount;

Review comment:
       Let's keep this protected and use a getter if we actually need to retrieve it.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
##########
@@ -1039,7 +1054,11 @@ protected void generateFullOuterStringKeySmallTableNoMatches() throws HiveExcept
   protected void fullOuterHashTableSetup() {
 
     // Always track key matches for FULL OUTER.
-    matchTracker = vectorMapJoinHashTable.createMatchTracker();
+    if (vectorMapJoinFastHashTableWrapper != null) {

Review comment:
       This code repeats all over -- it seems that if we just keep it as a `VectorMapJoinHashTable` we can avoid all these conditionals.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
##########
@@ -44,13 +44,13 @@ public VectorMapJoinHashSetResult createHashSetResult() {
     return new VectorMapJoinFastBytesHashSetStore.HashSetResult();
   }
 
-  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue) {
+  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue, long hashCode) {
 
     if (checkResize()) {
       expandAndRehash();
     }
 
-    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    //long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);

Review comment:
       leftover?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
##########
@@ -20,16 +20,15 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableWrapper;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.*;

Review comment:
       Use fully qualified names for imports (no wildcard imports).

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
##########
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hive.common.util.HashCodeUtil;

Review comment:
       Unused import?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
##########
@@ -806,6 +806,12 @@ public void put(Writable currentKey, Writable currentValue) throws SerDeExceptio
     internalPutRow(directWriteHelper, currentKey, currentValue);
   }
 
+  @Override
+  public long calculateLongHashCode(BytesWritable currentKey, BytesWritable currentValue) throws HiveException, IOException, SerDeException {
+    directWriteHelper.setKeyValue(currentKey, currentValue);

Review comment:
       A bit confused here: we only need the hash of the key, right?
   No need to pass the value (it could just be null) -- or we could even call murmurHash directly.
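
   For illustration, a sketch of the "call MurmurHash directly" alternative,
   mirroring the HashCodeUtil call that already appears elsewhere in this diff
   (the method name is hypothetical):

   ```java
   import org.apache.hadoop.io.BytesWritable;
   import org.apache.hive.common.util.HashCodeUtil;

   // Hash only the key's valid bytes; neither the value nor a SerDe round
   // trip through directWriteHelper is needed to compute the hash code.
   static long keyHashCode(BytesWritable currentKey) {
     return HashCodeUtil.murmurHash(currentKey.getBytes(), 0, currentKey.getLength());
   }
   ```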

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
##########
@@ -45,12 +46,20 @@
 
   protected BytesWritable testKeyBytesWritable;
 
+
+  @Override
+  public void putRow(BytesWritable currentKey, BytesWritable currentValue)

Review comment:
       I guess this was here for testing?
   We should either change the interface (to not use hashCode) or modify our code to use the existing methods.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +56,134 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static final int HASHTABLE_EXECUTOR_THREADS = 4;

Review comment:
       are we going to make this configurable?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
##########
@@ -49,13 +49,13 @@ public VectorMapJoinHashMultiSetResult createHashMultiSetResult() {
     return new VectorMapJoinFastBytesHashMultiSetStore.HashMultiSetResult();
   }
 
-  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue) {
+  public void add(byte[] keyBytes, int keyStart, int keyLength, BytesWritable currentValue, long hashCode) {
 
     if (checkResize()) {
       expandAndRehash();
     }
 
-    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    //long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);

Review comment:
       leftover?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +56,134 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static final int HASHTABLE_EXECUTOR_THREADS = 4;
+
+  public static class HashTableElement {
+    byte[] keyBytes;
+    int keyLength;
+    byte[] valueBytes;
+    int valueLength;
+    long deserializeKey;
+    long hashCode;
+
+    public HashTableElement(byte[] keyBytes, int keyLength, byte[] valueBytes, int valueLength, long key, long hashCode) {
+      this.keyBytes = keyBytes;
+      this.keyLength = keyLength;
+      this.valueBytes = valueBytes;
+      this.valueLength = valueLength;
+      this.deserializeKey = key;
+      this.hashCode = hashCode;
+    }
+
+    public BytesWritable getKey() {
+      return new BytesWritable(keyBytes, keyLength);
+    }
+
+    public BytesWritable getValue() {
+      return new BytesWritable(valueBytes, valueLength);
+    }
+
+    public long getDeserializeKey() {
+      return deserializeKey;
+    }
+
+    public long getHashCode() {
+      return hashCode;
+    }
+  }
+
+  public static class QueueElementBatch {
+    HashTableElement[] batch;
+    /*byte[] bytes;
+    int[] lengths;
+    long[] deserializeKeys;
+    long[] hashCodes;*/
+    int currentIndex;
+
+    public QueueElementBatch() {
+      batch = new HashTableElement[1024];
+      /*bytes = new byte[4 * 1024 * 1024];
+      lengths = new int[2 * 1024];
+      deserializeKeys = new long[1024];
+      hashCodes = new long[1024];*/
+      currentIndex = 0;
+    }
+
+    public boolean addElement(HashTableElement h) {
+      batch[currentIndex] = h;
+      currentIndex++;
+      return (currentIndex == 1024);
+    }
+
+    /*public void addElement(byte[] keyByte, int keyLength, byte[] valueByte, int valueLength, long deserializeKey,

Review comment:
       leftovers?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
##########
@@ -64,15 +64,19 @@ public void throwExpandError(int limit, String dataTypeName) {
 
   private static void validateCapacity(long capacity) {
     if (Long.bitCount(capacity) != 1) {
-      throw new AssertionError("Capacity must be a power of two");
+      throw new AssertionError("Capacity must be a power of two " + capacity);
     }
     if (capacity <= 0) {
       throw new AssertionError("Invalid capacity " + capacity);
     }
   }
 
   private static int nextHighestPowerOfTwo(int v) {
-    return Integer.highestOneBit(v) << 1;
+    int value = Integer.highestOneBit(v);

Review comment:
       It seems that when we are already at the maximum power of two we now skip resizing -- why was this not a problem before? Shall we LOG here?
   Can we also add documentation, please?
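
   A sketch of a guarded, documented version (the clamp-and-log behaviour is
   an assumption about the intent of this change):

   ```java
   /**
    * Returns the next power of two strictly above v, clamped at 1 << 30 (the
    * largest positive power of two an int can hold). Without the clamp,
    * Integer.highestOneBit(v) << 1 overflows to Integer.MIN_VALUE once
    * v >= 1 << 30, producing a negative "capacity".
    */
   private static int nextHighestPowerOfTwo(int v) {
     int highestOneBit = Integer.highestOneBit(v);
     if (highestOneBit == (1 << 30)) {
       // already at the maximum; skip resizing (a LOG.warn would fit here)
       return highestOneBit;
     }
     return highestOneBit << 1;
   }
   ```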

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +56,134 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static final int HASHTABLE_EXECUTOR_THREADS = 4;
+
+  public static class HashTableElement {
+    byte[] keyBytes;
+    int keyLength;
+    byte[] valueBytes;
+    int valueLength;
+    long deserializeKey;
+    long hashCode;
+
+    public HashTableElement(byte[] keyBytes, int keyLength, byte[] valueBytes, int valueLength, long key, long hashCode) {
+      this.keyBytes = keyBytes;
+      this.keyLength = keyLength;
+      this.valueBytes = valueBytes;
+      this.valueLength = valueLength;
+      this.deserializeKey = key;
+      this.hashCode = hashCode;
+    }
+
+    public BytesWritable getKey() {
+      return new BytesWritable(keyBytes, keyLength);
+    }
+
+    public BytesWritable getValue() {
+      return new BytesWritable(valueBytes, valueLength);
+    }
+
+    public long getDeserializeKey() {
+      return deserializeKey;
+    }
+
+    public long getHashCode() {
+      return hashCode;
+    }
+  }
+
+  public static class QueueElementBatch {

Review comment:
       Maybe HashTableElementBatch? Is the batch size going to be configurable?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.*;

Review comment:
       Use fully qualified names for imports (no wildcard imports).

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +56,134 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static final int HASHTABLE_EXECUTOR_THREADS = 4;
+
+  public static class HashTableElement {
+    byte[] keyBytes;
+    int keyLength;
+    byte[] valueBytes;
+    int valueLength;
+    long deserializeKey;
+    long hashCode;
+
+    public HashTableElement(byte[] keyBytes, int keyLength, byte[] valueBytes, int valueLength, long key, long hashCode) {
+      this.keyBytes = keyBytes;
+      this.keyLength = keyLength;
+      this.valueBytes = valueBytes;
+      this.valueLength = valueLength;
+      this.deserializeKey = key;
+      this.hashCode = hashCode;
+    }
+
+    public BytesWritable getKey() {
+      return new BytesWritable(keyBytes, keyLength);
+    }
+
+    public BytesWritable getValue() {
+      return new BytesWritable(valueBytes, valueLength);
+    }
+
+    public long getDeserializeKey() {
+      return deserializeKey;
+    }
+
+    public long getHashCode() {
+      return hashCode;
+    }
+  }
+
+  public static class QueueElementBatch {
+    HashTableElement[] batch;
+    /*byte[] bytes;

Review comment:
       leftovers?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +56,134 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static final int HASHTABLE_EXECUTOR_THREADS = 4;
+
+  public static class HashTableElement {
+    byte[] keyBytes;
+    int keyLength;
+    byte[] valueBytes;
+    int valueLength;
+    long deserializeKey;
+    long hashCode;
+
+    public HashTableElement(byte[] keyBytes, int keyLength, byte[] valueBytes, int valueLength, long key, long hashCode) {
+      this.keyBytes = keyBytes;
+      this.keyLength = keyLength;
+      this.valueBytes = valueBytes;
+      this.valueLength = valueLength;
+      this.deserializeKey = key;
+      this.hashCode = hashCode;
+    }
+
+    public BytesWritable getKey() {
+      return new BytesWritable(keyBytes, keyLength);
+    }
+
+    public BytesWritable getValue() {
+      return new BytesWritable(valueBytes, valueLength);
+    }
+
+    public long getDeserializeKey() {
+      return deserializeKey;
+    }
+
+    public long getHashCode() {
+      return hashCode;
+    }
+  }
+
+  public static class QueueElementBatch {
+    HashTableElement[] batch;
+    /*byte[] bytes;
+    int[] lengths;
+    long[] deserializeKeys;
+    long[] hashCodes;*/
+    int currentIndex;
+
+    public QueueElementBatch() {
+      batch = new HashTableElement[1024];
+      /*bytes = new byte[4 * 1024 * 1024];
+      lengths = new int[2 * 1024];
+      deserializeKeys = new long[1024];
+      hashCodes = new long[1024];*/
+      currentIndex = 0;
+    }
+
+    public boolean addElement(HashTableElement h) {
+      batch[currentIndex] = h;
+      currentIndex++;
+      return (currentIndex == 1024);
+    }
+
+    /*public void addElement(byte[] keyByte, int keyLength, byte[] valueByte, int valueLength, long deserializeKey,
+        long hashCode) {
+      int start = 0;
+      if (currentIndex > 0) {
+        start = lengths[(currentIndex * 2) - 1];
+      }
+      if (keyByte != null) {
+        // use System.Arraycopy
+        for (int i = 0; i < keyLength; ++i) {
+          bytes[start + i] = keyByte[i];
+        }
+      }
+      if (valueByte != null) {
+        // use System.Arraycopy
+        for (int i = 0; i < valueLength; ++i) {
+          bytes[start + keyLength + i] = valueByte[i];
+        }
+      }
+      //change to end offsets
+      lengths[currentIndex * 2] = start + keyLength;
+      lengths[(currentIndex * 2) + 1] = start + keyLength + valueLength;
+      deserializeKeys[currentIndex] = deserializeKey;
+      hashCodes[currentIndex] = hashCode;
+      currentIndex++;
+    }
+
+    public boolean canAdd(int keyLength, int valueLength) {
+      if (currentIndex >= 1024) {
+        return false;
+      }
+      int start = 0;
+      if (currentIndex > 0) {
+        start = lengths[(currentIndex * 2) - 1];
+      }
+      if ((start + keyLength + valueLength) < bytes.length) {
+        return true;
+      }
+      return false;
+    }*/
+
+    public HashTableElement getBatch(int i) {
+      return batch[i];
+      /*int start = 0;
+      if (i > 0) {
+        start = lengths[(i * 2) - 1];
+      }
+      LOG.info("For i: "+i+" key start: "+start+" key end: "+lengths[i * 2]+
+          " value start: "+lengths[i * 2]+" value end: "+lengths[(i * 2) + 1]);
+      return new HashTableElement(Arrays.copyOfRange(bytes, start, lengths[i * 2]),
+          lengths[i * 2] - start,
+          Arrays.copyOfRange(bytes, lengths[i * 2], lengths[(i * 2) + 1]),
+          lengths[(i * 2) + 1] - lengths[i * 2],
+          deserializeKeys[i],
+          hashCodes[i]);*/
+    }
+
+    public int getSize() {
+      return currentIndex;
+    }
+  }
+
   private Configuration hconf;
   protected MapJoinDesc desc;
   private TezContext tezContext;
   private String cacheKey;
   private TezCounter htLoadCounter;
+  private long numEntries;
+  private AtomicLong totalEntries;
+  private static final QueueElementBatch sentinel = new QueueElementBatch();

Review comment:
       is this like a NULL_BATCH?
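
   For reference, this is the classic poison-pill pattern. A self-contained
   sketch (element type simplified to String) of how one sentinel object ends
   a drain loop:

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   public class PoisonPillSketch {
     // new String(...) guarantees a unique identity, so the reference
     // comparison below can never match a real element
     static final String SENTINEL = new String("EOF");

     public static void main(String[] args) throws InterruptedException {
       BlockingQueue<String> q = new LinkedBlockingQueue<>();
       Thread drainer = new Thread(() -> {
         try {
           // take() blocks until work (or the pill) arrives
           for (String batch = q.take(); batch != SENTINEL; batch = q.take()) {
             System.out.println("loaded " + batch);
           }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       });
       drainer.start();
       q.put("batch-1");
       q.put("batch-2");
       q.put(SENTINEL); // producer signals "no more batches"
       drainer.join();
     }
   }
   ```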

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +56,134 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static final int HASHTABLE_EXECUTOR_THREADS = 4;
+
+  public static class HashTableElement {
+    byte[] keyBytes;
+    int keyLength;
+    byte[] valueBytes;
+    int valueLength;
+    long deserializeKey;
+    long hashCode;
+
+    public HashTableElement(byte[] keyBytes, int keyLength, byte[] valueBytes, int valueLength, long key, long hashCode) {
+      this.keyBytes = keyBytes;
+      this.keyLength = keyLength;
+      this.valueBytes = valueBytes;
+      this.valueLength = valueLength;
+      this.deserializeKey = key;
+      this.hashCode = hashCode;
+    }
+
+    public BytesWritable getKey() {
+      return new BytesWritable(keyBytes, keyLength);
+    }
+
+    public BytesWritable getValue() {
+      return new BytesWritable(valueBytes, valueLength);
+    }
+
+    public long getDeserializeKey() {
+      return deserializeKey;
+    }
+
+    public long getHashCode() {
+      return hashCode;
+    }
+  }
+
+  public static class QueueElementBatch {
+    HashTableElement[] batch;
+    /*byte[] bytes;
+    int[] lengths;
+    long[] deserializeKeys;
+    long[] hashCodes;*/
+    int currentIndex;
+
+    public QueueElementBatch() {
+      batch = new HashTableElement[1024];
+      /*bytes = new byte[4 * 1024 * 1024];
+      lengths = new int[2 * 1024];
+      deserializeKeys = new long[1024];
+      hashCodes = new long[1024];*/
+      currentIndex = 0;
+    }
+
+    public boolean addElement(HashTableElement h) {

Review comment:
       Let's document the behaviour here -- this seems to assume that we never go beyond 1024 elements per batch.
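
   A sketch of the documented contract (BATCH_SIZE is an assumed constant; the
   current code hard-codes 1024):

   ```java
   private static final int BATCH_SIZE = 1024;

   /**
    * Appends h to this batch. Callers MUST act on the return value: once it
    * returns true the batch is full and has to be queued and replaced before
    * the next add, otherwise the following call overruns the backing array.
    *
    * @return true when the batch has reached BATCH_SIZE elements
    */
   public boolean addElement(HashTableElement h) {
     batch[currentIndex++] = h;
     return currentIndex == BATCH_SIZE;
   }
   ```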

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +56,134 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static final int HASHTABLE_EXECUTOR_THREADS = 4;
+
+  public static class HashTableElement {

Review comment:
       add class doc?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -71,6 +196,79 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+    this.numEntries = 0;
+    totalEntries = new AtomicLong(0);
+  }
+
+  public void drainQueueAndLoad(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, int partitionId,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Draining thread " + partitionId + " started");
+    long entries = 0;
+    BlockingQueue<QueueElementBatch>[] partitionQ = sharedQ;
+    while(true) {

Review comment:
       `while (batch != sentinel)`?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -71,6 +196,79 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+    this.numEntries = 0;
+    totalEntries = new AtomicLong(0);
+  }
+
+  public void drainQueueAndLoad(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, int partitionId,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Draining thread " + partitionId + " started");
+    long entries = 0;
+    BlockingQueue<QueueElementBatch>[] partitionQ = sharedQ;
+    while(true) {
+      QueueElementBatch batch = partitionQ[partitionId].take();
+      if(batch == sentinel) {
+        break;
+      }
+      LOG.info("Draining thread " + partitionId + " got size "+ batch.getSize());
+      for (int i = 0; i < batch.getSize(); i++) {
+        try {
+          HashTableElement h = batch.getBatch(i);
+          vectorMapJoinFastTableContainer
+              .putRow(h.getKey(), h.getValue(), h.getHashCode(), h.getDeserializeKey());
+        }
+        catch(Exception e) {

Review comment:
       Too generic a try/catch -- propagate the exception instead?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -71,6 +196,79 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+    this.numEntries = 0;
+    totalEntries = new AtomicLong(0);
+  }
+
+  public void drainQueueAndLoad(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, int partitionId,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Draining thread " + partitionId + " started");
+    long entries = 0;
+    BlockingQueue<QueueElementBatch>[] partitionQ = sharedQ;
+    while(true) {
+      QueueElementBatch batch = partitionQ[partitionId].take();
+      if(batch == sentinel) {
+        break;
+      }
+      LOG.info("Draining thread " + partitionId + " got size "+ batch.getSize());
+      for (int i = 0; i < batch.getSize(); i++) {
+        try {
+          HashTableElement h = batch.getBatch(i);
+          vectorMapJoinFastTableContainer
+              .putRow(h.getKey(), h.getValue(), h.getHashCode(), h.getDeserializeKey());
+        }
+        catch(Exception e) {
+          LOG.info("Exception in draining thread put row: ",e);
+          throw new HiveException(e);
+        }
+        ++entries;
+      }
+      LOG.info("Draining thread added entries : "+ entries);

Review comment:
       Debug level instead of info?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);
+        boolean drained = false;
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[HASHTABLE_EXECUTOR_THREADS];
+        for(int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[HASHTABLE_EXECUTOR_THREADS];
+        for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          batches[i] = new QueueElementBatch();

Review comment:
       Let's use an init method for these parallel-load variables and call it from `init`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);

Review comment:
       I would move this to `init` instead -- we might want to read the thread count from HiveConf as well.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -71,6 +196,79 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+    this.numEntries = 0;
+    totalEntries = new AtomicLong(0);
+  }
+
+  public void drainQueueAndLoad(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, int partitionId,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Draining thread " + partitionId + " started");
+    long entries = 0;
+    BlockingQueue<QueueElementBatch>[] partitionQ = sharedQ;
+    while(true) {
+      QueueElementBatch batch = partitionQ[partitionId].take();
+      if(batch == sentinel) {
+        break;
+      }
+      LOG.info("Draining thread " + partitionId + " got size "+ batch.getSize());
+      for (int i = 0; i < batch.getSize(); i++) {
+        try {
+          HashTableElement h = batch.getBatch(i);
+          vectorMapJoinFastTableContainer
+              .putRow(h.getKey(), h.getValue(), h.getHashCode(), h.getDeserializeKey());
+        }
+        catch(Exception e) {
+          LOG.info("Exception in draining thread put row: ",e);
+          throw new HiveException(e);
+        }
+        ++entries;
+      }
+      LOG.info("Draining thread added entries : "+ entries);
+      totalEntries.set(totalEntries.get() + entries);
+      entries = 0;
+      long hashTableEntries = totalEntries.get();
+      if (doMemCheck && (hashTableEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+        final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+        if (estMemUsage > effectiveThreshold) {
+          String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+              " numEntries: " + hashTableEntries + " estimatedMemoryUsage: " + estMemUsage +
+              " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+          LOG.error(msg);
+          throw new MapJoinMemoryExhaustionError(msg);
+        } else {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                    "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, hashTableEntries, estMemUsage,
+                effectiveThreshold);
+          }
+        }
+      }
+    }
+    LOG.info("Draining thread "+ partitionId +" is finished");
+  }
+
+  public void drain(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,

Review comment:
       It seems this method just submits the drain calls to the pool -- shall we rename and document it?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -71,6 +196,79 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
     String counterName = Utilities.getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+    this.numEntries = 0;
+    totalEntries = new AtomicLong(0);
+  }
+
+  public void drainQueueAndLoad(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, int partitionId,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Draining thread " + partitionId + " started");
+    long entries = 0;
+    BlockingQueue<QueueElementBatch>[] partitionQ = sharedQ;
+    while(true) {
+      QueueElementBatch batch = partitionQ[partitionId].take();
+      if(batch == sentinel) {
+        break;
+      }
+      LOG.info("Draining thread " + partitionId + " got size "+ batch.getSize());
+      for (int i = 0; i < batch.getSize(); i++) {
+        try {
+          HashTableElement h = batch.getBatch(i);
+          vectorMapJoinFastTableContainer
+              .putRow(h.getKey(), h.getValue(), h.getHashCode(), h.getDeserializeKey());
+        }
+        catch(Exception e) {
+          LOG.info("Exception in draining thread put row: ",e);
+          throw new HiveException(e);
+        }
+        ++entries;
+      }
+      LOG.info("Draining thread added entries : "+ entries);
+      totalEntries.set(totalEntries.get() + entries);
+      entries = 0;
+      long hashTableEntries = totalEntries.get();
+      if (doMemCheck && (hashTableEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+        final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+        if (estMemUsage > effectiveThreshold) {
+          String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+              " numEntries: " + hashTableEntries + " estimatedMemoryUsage: " + estMemUsage +
+              " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+          LOG.error(msg);
+          throw new MapJoinMemoryExhaustionError(msg);
+        } else {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                    "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, hashTableEntries, estMemUsage,
+                effectiveThreshold);
+          }
+        }
+      }
+    }
+    LOG.info("Draining thread "+ partitionId +" is finished");
+  }
+
+  public void drain(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer, boolean doMemCheck,
+      String inputName, MemoryMonitorInfo memoryMonitorInfo, long effectiveThreshold, ExecutorService executorService,
+      BlockingQueue<QueueElementBatch>[] sharedQ)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    boolean finalDoMemCheck = doMemCheck;
+    long finalEffectiveThreshold = effectiveThreshold;
+    for (int partitionId = 0; partitionId < HASHTABLE_EXECUTOR_THREADS; ++partitionId) {
+      int finalPartitionId = partitionId;
+      executorService.submit(() -> {
+        try {
+          LOG.info("Partition id is: "+ finalPartitionId + " Queue size is: "+ sharedQ[finalPartitionId].size());
+          drainQueueAndLoad(vectorMapJoinFastTableContainer, finalDoMemCheck, inputName, memoryMonitorInfo,
+              finalEffectiveThreshold, finalPartitionId, sharedQ);
+        } catch (HiveException e) {
+          e.printStackTrace();
+        } catch (IOException | InterruptedException | SerDeException e) {
+          e.printStackTrace();

Review comment:
       propagate exception instead?
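
   A minimal sketch of that suggestion: keep the Futures returned by submit()
   and get() them on the coordinating thread, so a worker failure resurfaces
   as an ExecutionException instead of being printed and lost (the failing
   partition here is contrived):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class PropagateFromPool {
     public static void main(String[] args) throws Exception {
       ExecutorService pool = Executors.newFixedThreadPool(4);
       List<Future<?>> loaders = new ArrayList<>();
       for (int p = 0; p < 4; ++p) {
         final int partitionId = p;
         loaders.add(pool.submit(() -> {
           // stand-in for drainQueueAndLoad(..., partitionId, sharedQ)
           if (partitionId == 2) {
             throw new IllegalStateException("load failed in partition " + partitionId);
           }
           return null;
         }));
       }
       try {
         for (Future<?> f : loaders) {
           f.get(); // rethrows the worker's exception wrapped in ExecutionException
         }
       } finally {
         pool.shutdown();
       }
     }
   }
   ```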

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);
+        boolean drained = false;
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[HASHTABLE_EXECUTOR_THREADS];
+        for(int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[HASHTABLE_EXECUTOR_THREADS];
+        for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) (((1 << 2) - 1) & hashCode);

Review comment:
       Add a comment explaining the partition logic here? For example, why aren't we using the first two bits instead?
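
   For the record, (1 << 2) - 1 == 0b11, so the expression keeps the two
   low-order bits of the hash, i.e. partitionId = hashCode mod 4. A sketch
   that ties the mask to the named constant instead (assuming the thread
   count stays a power of two):

   ```java
   // HASHTABLE_EXECUTOR_THREADS must be a power of two for the mask to be a
   // uniform modulo; with 4 threads this yields a partitionId in [0, 3].
   int partitionId = (int) (hashCode & (HASHTABLE_EXECUTOR_THREADS - 1));
   ```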

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);
+        boolean drained = false;
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[HASHTABLE_EXECUTOR_THREADS];
+        for(int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[HASHTABLE_EXECUTOR_THREADS];
+        for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) (((1 << 2) - 1) & hashCode);
           numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                    effectiveThreshold);
-                }
-              }
+          // call getBytes as copy is called later
+          byte[] valueBytes = currentValue.copyBytes();
+          int valueLength = currentValue.getLength();
+          byte[] keyBytes = currentKey.copyBytes();
+          int keyLength = currentKey.getLength();
+          HashTableElement h = new HashTableElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);

Review comment:
       Any chance we could avoid the copy above and pass down the BytesWritable?
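
   One caveat worth verifying before removing the copy (an assumption about
   the reader's contract): key/value readers typically reuse the same backing
   Writable across next() calls, so bytes queued for another thread must be
   copied somewhere. A sketch that still copies exactly once:

   ```java
   // copy up front, because currentKey's backing array will be overwritten
   // by the reader on the next call to kvReader.next()
   BytesWritable keyCopy = new BytesWritable();
   keyCopy.set(currentKey.getBytes(), 0, currentKey.getLength());
   ```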

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);
+        boolean drained = false;
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[HASHTABLE_EXECUTOR_THREADS];
+        for(int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[HASHTABLE_EXECUTOR_THREADS];
+        for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) (((1 << 2) - 1) & hashCode);
           numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                    effectiveThreshold);
-                }
-              }
+          // call getBytes as copy is called later
+          byte[] valueBytes = currentValue.copyBytes();
+          int valueLength = currentValue.getLength();
+          byte[] keyBytes = currentKey.copyBytes();
+          int keyLength = currentKey.getLength();
+          HashTableElement h = new HashTableElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);
+          if (batches[partitionId].addElement(h)) {

Review comment:
       When the batch is full, hand it off and start a new one -- please add a comment here saying so.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);
+        boolean drained = false;
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[HASHTABLE_EXECUTOR_THREADS];
+        for(int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[HASHTABLE_EXECUTOR_THREADS];
+        for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) (((1 << 2) - 1) & hashCode);
           numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                    effectiveThreshold);
-                }
-              }
+          // call getBytes as copy is called later
+          byte[] valueBytes = currentValue.copyBytes();
+          int valueLength = currentValue.getLength();
+          byte[] keyBytes = currentKey.copyBytes();
+          int keyLength = currentKey.getLength();
+          HashTableElement h = new HashTableElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);
+          if (batches[partitionId].addElement(h)) {
+              sharedQ[partitionId].add(batches[partitionId]);
+              batches[partitionId] = new QueueElementBatch();
           }
+          /*if (!batches[partitionId].canAdd(keyLength, valueLength)) {
+             sharedQ[partitionId].add(batches[partitionId]);
+             batches[partitionId] = new QueueElementBatch();
+           }
+           LOG.info("For numEntries: "+numEntries+" keybytes length : "+keyBytes.length+" keyLength: "+keyLength+
+               " valueBytes length: "+valueBytes.length+" valueLength "+valueLength);
+           batches[partitionId].addElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);*/
         }
+
+        LOG.info("Finished loading the queue for input: {} endTime : {}", inputName, System.currentTimeMillis());
+
+        // Add sentinel at the end of queue
+        for (int i=0; i<4; ++i) {

Review comment:
       magic constant -- I guess this should be HASHTABLE_EXECUTOR_THREADS
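       i.e. something like (sketch):

           // Flush the last partial batch and signal end-of-input to each worker.
           for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
             sharedQ[i].add(batches[i]);
             sharedQ[i].add(sentinel);
           }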

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);
+        boolean drained = false;
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[HASHTABLE_EXECUTOR_THREADS];
+        for(int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[HASHTABLE_EXECUTOR_THREADS];
+        for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) (((1 << 2) - 1) & hashCode);
           numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                    effectiveThreshold);
-                }
-              }
+          // call getBytes as copy is called later

Review comment:
       Stale comment? The code calls copyBytes(), not getBytes().

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -148,28 +345,67 @@ public void load(MapJoinTableContainer[] mapJoinTables,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(HASHTABLE_EXECUTOR_THREADS);
+        boolean drained = false;
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[HASHTABLE_EXECUTOR_THREADS];
+        for(int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[HASHTABLE_EXECUTOR_THREADS];
+        for (int i = 0; i < HASHTABLE_EXECUTOR_THREADS; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) (((1 << 2) - 1) & hashCode);
           numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                    effectiveThreshold);
-                }
-              }
+          // call getBytes as copy is called later
+          byte[] valueBytes = currentValue.copyBytes();
+          int valueLength = currentValue.getLength();
+          byte[] keyBytes = currentKey.copyBytes();
+          int keyLength = currentKey.getLength();
+          HashTableElement h = new HashTableElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);
+          if (batches[partitionId].addElement(h)) {
+              sharedQ[partitionId].add(batches[partitionId]);
+              batches[partitionId] = new QueueElementBatch();
           }
+          /*if (!batches[partitionId].canAdd(keyLength, valueLength)) {
+             sharedQ[partitionId].add(batches[partitionId]);
+             batches[partitionId] = new QueueElementBatch();
+           }
+           LOG.info("For numEntries: "+numEntries+" keybytes length : "+keyBytes.length+" keyLength: "+keyLength+
+               " valueBytes length: "+valueBytes.length+" valueLength "+valueLength);
+           batches[partitionId].addElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);*/
         }
+
+        LOG.info("Finished loading the queue for input: {} endTime : {}", inputName, System.currentTimeMillis());
+
+        // Add sentinel at the end of queue
+        for (int i=0; i<4; ++i) {
+          // add sentinel to the q not the batch
+          //batches[i].addElement(sentinel.keyBytes, -1, sentinel.valueBytes, -1,
+          //    sentinel.getDeserializeKey(), sentinel.getHashCode());
+          sharedQ[i].add(batches[i]);
+          sharedQ[i].add(sentinel);
+        }
+
+        executorService.shutdown();
+        try {
+          executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+
+        LOG.info("Total entries added to queue after looping: "+numEntries);
+        LOG.info("Total entries added to hash table: "+ totalEntries.get());

Review comment:
       Consolidate these two LOG statements into one?
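       For example, a single parameterized statement (sketch, using the variables already in scope):

           LOG.info("Queue loading finished for input: {} entriesQueued: {} entriesInHashTable: {}",
               inputName, numEntries, totalEntries.get());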




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] github-actions[bot] closed pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #2004:
URL: https://github.com/apache/hive/pull/2004


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] github-actions[bot] commented on pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #2004:
URL: https://github.com/apache/hive/pull/2004#issuecomment-877884465


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
   Feel free to reach out on the dev@hive.apache.org list if the patch is in need of reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631163313



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -141,35 +280,64 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         long keyCount = Math.max(estKeyCount, inputRecords);
 
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+                new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[numThreads];
+        for(int i = 0; i < numThreads; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[numThreads];
+        for (int i = 0; i < numThreads; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) ((numThreads - 1) & hashCode);
           numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                    effectiveThreshold);
-                }
-              }
+          // call getBytes as copy is called later
+          byte[] valueBytes = currentValue.copyBytes();
+          int valueLength = currentValue.getLength();
+          byte[] keyBytes = currentKey.copyBytes();
+          int keyLength = currentKey.getLength();
+          HashTableElement h = new HashTableElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);
+          if (batches[partitionId].addElement(h)) {
+              sharedQ[partitionId].add(batches[partitionId]);
+              batches[partitionId] = new QueueElementBatch();
           }
         }
+
+        LOG.info("Finished loading the queue for input: {} endTime : {}", inputName, System.currentTimeMillis());
+
+        // Add sentinel at the end of queue
+        for (int i=0; i<4; ++i) {
+          // add sentinel to the q not the batch
+          sharedQ[i].add(batches[i]);
+          sharedQ[i].add(sentinel);
+        }
+
+        executorService.shutdown();
+        try {
+          executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {

Review comment:
       Handle this exception along with the others at the end instead of just calling printStackTrace().
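       One option (sketch; assumes the surrounding load() is allowed to rethrow HiveException):

           executorService.shutdown();
           try {
             executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
           } catch (InterruptedException e) {
             // Restore the interrupt status and propagate the failure instead of
             // swallowing it with printStackTrace().
             Thread.currentThread().interrupt();
             throw new HiveException("Interrupted while waiting for hash table workers", e);
           }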




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631202108



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableWrapper.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+
+public abstract class VectorMapJoinFastHashTableWrapper {
+
+  public abstract long calculateLongHashCode(long key, BytesWritable currentKey) throws HiveException, IOException;
+
+  public abstract long deserializeToKey(BytesWritable currentKey) throws HiveException, IOException;

Review comment:
       Maybe have a default impl of deserializeToKey() that throws an exception or returns 0, and only have the Long implementations override it?
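       i.e. roughly (sketch):

           public abstract class VectorMapJoinFastHashTableWrapper {

             public abstract long calculateLongHashCode(long key, BytesWritable currentKey)
                 throws HiveException, IOException;

             // Default no-op: only the long-keyed hash tables need to override this.
             public long deserializeToKey(BytesWritable currentKey) throws HiveException, IOException {
               return 0;
             }
           }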




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631157229



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -141,35 +280,64 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         long keyCount = Math.max(estKeyCount, inputRecords);
 
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+                new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[numThreads];
+        for(int i = 0; i < numThreads; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[numThreads];
+        for (int i = 0; i < numThreads; ++i) {
+          batches[i] = new QueueElementBatch();
+        }

Review comment:
       Let's use an init method for these lines

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -141,35 +280,64 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         long keyCount = Math.max(estKeyCount, inputRecords);
 
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+                new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[numThreads];
+        for(int i = 0; i < numThreads; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[numThreads];
+        for (int i = 0; i < numThreads; ++i) {
+          batches[i] = new QueueElementBatch();
+        }

Review comment:
       Let's use an init method for these lines
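       For example (sketch; the helper names are made up):

           private BlockingQueue<QueueElementBatch>[] initSharedQueues(int numThreads) {
             BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[numThreads];
             for (int i = 0; i < numThreads; ++i) {
               sharedQ[i] = new LinkedBlockingQueue<>();
             }
             return sharedQ;
           }

           private QueueElementBatch[] initBatches(int numThreads) {
             QueueElementBatch[] batches = new QueueElementBatch[numThreads];
             for (int i = 0; i < numThreads; ++i) {
               batches[i] = new QueueElementBatch();
             }
             return batches;
           }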




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631128300



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
##########
@@ -806,6 +806,12 @@ public void put(Writable currentKey, Writable currentValue) throws SerDeExceptio
     internalPutRow(directWriteHelper, currentKey, currentValue);
   }
 
+  @Override
+  public long calculateLongHashCode(BytesWritable currentKey) throws HiveException, IOException, SerDeException {
+    directWriteHelper.setKeyValue(currentKey, null);
+    return (long)directWriteHelper.getHashFromKey();

Review comment:
       Casting seems redundant here
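       i.e. the cast can simply be dropped (sketch; whatever getHashFromKey() returns converts to long without it):

           return directWriteHelper.getHashFromKey();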




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631139021



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -54,11 +61,73 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastHashTableLoader.class.getName());
 
+  public static class HashTableElement {
+    byte[] keyBytes;
+    int keyLength;
+    byte[] valueBytes;
+    int valueLength;
+    long deserializeKey;
+    long hashCode;
+
+    public HashTableElement(byte[] keyBytes, int keyLength, byte[] valueBytes, int valueLength, long key, long hashCode) {

Review comment:
       keyLength and valueLength seem redundant? copyBytes() goes up to size() anyway..
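       i.e. the element could carry just the copied arrays and derive the lengths from them (sketch):

           public HashTableElement(byte[] keyBytes, byte[] valueBytes, long key, long hashCode) {
             this.keyBytes = keyBytes;      // keyBytes.length replaces keyLength
             this.valueBytes = valueBytes;  // valueBytes.length replaces valueLength
             this.deserializeKey = key;
             this.hashCode = hashCode;
           }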




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631163313



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -141,35 +280,64 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         long keyCount = Math.max(estKeyCount, inputRecords);
 
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+                new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
                 vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        BlockingQueue<QueueElementBatch>[] sharedQ = new BlockingQueue[numThreads];
+        for(int i = 0; i < numThreads; ++i) {
+          sharedQ[i] = new LinkedBlockingQueue<>();
+        }
+        QueueElementBatch[] batches = new QueueElementBatch[numThreads];
+        for (int i = 0; i < numThreads; ++i) {
+          batches[i] = new QueueElementBatch();
+        }
+        //start the threads
+        drain(vectorMapJoinFastTableContainer, doMemCheck, inputName, memoryMonitorInfo,
+            effectiveThreshold, executorService, sharedQ);
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long key = vectorMapJoinFastTableContainer.deserializeToKey(currentKey);
+          long hashCode = vectorMapJoinFastTableContainer.calculateLongHashCode(key, currentKey);
+          int partitionId = (int) ((numThreads - 1) & hashCode);
           numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                    effectiveThreshold);
-                }
-              }
+          // call getBytes as copy is called later
+          byte[] valueBytes = currentValue.copyBytes();
+          int valueLength = currentValue.getLength();
+          byte[] keyBytes = currentKey.copyBytes();
+          int keyLength = currentKey.getLength();
+          HashTableElement h = new HashTableElement(keyBytes, keyLength, valueBytes, valueLength, key, hashCode);
+          if (batches[partitionId].addElement(h)) {
+              sharedQ[partitionId].add(batches[partitionId]);
+              batches[partitionId] = new QueueElementBatch();
           }
         }
+
+        LOG.info("Finished loading the queue for input: {} endTime : {}", inputName, System.currentTimeMillis());
+
+        // Add sentinel at the end of queue
+        for (int i=0; i<4; ++i) {
+          // add sentinel to the q not the batch
+          sharedQ[i].add(batches[i]);
+          sharedQ[i].add(sentinel);
+        }
+
+        executorService.shutdown();
+        try {
+          executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {

Review comment:
       Handle this exception along with the others at the end?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pgaref commented on a change in pull request #2004: HIVE-24037 Parallelize hash table constructions in map joins

Posted by GitBox <gi...@apache.org>.
pgaref commented on a change in pull request #2004:
URL: https://github.com/apache/hive/pull/2004#discussion_r631127066



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
##########
@@ -499,6 +500,13 @@ public void put(Writable currentKey, Writable currentValue) throws SerDeExceptio
     hashMap.put(directWriteHelper, -1);
   }
 
+  @Override public long calculateLongHashCode(BytesWritable currentKey)
+      throws HiveException, IOException, SerDeException {
+    directWriteHelper.setKeyValue(currentKey, null);
+    directWriteHelper.getHashFromKey();
+    return 0;

Review comment:
       Maybe return directWriteHelper.getHashFromKey() instead of 0?
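       i.e. (sketch):

           directWriteHelper.setKeyValue(currentKey, null);
           return directWriteHelper.getHashFromKey();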




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org