Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/05/21 17:04:00 UTC

[jira] [Work logged] (HIVE-25149) Support parallel load for Optimized HT implementations

     [ https://issues.apache.org/jira/browse/HIVE-25149?focusedWorklogId=600508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-600508 ]

ASF GitHub Bot logged work on HIVE-25149:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/May/21 17:03
            Start Date: 21/May/21 17:03
    Worklog Time Spent: 10m 
      Work Description: belugabehr commented on a change in pull request #2305:
URL: https://github.com/apache/hive/pull/2305#discussion_r637058364



##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public class LegacyVectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class.getName());

Review comment:
       Nit: Don't need `getName()`
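       For instance, a minimal sketch of the suggested change (the `Class` overload resolves the same logger name):

```java
// The Class overload yields the same logger name without the extra call:
private static final Logger LOG =
    LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class);
```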

##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public class LegacyVectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class.getName());
+  private Configuration hconf;
+  protected MapJoinDesc desc;
+  private TezContext tezContext;
+  private String cacheKey;
+  private TezCounter htLoadCounter;
+
+  public LegacyVectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = (MapJoinDesc)joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get("__hive.context.name", ""));
+  }
+
+  @Override
+  public void init(ExecMapperContext context, MapredContext mrContext,
+      Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
+    this.hconf = hconf;
+    this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    String counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    String counterName = Utilities
+        .getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
+    this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+  }
+
+  @Override
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes)
+      throws HiveException {
+
+    Map<Integer, String> parentToInput = desc.getParentToInput();
+    Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+
+    MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+    boolean doMemCheck = false;
+    long effectiveThreshold = 0;
+    if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // hash table loading happens in server side, LlapDecider could kick out some fragments to run outside of LLAP.
+      // Flip the flag at runtime in case if we are running outside of LLAP
+      if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+        memoryMonitorInfo.setLlap(false);
+      }
+      if (memoryMonitorInfo.doMemoryMonitoring()) {
+        doMemCheck = true;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+        }
+      }
+    }
+
+    if (!doMemCheck) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
+      }
+    }
+    for (int pos = 0; pos < mapJoinTables.length; pos++) {
+      if (pos == desc.getPosBigTable()) {
+        continue;
+      }
+
+      long numEntries = 0;
+      String inputName = parentToInput.get(pos);
+      LogicalInput input = tezContext.getInput(inputName);
+
+      try {
+        input.start();
+        tezContext.getTezProcessorContext().waitForAnyInputReady(
+            Collections.<Input> singletonList(input));
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+
+      try {
+        KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+        Long keyCountObj = parentKeyCounts.get(pos);
+        long estKeyCount = (keyCountObj == null) ? -1 : keyCountObj;
+
+        long inputRecords = -1;
+        try {
+          //TODO : Need to use class instead of string.
+          // https://issues.apache.org/jira/browse/HIVE-23981
+          inputRecords = ((AbstractLogicalInput) input).getContext().getCounters().
+              findCounter("org.apache.tez.common.counters.TaskCounter",
+                  "APPROXIMATE_INPUT_RECORDS").getValue();
+        } catch (Exception e) {
+          LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
+        }
+        long keyCount = Math.max(estKeyCount, inputRecords);
+
+        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 1);
+
+        LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
+                "estKeyCount : {} keyCount : {}", inputName, cacheKey,
+            vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+
+        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long startTime = System.currentTimeMillis();
+        while (kvReader.next()) {
+          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
+              (BytesWritable)kvReader.getCurrentValue());
+          numEntries++;
+          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);

Review comment:
       Log or throw, not both.  The try-catch that catches this exception is responsible for logging.
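       A sketch of the suggested shape, using the names from the hunk above:

```java
// Throw only; the handler that catches MapJoinMemoryExhaustionError logs it once.
if (estMemUsage > effectiveThreshold) {
  throw new MapJoinMemoryExhaustionError(
      "Hash table loading exceeded memory limits for input: " + inputName
      + " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage
      + " effectiveThreshold: " + effectiveThreshold
      + " memoryMonitorInfo: " + memoryMonitorInfo);
}
```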

##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public class LegacyVectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class.getName());
+  private Configuration hconf;
+  protected MapJoinDesc desc;
+  private TezContext tezContext;
+  private String cacheKey;
+  private TezCounter htLoadCounter;
+
+  public LegacyVectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = (MapJoinDesc)joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get("__hive.context.name", ""));
+  }
+
+  @Override
+  public void init(ExecMapperContext context, MapredContext mrContext,
+      Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
+    this.hconf = hconf;
+    this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    String counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    String counterName = Utilities
+        .getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
+    this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+  }
+
+  @Override
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes)
+      throws HiveException {
+
+    Map<Integer, String> parentToInput = desc.getParentToInput();
+    Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+
+    MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+    boolean doMemCheck = false;
+    long effectiveThreshold = 0;
+    if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // hash table loading happens in server side, LlapDecider could kick out some fragments to run outside of LLAP.
+      // Flip the flag at runtime in case if we are running outside of LLAP
+      if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+        memoryMonitorInfo.setLlap(false);
+      }
+      if (memoryMonitorInfo.doMemoryMonitoring()) {
+        doMemCheck = true;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+        }
+      }
+    }
+
+    if (!doMemCheck) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
+      }

Review comment:
       Remove `LOG.isInfoEnabled()`; it is just added clutter that need not be here.
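       With SLF4J's parameterized messages the level check buys nothing here, e.g.:

```java
// SLF4J skips argument formatting when INFO is disabled, so the guard is redundant:
LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
```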

##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTLongKeyBench.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf -Drat.skip=true" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ *     java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTLongKeyBench
+ *
+ *  {HASH_MAP, HASH_SET, HASH_MULTISET}
+ *    X
+ *  {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTLongKeyBench {
+
+//  public static class HashMultiSetLargeFastVectorBench extends LongKeyBase {
+//    @Setup
+//    public void setup() throws Exception {
+//      System.out.println("Do Setup");
+//      doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinTestImplementation.NATIVE_VECTOR_FAST, 20_000_000);
+//    }
+//
+//    @TearDown(Level.Invocation)
+//    public void doTearDown() {
+//      System.out.println("Do TearDown");
+//      customKeyValueReader.reset();
+//    }
+//  }
+//
+//  public static class HashMultiSetSmallFastVectorBench extends LongKeyBase {
+//    @Setup
+//    public void setup() throws Exception {
+//      System.out.println("Do Setup");
+//      doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinTestImplementation.NATIVE_VECTOR_FAST, 10_000);
+//    }
+//
+//    @TearDown(Level.Invocation)
+//    public void doTearDown() {
+//      System.out.println("Do TearDown");
+//      customKeyValueReader.reset();
+//    }
+//  }
+//
+//  public static class HashSetLargeFastVectorBench extends LongKeyBase {
+//    @Setup
+//    public void setup() throws Exception {
+//      System.out.println("Do Setup");
+//      doSetup(VectorMapJoinVariation.LEFT_SEMI, MapJoinTestImplementation.NATIVE_VECTOR_FAST, 20_000_000);
+//    }
+//
+//    @TearDown(Level.Invocation)
+//    public void doTearDown() {
+//      System.out.println("Do TearDown");
+//      customKeyValueReader.reset();
+//    }
+//  }
+//
+//  public static class HashSetSmallFastVectorBench extends LongKeyBase {
+//    @Setup
+//    public void setup() throws Exception {
+//      System.out.println("Do Setup");
+//      doSetup(VectorMapJoinVariation.LEFT_SEMI, MapJoinTestImplementation.NATIVE_VECTOR_FAST, 10_000);
+//    }
+//
+//    @TearDown(Level.Invocation)
+//    public void doTearDown() {
+//      System.out.println("Do TearDown");
+//      customKeyValueReader.reset();
+//    }
+//  }

Review comment:
       Remove commented-out code

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSetContainer.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashSet;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/*
+ * An single LONG key hash set optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinLongHashSet{
+
+  public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashSetContainer.class);
+
+  private final VectorMapJoinFastLongHashSet[] vectorMapJoinFastLongHashSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashSetContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashSets = new VectorMapJoinFastLongHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastLongHashSets[i] =
+          new VectorMapJoinFastLongHashSet(isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity,
+              loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  public VectorMapJoinHashSetResult createHashSetResult() {
+    return new VectorMapJoinFastHashSet.HashSetResult();
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }

Review comment:
       Same.  Wrap the underlying Exception, and do not put newlines in the error message.
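       Something along these lines (a sketch, assuming the usual `HiveException(String, Throwable)` constructor):

```java
} catch (Exception e) {
  // Keep the message on one line and pass the cause so the full stack trace survives:
  throw new HiveException("DeserializeRead details: "
      + keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
}
```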

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSetContainer.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashSet;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/*
+ * An single LONG key hash set optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinLongHashSet{
+
+  public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashSetContainer.class);
+
+  private final VectorMapJoinFastLongHashSet[] vectorMapJoinFastLongHashSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashSetContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashSets = new VectorMapJoinFastLongHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastLongHashSets[i] =
+          new VectorMapJoinFastLongHashSet(isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity,
+              loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  public VectorMapJoinHashSetResult createHashSetResult() {
+    return new VectorMapJoinFastHashSet.HashSetResult();
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+    return HashCodeUtil.calculateLongHashCode(key);
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastLongHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  public JoinResult contains(long key, VectorMapJoinHashSetResult hashSetResult) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return vectorMapJoinFastLongHashSets[(int) ((numThreads - 1) & hashCode)].contains(key, hashSetResult);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastLongHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastLongHashSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastLongHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");

Review comment:
       UnsupportedOperationException
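       i.e. something like the following (applies to the other `Not implemented` / `Not applicable` stubs below as well):

```java
@Override
public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
  // UnsupportedOperationException is the idiomatic JDK type for an unimplemented operation.
  throw new UnsupportedOperationException("Not implemented");
}
```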

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSetContainer.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * An multi-key hash set optimized for vector map join.
+ *
+ * The key is stored as the provided bytes (uninterpreted).
+ */
+public class VectorMapJoinFastMultiKeyHashSetContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashSetContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashSet[] vectorMapJoinFastMultiKeyHashSets;
+  private BytesWritable testKeyBytesWritable;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numThreads) {
+    this.vectorMapJoinFastMultiKeyHashSets = new VectorMapJoinFastMultiKeyHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      this.vectorMapJoinFastMultiKeyHashSets[i] =
+          new VectorMapJoinFastMultiKeyHashSet(isFullOuter, initialCapacity, loadFactor, writeBuffersSize,
+              estimatedKeyCount);
+    }
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastMultiKeyHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastMultiKeyHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastMultiKeyHashSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastMultiKeyHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");

Review comment:
       UnsupportedOperationException

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
##########
@@ -233,20 +221,34 @@ public boolean hasSpill() {
     return false;
   }
 
+  @Override
+  public boolean containsLongKey(long currentKey) {
+    return INSTANCE.containsLongKey(currentKey);
+  }
+
   @Override
   public int size() {
-    return vectorMapJoinFastHashTable.size();
+    return INSTANCE.size();
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    throw new RuntimeException("Not applicable");
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not applicable");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not applicable");

Review comment:
       UnsupportedOperationException

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSetContainer.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * An single STRING key hash set optimized for vector map join.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */
+public class VectorMapJoinFastStringHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashSetContainer.class);
+
+  private final VectorMapJoinFastStringHashSet[] vectorMapJoinFastStringHashSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final int numThreads;
+
+  public VectorMapJoinFastStringHashSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.vectorMapJoinFastStringHashSets = new VectorMapJoinFastStringHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastStringHashSets[i] = new VectorMapJoinFastStringHashSet(
+          isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastStringHashSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastStringHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public JoinUtil.JoinResult contains(byte[] keyBytes, int keyStart, int keyLength,
+      VectorMapJoinHashSetResult hashSetResult) throws IOException {
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
+    return vectorMapJoinFastStringHashSets[(int) ((numThreads - 1) & hashCode)].contains(keyBytes, keyStart, keyLength, hashSetResult);
+  }
+
+  @Override
+  public VectorMapJoinHashSetResult createHashSetResult() {
+    return new VectorMapJoinFastBytesHashSetStore.HashSetResult();
+  }
+}

Review comment:
       Add a newline at the end of the file.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -139,50 +237,72 @@ public void load(MapJoinTableContainer[] mapJoinTables,
           LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
         }
         long keyCount = Math.max(estKeyCount, inputRecords);
+        initHTLoadingService(keyCount);
 
-        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+        VectorMapJoinFastTableContainer tableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numLoadThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
-                vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+                tableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+
+        tableContainer.setSerde(null, null); // No SerDes here.
+        // Submit parallel loading Threads
+        submitQueueDrainThreads(tableContainer);
 
-        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long receivedEntries = 0;
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
-          numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long hashCode = tableContainer.getHashCode(currentKey);
+          int partitionId = (int) ((numLoadThreads - 1) & hashCode);
+          // call getBytes as copy is called later
+          HashTableElement h = new HashTableElement(hashCode, currentValue.copyBytes(), currentKey.copyBytes());
+          if (elementBatches[partitionId].addElement(h)) {
+            loadBatchQueues[partitionId].add(elementBatches[partitionId]);
+            elementBatches[partitionId] = new HashTableElementBatch();
+          }
+          receivedEntries++;
+          if (doMemCheck && (receivedEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + receivedEntries + " estimatedMemoryUsage: " + estMemUsage +
                   " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              if (LOG.isInfoEnabled()) {

Review comment:
       Remove the `isInfoEnabled()` check.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -139,50 +237,72 @@ public void load(MapJoinTableContainer[] mapJoinTables,
           LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
         }
         long keyCount = Math.max(estKeyCount, inputRecords);
+        initHTLoadingService(keyCount);
 
-        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+        VectorMapJoinFastTableContainer tableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numLoadThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
-                vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+                tableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+
+        tableContainer.setSerde(null, null); // No SerDes here.
+        // Submit parallel loading Threads
+        submitQueueDrainThreads(tableContainer);
 
-        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long receivedEntries = 0;
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
-          numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long hashCode = tableContainer.getHashCode(currentKey);
+          int partitionId = (int) ((numLoadThreads - 1) & hashCode);
+          // call getBytes as copy is called later
+          HashTableElement h = new HashTableElement(hashCode, currentValue.copyBytes(), currentKey.copyBytes());
+          if (elementBatches[partitionId].addElement(h)) {
+            loadBatchQueues[partitionId].add(elementBatches[partitionId]);
+            elementBatches[partitionId] = new HashTableElementBatch();
+          }
+          receivedEntries++;
+          if (doMemCheck && (receivedEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + receivedEntries + " estimatedMemoryUsage: " + estMemUsage +
                   " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);

Review comment:
       Do not log-and-throw

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableContainerBase.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+
+// Base class of MultiHT implementations
+public abstract class VectorMapJoinFastHashTableContainerBase implements VectorMapJoinHashTable {
+
+  public abstract void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException;
+
+  public abstract long getHashCode(BytesWritable currentKey) throws HiveException, IOException;
+
+  public abstract long getEstimatedMemorySize();
+
+  public abstract int size();
+
+  // Method to be removed..
+  public boolean containsLongKey(long currentKey) {
+    throw new RuntimeException("Not supported!");
+  }

Review comment:
       Why is an 'unused' method being added?  If this is truly intentional, mark it with `@Deprecated`.
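       A sketch of what that could look like (the Javadoc text is only a placeholder):

```java
/**
 * @deprecated Placeholder kept for the transition; remove once callers no longer need it.
 */
@Deprecated
public boolean containsLongKey(long currentKey) {
  throw new UnsupportedOperationException("Not supported!");
}
```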

##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/VectorFastHTBytesKeyBench.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/*
+ * Build with "mvn clean install -DskipTests -Pperf" at main itests directory.
+ * From itests/hive-jmh directory, run:
+ *     java -jar -Xmx8g -Xms8g target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.load.VectorFastHTLongKeyBench
+ *
+ *  {HASH_MAP, HASH_SET, HASH_MULTISET}
+ *    X
+ *  {NATIVE_VECTOR_FAST}
+ */
+public class VectorFastHTBytesKeyBench {
+
+  public static class HashMultiSetLargeFastVectorBench extends BytesKeyBase {
+    @Setup
+    public void setup() throws Exception {
+      System.out.println("Do Setup");
+      doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinTestImplementation.NATIVE_VECTOR_FAST, 20_000_000);
+    }
+
+    @TearDown(Level.Invocation)
+    public void doTearDown() {
+      System.out.println("Do TearDown");

Review comment:
       Use the logging framework instead of `System.out.println`.
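       A minimal sketch, assuming an SLF4J logger is added to the benchmark class as in the rest of the patch:

```java
private static final Logger LOG = LoggerFactory.getLogger(VectorFastHTBytesKeyBench.class);

@Setup
public void setup() throws Exception {
  LOG.info("Do Setup"); // replaces System.out.println
  doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinTestImplementation.NATIVE_VECTOR_FAST, 20_000_000);
}
```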

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -73,6 +104,74 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
   }
 
+  private void initHTLoadingService(long estKeyCount) {
+    this.numLoadThreads = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINPARALELHASHTABLETHREADS);
+    this.totalEntries = new LongAccumulator(Long::sum, 0L);
+    this.loadExecService = Executors.newFixedThreadPool(numLoadThreads,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setPriority(Thread.NORM_PRIORITY)
+            .setNameFormat("HT-Load-Thread-%d")
+            .build());
+    this.elementBatches = new HashTableElementBatch[numLoadThreads];
+    this.loadBatchQueues = new BlockingQueue[numLoadThreads];
+    for (int i = 0; i < numLoadThreads; ++i) {
+      elementBatches[i] = new HashTableElementBatch();
+      loadBatchQueues[i] = new LinkedBlockingQueue();
+    }
+  }
+
+  private void submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+      throws InterruptedException, IOException, SerDeException {
+    for (int partitionId = 0; partitionId < numLoadThreads; partitionId++) {
+      int finalPartitionId = partitionId;
+      this.loadExecService.submit(() -> {
+        try {
+          LOG.info("Partition id {} with Queue size {}", finalPartitionId, loadBatchQueues[finalPartitionId].size());
+          drainAndLoadForPartition(finalPartitionId, vectorMapJoinFastTableContainer);
+        } catch (IOException | InterruptedException | SerDeException | HiveException e) {
+          LOG.error("Failed to start HT Load threads! {}", e);
+          throw new RuntimeException(e.getMessage(), e);
+        }
+      });
+    }
+  }
+
+  private void drainAndLoadForPartition(int partitionId, VectorMapJoinFastTableContainer tableContainer)
+      throws InterruptedException, IOException, HiveException, SerDeException {
+    LOG.info("Starting draining thread {}", partitionId);
+    long totalProcessedEntries = 0;
+    HashTableElementBatch batch = null;
+    while (batch != DONE_SENTINEL) {
+      batch = this.loadBatchQueues[partitionId].take();
+      LOG.debug("Draining thread {} batchSize {}", partitionId, batch.getSize());
+      for (int i = 0; i < batch.getSize(); i++) {
+        try {
+          HashTableElement h = batch.getBatch(i);
+          tableContainer.putRow(h.getHashCode(), h.getKey(), h.getValue());
+        }
+        catch(Exception e) {
+          LOG.info("Exception in draining thread put row: ", e);
+          throw new HiveException(e);

Review comment:
       Do not log and throw.
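   A sketch of the suggested fix, wrapping the cause once and leaving the logging to whoever catches the `HiveException` (the message text is illustrative):
   
   ```java
   } catch (Exception e) {
     // No LOG call here; the exception is wrapped and propagated exactly once.
     throw new HiveException("Error in draining thread during putRow", e);
   }
   ```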

##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public class LegacyVectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class.getName());
+  private Configuration hconf;
+  protected MapJoinDesc desc;
+  private TezContext tezContext;
+  private String cacheKey;
+  private TezCounter htLoadCounter;
+
+  public LegacyVectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = (MapJoinDesc)joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get("__hive.context.name", ""));
+  }
+
+  @Override
+  public void init(ExecMapperContext context, MapredContext mrContext,
+      Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
+    this.hconf = hconf;
+    this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    String counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    String counterName = Utilities
+        .getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
+    this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+  }
+
+  @Override
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes)
+      throws HiveException {
+
+    Map<Integer, String> parentToInput = desc.getParentToInput();
+    Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+
+    MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+    boolean doMemCheck = false;
+    long effectiveThreshold = 0;
+    if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // hash table loading happens in server side, LlapDecider could kick out some fragments to run outside of LLAP.
+      // Flip the flag at runtime in case if we are running outside of LLAP
+      if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+        memoryMonitorInfo.setLlap(false);
+      }
+      if (memoryMonitorInfo.doMemoryMonitoring()) {
+        doMemCheck = true;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+        }
+      }
+    }
+
+    if (!doMemCheck) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
+      }
+    }
+    for (int pos = 0; pos < mapJoinTables.length; pos++) {
+      if (pos == desc.getPosBigTable()) {
+        continue;
+      }
+
+      long numEntries = 0;
+      String inputName = parentToInput.get(pos);
+      LogicalInput input = tezContext.getInput(inputName);
+
+      try {
+        input.start();
+        tezContext.getTezProcessorContext().waitForAnyInputReady(
+            Collections.<Input> singletonList(input));
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
+
+      try {
+        KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+        Long keyCountObj = parentKeyCounts.get(pos);
+        long estKeyCount = (keyCountObj == null) ? -1 : keyCountObj;
+
+        long inputRecords = -1;
+        try {
+          //TODO : Need to use class instead of string.
+          // https://issues.apache.org/jira/browse/HIVE-23981
+          inputRecords = ((AbstractLogicalInput) input).getContext().getCounters().
+              findCounter("org.apache.tez.common.counters.TaskCounter",
+                  "APPROXIMATE_INPUT_RECORDS").getValue();
+        } catch (Exception e) {
+          LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
+        }
+        long keyCount = Math.max(estKeyCount, inputRecords);
+
+        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, 1);
+
+        LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
+                "estKeyCount : {} keyCount : {}", inputName, cacheKey,
+            vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+
+        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long startTime = System.currentTimeMillis();
+        while (kvReader.next()) {
+          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
+              (BytesWritable)kvReader.getCurrentValue());
+          numEntries++;
+          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                        "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+                    effectiveThreshold);
+              }

Review comment:
       Remove the `LOG.isInfoEnabled()` guard; it just adds clutter that need not be here.
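   Since parameterized slf4j calls are already cheap when INFO is disabled, the guarded block can collapse to:
   
   ```java
   LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} "
       + "estimatedMemoryUsage: {} effectiveThreshold: {}",
       inputName, numEntries, estMemUsage, effectiveThreshold);
   ```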

##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/MultiKeyBase.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.RandomTypeUtil;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class MultiKeyBase extends AbstractHTLoadBench {
+
+  public void doSetup(VectorMapJoinDesc.VectorMapJoinVariation vectorMapJoinVariation,
+      MapJoinTestConfig.MapJoinTestImplementation mapJoinImplementation, int rows) throws Exception {
+    long seed = 2543;
+    int rowCount = rows;
+    HiveConf hiveConf = new HiveConf();
+    int[] bigTableKeyColumnNums = new int[] { 0, 1, 2};
+    String[] bigTableColumnNames = new String[] { "b1", "b2", "b3" };
+    TypeInfo[] bigTableTypeInfos = new TypeInfo[] {
+        TypeInfoFactory.intTypeInfo,
+        TypeInfoFactory.longTypeInfo,
+        TypeInfoFactory.stringTypeInfo
+    };
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    TypeInfo[] smallTableValueTypeInfos = new TypeInfo[] { TypeInfoFactory.stringTypeInfo };
+    MapJoinTestDescription.SmallTableGenerationParameters smallTableGenerationParameters =
+        new MapJoinTestDescription.SmallTableGenerationParameters();
+    smallTableGenerationParameters
+        .setValueOption(MapJoinTestDescription.SmallTableGenerationParameters.ValueOption.ONLY_ONE);
+    setupMapJoinHT(hiveConf, seed, rowCount, vectorMapJoinVariation, mapJoinImplementation, bigTableColumnNames,
+        bigTableTypeInfos, bigTableKeyColumnNums, smallTableValueTypeInfos, smallTableRetainKeyColumnNums,
+        smallTableGenerationParameters);
+    this.customKeyValueReader = generateByteKVPairs(rowCount, seed);
+  }
+
+  private static CustomKeyValueReader generateByteKVPairs(int rows, long seed) throws IOException {
+    System.out.println("Data GEN for: " + rows);
+    Random random = new Random(seed);
+    BytesWritable[] keys = new BytesWritable[rows];
+    BytesWritable[] values = new BytesWritable[rows];
+    BinarySortableSerializeWrite bsw = new BinarySortableSerializeWrite(1);
+    long startTime = System.currentTimeMillis();
+    ByteStream.Output outp;
+    BytesWritable key;
+    BytesWritable value;
+    int str_length = 8;
+    for (int i = 0; i < rows; i++) {
+      outp = new ByteStream.Output(str_length);
+      bsw.set(outp);
+      bsw.writeTimestamp(RandomTypeUtil.getRandTimestamp(random));
+      key = new BytesWritable(outp.getData(), outp.getLength());
+
+      bsw.reset();
+      for (int j = 0; j < str_length; j++) {
+        outp.writeByte(j, (byte) (random.nextInt(+'c' - 'a' + 1) + 'a'));
+      }
+      value = new BytesWritable(outp.getData(), outp.getLength());
+      keys[i] = key;
+      values[i] = value;
+    }
+    LOG.info("Data GEN done after {} sec", (System.currentTimeMillis() - startTime) / 1_000);

Review comment:
       Use `TimeUnit.MILLISECONDS.toSeconds()` for clarity.
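   For example:
   
   ```java
   // Requires java.util.concurrent.TimeUnit
   LOG.info("Data GEN done after {} sec",
       TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime));
   ```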

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
##########
@@ -261,6 +261,15 @@ public void load(MapJoinTableContainer[] mapJoinTables,
 
         tableContainer.setSerde(keyCtx, valCtx);
         long startTime = System.currentTimeMillis();
+
+        /**
+         * TODO PANOS
+         * Seems like the right place for:
+         *  ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+         *  ...
+         *  drain...
+         *  Also key/value always seem to be treated like a ByteWritable underneath..
+         */

Review comment:
       No `TODO` please; log it in JIRA.

##########
File path: itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/load/LegacyVectorMapJoinFastHashTableLoader.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.benchmark.vectorization.mapjoin.load;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
+import org.apache.hadoop.hive.ql.exec.tez.TezContext;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+public class LegacyVectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(LegacyVectorMapJoinFastHashTableLoader.class.getName());
+  private Configuration hconf;
+  protected MapJoinDesc desc;
+  private TezContext tezContext;
+  private String cacheKey;
+  private TezCounter htLoadCounter;
+
+  public LegacyVectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = context;
+    this.hconf = hconf;
+    this.desc = (MapJoinDesc)joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    this.htLoadCounter = this.tezContext.getTezProcessorContext().getCounters().findCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP), hconf.get("__hive.context.name", ""));
+  }
+
+  @Override
+  public void init(ExecMapperContext context, MapredContext mrContext,
+      Configuration hconf, MapJoinOperator joinOp) {
+    this.tezContext = (TezContext) mrContext;
+    this.hconf = hconf;
+    this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
+    String counterGroup = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    String counterName = Utilities
+        .getVertexCounterName(HashTableLoaderCounters.HASHTABLE_LOAD_TIME_MS.name(), vertexName);
+    this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
+  }
+
+  @Override
+  public void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes)
+      throws HiveException {
+
+    Map<Integer, String> parentToInput = desc.getParentToInput();
+    Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
+
+    MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+    boolean doMemCheck = false;
+    long effectiveThreshold = 0;
+    if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // hash table loading happens in server side, LlapDecider could kick out some fragments to run outside of LLAP.
+      // Flip the flag at runtime in case if we are running outside of LLAP
+      if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+        memoryMonitorInfo.setLlap(false);
+      }
+      if (memoryMonitorInfo.doMemoryMonitoring()) {
+        doMemCheck = true;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+        }

Review comment:
       Remove the `LOG.isInfoEnabled()` guard; it just adds clutter that need not be here.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -73,6 +104,74 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
   }
 
+  private void initHTLoadingService(long estKeyCount) {
+    this.numLoadThreads = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINPARALELHASHTABLETHREADS);
+    this.totalEntries = new LongAccumulator(Long::sum, 0L);
+    this.loadExecService = Executors.newFixedThreadPool(numLoadThreads,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setPriority(Thread.NORM_PRIORITY)
+            .setNameFormat("HT-Load-Thread-%d")
+            .build());
+    this.elementBatches = new HashTableElementBatch[numLoadThreads];
+    this.loadBatchQueues = new BlockingQueue[numLoadThreads];
+    for (int i = 0; i < numLoadThreads; ++i) {
+      elementBatches[i] = new HashTableElementBatch();
+      loadBatchQueues[i] = new LinkedBlockingQueue();
+    }
+  }
+
+  private void submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+      throws InterruptedException, IOException, SerDeException {
+    for (int partitionId = 0; partitionId < numLoadThreads; partitionId++) {
+      int finalPartitionId = partitionId;
+      this.loadExecService.submit(() -> {
+        try {
+          LOG.info("Partition id {} with Queue size {}", finalPartitionId, loadBatchQueues[finalPartitionId].size());
+          drainAndLoadForPartition(finalPartitionId, vectorMapJoinFastTableContainer);
+        } catch (IOException | InterruptedException | SerDeException | HiveException e) {
+          LOG.error("Failed to start HT Load threads! {}", e);
+          throw new RuntimeException(e.getMessage(), e);

Review comment:
       Do not log-and-throw
   
   `throw new RuntimeException("Failed to start HT Load threads", e);`

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -139,50 +237,72 @@ public void load(MapJoinTableContainer[] mapJoinTables,
           LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
         }
         long keyCount = Math.max(estKeyCount, inputRecords);
+        initHTLoadingService(keyCount);
 
-        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+        VectorMapJoinFastTableContainer tableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numLoadThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
-                vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+                tableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+
+        tableContainer.setSerde(null, null); // No SerDes here.
+        // Submit parallel loading Threads
+        submitQueueDrainThreads(tableContainer);
 
-        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long receivedEntries = 0;
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
-          numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long hashCode = tableContainer.getHashCode(currentKey);
+          int partitionId = (int) ((numLoadThreads - 1) & hashCode);
+          // call getBytes as copy is called later
+          HashTableElement h = new HashTableElement(hashCode, currentValue.copyBytes(), currentKey.copyBytes());
+          if (elementBatches[partitionId].addElement(h)) {
+            loadBatchQueues[partitionId].add(elementBatches[partitionId]);
+            elementBatches[partitionId] = new HashTableElementBatch();
+          }
+          receivedEntries++;
+          if (doMemCheck && (receivedEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + receivedEntries + " estimatedMemoryUsage: " + estMemUsage +
                   " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                        "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, receivedEntries, estMemUsage,
                     effectiveThreshold);
-                }
               }
+            }
           }
         }
+
+        LOG.info("Finished loading the queue for input: {} endTime : {}", inputName, System.currentTimeMillis());
+        addQueueDoneSentinel();
+        loadExecService.shutdown();
+        loadExecService.awaitTermination(5 * 60, TimeUnit.SECONDS);
+        LOG.info("Total received entries: {} Threads {} HT entries: {}", receivedEntries, numLoadThreads, totalEntries.get());
+
         long delta = System.currentTimeMillis() - startTime;
         htLoadCounter.increment(delta);
 
-        vectorMapJoinFastTableContainer.seal();
-        mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+        tableContainer.seal();
+        mapJoinTables[pos] = tableContainer;
         if (doMemCheck) {
           LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} " +
-              "estimatedMemoryUsage: {} Load Time : {} ", inputName, cacheKey, numEntries,
-            vectorMapJoinFastTableContainer.getEstimatedMemorySize(), delta);
+              "estimatedMemoryUsage: {} Load Time : {} ", inputName, cacheKey, receivedEntries,
+            tableContainer.getEstimatedMemorySize(), delta);
         } else {
           LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} Load Time : {} ",
-                  inputName, cacheKey, numEntries, delta);
+                  inputName, cacheKey, receivedEntries, delta);
         }
+      } catch (InterruptedException e) {
+        throw new HiveException(e);

Review comment:
       Reset the thread's Interrupted status.
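   For example:
   
   ```java
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt(); // restore the interrupt flag before wrapping
     throw new HiveException(e);
   }
   ```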

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMapContainer.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single LONG key hash map optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashMapContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinLongHashMap, MemoryEstimate {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMapContainer.class);
+
+  private final VectorMapJoinFastLongHashMap[] vectorMapJoinFastLongHashMaps;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashMapContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashMaps = new VectorMapJoinFastLongHashMap[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastLongHashMaps[i] =
+          new VectorMapJoinFastLongHashMap(isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity, loadFactor,
+              writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  @Override
+  public boolean useMinMax() {
+    return minMaxEnabled;
+  }
+
+  @Override
+  public long min() {
+    long min = Long.MAX_VALUE;
+    for (int i = 0; i < numThreads; ++i) {
+      long currentMin = vectorMapJoinFastLongHashMaps[i].min();
+      if (min > currentMin) {
+        min = currentMin;
+      }
+    }
+    return min;
+  }
+
+  @Override
+  public long max() {
+    long max = Long.MIN_VALUE;
+    for (int i = 0; i < numThreads; ++i) {
+      long currentMax = vectorMapJoinFastLongHashMaps[i].max();
+      if (max < currentMax) {
+        max = currentMax;
+      }
+    }
+    return max;
+  }
+
+  public static class NonMatchedLongHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+
+    private VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator[] hashMapIterators;
+    private int index;
+    private int numThreads;
+
+    private NonMatchedLongHashMapIterator(MatchTracker matchTracker,
+        VectorMapJoinFastLongHashMap[] vectorMapJoinFastLongHashMaps, int numThreads) {
+      super(matchTracker);
+      hashMapIterators = new VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator[numThreads];
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i] = new VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator(matchTracker,
+            vectorMapJoinFastLongHashMaps[i]);
+      }
+      index = 0;
+      this.numThreads = numThreads;
+    }
+
+    public void init() {
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i].init();
+      }
+      index = 0;
+    }
+
+    public boolean findNextNonMatched() {
+      for (; index < numThreads; ++index) {
+        if (hashMapIterators[index].findNextNonMatched()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public boolean readNonMatchedLongKey() {
+      return hashMapIterators[index].readNonMatchedLongKey();
+    }
+
+    public long getNonMatchedLongKey() {
+      return hashMapIterators[index].getNonMatchedLongKey();
+    }
+
+    public VectorMapJoinHashMapResult getNonMatchedHashMapResult() {
+      return hashMapIterators[index].getNonMatchedHashMapResult();
+    }
+  }
+
+  public VectorMapJoinHashMapResult createHashMapResult() {
+    return new VectorMapJoinFastValueStore.HashMapResult();
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");
+  }

Review comment:
       Use `UnsupportedOperationException` instead of a generic `RuntimeException`.
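   Something like (message text illustrative):
   
   ```java
   @Override
   public int spillPartitionId() {
     throw new UnsupportedOperationException("Spill is not supported by this container");
   }
   ```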

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
##########
@@ -80,30 +74,23 @@ public boolean adaptPutRow(BytesWritable currentKey, BytesWritable currentValue)
         return false;
       }
     } catch (Exception e) {
-      throw new HiveException(
-          "\nDeserializeRead details: " +
-              keyBinarySortableDeserializeRead.getDetailedReadPositionString() +
-          "\nException: " + e.toString());
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);

Review comment:
       Wrap the exception (pass it in as the cause) and ditch the new-lines.
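   A sketch of the suggested form, passing the cause to the wrapper instead of appending its text:
   
   ```java
   throw new HiveException("DeserializeRead details: "
       + keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
   ```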

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -139,50 +237,72 @@ public void load(MapJoinTableContainer[] mapJoinTables,
           LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
         }
         long keyCount = Math.max(estKeyCount, inputRecords);
+        initHTLoadingService(keyCount);
 
-        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+        VectorMapJoinFastTableContainer tableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numLoadThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
-                vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+                tableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+
+        tableContainer.setSerde(null, null); // No SerDes here.
+        // Submit parallel loading Threads
+        submitQueueDrainThreads(tableContainer);
 
-        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long receivedEntries = 0;
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
-          numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long hashCode = tableContainer.getHashCode(currentKey);
+          int partitionId = (int) ((numLoadThreads - 1) & hashCode);
+          // call getBytes as copy is called later
+          HashTableElement h = new HashTableElement(hashCode, currentValue.copyBytes(), currentKey.copyBytes());
+          if (elementBatches[partitionId].addElement(h)) {
+            loadBatchQueues[partitionId].add(elementBatches[partitionId]);
+            elementBatches[partitionId] = new HashTableElementBatch();
+          }
+          receivedEntries++;
+          if (doMemCheck && (receivedEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + receivedEntries + " estimatedMemoryUsage: " + estMemUsage +
                   " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                        "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, receivedEntries, estMemUsage,
                     effectiveThreshold);
-                }
               }
+            }
           }
         }
+
+        LOG.info("Finished loading the queue for input: {} endTime : {}", inputName, System.currentTimeMillis());

Review comment:
       No need to include 'endTime'. The logger itself will print the time of the message.
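   For example: `LOG.info("Finished loading the queue for input: {}", inputName);`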

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
##########
@@ -139,50 +237,72 @@ public void load(MapJoinTableContainer[] mapJoinTables,
           LOG.debug("Failed to get value for counter APPROXIMATE_INPUT_RECORDS", e);
         }
         long keyCount = Math.max(estKeyCount, inputRecords);
+        initHTLoadingService(keyCount);
 
-        VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
-                new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
+        VectorMapJoinFastTableContainer tableContainer =
+            new VectorMapJoinFastTableContainer(desc, hconf, keyCount, numLoadThreads);
 
         LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {} " +
                 "estKeyCount : {} keyCount : {}", inputName, cacheKey,
-                vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+                tableContainer.getClass().getSimpleName(), pos, estKeyCount, keyCount);
+
+        tableContainer.setSerde(null, null); // No SerDes here.
+        // Submit parallel loading Threads
+        submitQueueDrainThreads(tableContainer);
 
-        vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long receivedEntries = 0;
         long startTime = System.currentTimeMillis();
         while (kvReader.next()) {
-          vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
-              (BytesWritable)kvReader.getCurrentValue());
-          numEntries++;
-          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              if (estMemUsage > effectiveThreshold) {
-                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+          BytesWritable currentKey = (BytesWritable) kvReader.getCurrentKey();
+          BytesWritable currentValue = (BytesWritable) kvReader.getCurrentValue();
+          long hashCode = tableContainer.getHashCode(currentKey);
+          int partitionId = (int) ((numLoadThreads - 1) & hashCode);
+          // call getBytes as copy is called later
+          HashTableElement h = new HashTableElement(hashCode, currentValue.copyBytes(), currentKey.copyBytes());
+          if (elementBatches[partitionId].addElement(h)) {
+            loadBatchQueues[partitionId].add(elementBatches[partitionId]);
+            elementBatches[partitionId] = new HashTableElementBatch();
+          }
+          receivedEntries++;
+          if (doMemCheck && (receivedEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
+            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + receivedEntries + " estimatedMemoryUsage: " + estMemUsage +
                   " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
-                LOG.error(msg);
-                throw new MapJoinMemoryExhaustionError(msg);
-              } else {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                        "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, receivedEntries, estMemUsage,
                     effectiveThreshold);
-                }
               }
+            }
           }
         }
+
+        LOG.info("Finished loading the queue for input: {} endTime : {}", inputName, System.currentTimeMillis());
+        addQueueDoneSentinel();
+        loadExecService.shutdown();
+        loadExecService.awaitTermination(5 * 60, TimeUnit.SECONDS);

Review comment:
       This is an arbitrary time, and a long time at that (5 minutes).  Use `TimeUnit.MINUTES` for starters, and make sure there is a log message stating that the loader may wait up to 5 minutes for the threads.  Otherwise this will look like a stalled process to an observer.
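   A sketch combining both suggestions (keeping the current 5-minute bound):
   
   ```java
   loadExecService.shutdown();
   LOG.info("Waiting up to 5 minutes for the HT load threads to finish");
   if (!loadExecService.awaitTermination(5, TimeUnit.MINUTES)) {
     LOG.warn("HT load threads did not terminate within the timeout");
   }
   ```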

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
##########
@@ -69,10 +69,10 @@ public int spillPartitionId() {
   }
 
   @Override
-  public void putRow(BytesWritable currentKey, BytesWritable currentValue)
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
       throws SerDeException, HiveException, IOException {
-
-    putRowInternal(currentKey, currentValue);
+    // Method only supported by FAST HashTable implementations
+    throw new RuntimeException("Not implemented");

Review comment:
       Use `UnsupportedOperationException` instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMapContainer.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single LONG key hash map optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashMapContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinLongHashMap, MemoryEstimate {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMapContainer.class);
+
+  private final VectorMapJoinFastLongHashMap[] vectorMapJoinFastLongHashMaps;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashMapContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashMaps = new VectorMapJoinFastLongHashMap[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastLongHashMaps[i] =
+          new VectorMapJoinFastLongHashMap(isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity, loadFactor,
+              writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  @Override
+  public boolean useMinMax() {
+    return minMaxEnabled;
+  }
+
+  @Override
+  public long min() {
+    long min = Long.MAX_VALUE;
+    for (int i = 0; i < numThreads; ++i) {
+      long currentMin = vectorMapJoinFastLongHashMaps[i].min();
+      if (min > currentMin) {
+        min = currentMin;
+      }
+    }
+    return min;
+  }
+
+  @Override
+  public long max() {
+    long max = Long.MIN_VALUE;
+    for (int i = 0; i < numThreads; ++i) {
+      long currentMax = vectorMapJoinFastLongHashMaps[i].max();
+      if (max < currentMax) {
+        max = currentMax;
+      }
+    }
+    return max;
+  }
+
+  public static class NonMatchedLongHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+
+    private VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator[] hashMapIterators;
+    private int index;
+    private int numThreads;
+
+    private NonMatchedLongHashMapIterator(MatchTracker matchTracker,
+        VectorMapJoinFastLongHashMap[] vectorMapJoinFastLongHashMaps, int numThreads) {
+      super(matchTracker);
+      hashMapIterators = new VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator[numThreads];
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i] = new VectorMapJoinFastLongHashMap.NonMatchedLongHashMapIterator(matchTracker,
+            vectorMapJoinFastLongHashMaps[i]);
+      }
+      index = 0;
+      this.numThreads = numThreads;
+    }
+
+    public void init() {
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i].init();
+      }
+      index = 0;
+    }
+
+    public boolean findNextNonMatched() {
+      for (; index < numThreads; ++index) {
+        if (hashMapIterators[index].findNextNonMatched()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public boolean readNonMatchedLongKey() {
+      return hashMapIterators[index].readNonMatchedLongKey();
+    }
+
+    public long getNonMatchedLongKey() {
+      return hashMapIterators[index].getNonMatchedLongKey();
+    }
+
+    public VectorMapJoinHashMapResult getNonMatchedHashMapResult() {
+      return hashMapIterators[index].getNonMatchedHashMapResult();
+    }
+  }
+
+  public VectorMapJoinHashMapResult createHashMapResult() {
+    return new VectorMapJoinFastValueStore.HashMapResult();
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");
+  }
+
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch(Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);

Review comment:
       No new-line characters in messages.  It makes the logging harder to handle.
   Also, do not include the Exception message like this. Pass the entire exception into the wrapper:
   
   `throw new HiveException("This is my error message", e);`

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSetContainer.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMultiSet;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/*
+ * A single LONG key hash multi-set optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashMultiSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinLongHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastLongHashMultiSet[] vectorMapJoinFastLongHashMultiSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashMultiSetContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashMultiSets = new VectorMapJoinFastLongHashMultiSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastLongHashMultiSets[i] = new VectorMapJoinFastLongHashMultiSet(isFullOuter,
+          minMaxEnabled, hashTableKeyType,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  public VectorMapJoinHashMultiSetResult createHashMultiSetResult() {
+    return new VectorMapJoinFastHashMultiSet.HashMultiSetResult();
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+    return HashCodeUtil.calculateLongHashCode(key);
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastLongHashMultiSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  public JoinUtil.JoinResult contains(long key, VectorMapJoinHashMultiSetResult hashMultiSetResult) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return vectorMapJoinFastLongHashMultiSets[(int) ((numThreads - 1) & hashCode)].contains(key, hashMultiSetResult);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastLongHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastLongHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastLongHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Use `UnsupportedOperationException` instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSetContainer.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMultiSet;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/*
+ * A single LONG key hash multi-set optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashMultiSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinLongHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastLongHashMultiSet[] vectorMapJoinFastLongHashMultiSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashMultiSetContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashMultiSets = new VectorMapJoinFastLongHashMultiSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastLongHashMultiSets[i] = new VectorMapJoinFastLongHashMultiSet(isFullOuter,
+          minMaxEnabled, hashTableKeyType,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  public VectorMapJoinHashMultiSetResult createHashMultiSetResult() {
+    return new VectorMapJoinFastHashMultiSet.HashMultiSetResult();
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+    return HashCodeUtil.calculateLongHashCode(key);
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastLongHashMultiSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  public JoinUtil.JoinResult contains(long key, VectorMapJoinHashMultiSetResult hashMultiSetResult) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return vectorMapJoinFastLongHashMultiSets[(int) ((numThreads - 1) & hashCode)].contains(key, hashMultiSetResult);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastLongHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastLongHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastLongHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.
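
       Also worth noting in these hunks: the `(numThreads - 1) & hashCode` indexing in `putRow` and `contains` only produces a usable partition index when `numThreads` is a power of two. A quick illustration of that assumption:

           // numThreads = 4: mask 0b11 keeps the two low bits -> index in [0, 4)
           int index = (int) ((4 - 1) & hashCode);
           // numThreads = 3: mask 0b10 can only yield 0 or 2, so partition 1 is
           // never written -- the thread count must be a power of two.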

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSetContainer.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashSet;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/*
+ * A single LONG key hash set optimized for vector map join.
+ */
+public class VectorMapJoinFastLongHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinLongHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashSetContainer.class);
+
+  private final VectorMapJoinFastLongHashSet[] vectorMapJoinFastLongHashSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final HashTableKeyType hashTableKeyType;
+  private final int numThreads;
+  private final boolean minMaxEnabled;
+
+  public VectorMapJoinFastLongHashSetContainer(
+      boolean isFullOuter,
+      boolean minMaxEnabled,
+      HashTableKeyType hashTableKeyType,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.hashTableKeyType = hashTableKeyType;
+    this.vectorMapJoinFastLongHashSets = new VectorMapJoinFastLongHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastLongHashSets[i] =
+          new VectorMapJoinFastLongHashSet(isFullOuter, minMaxEnabled, hashTableKeyType, initialCapacity,
+              loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { hashTableKeyType.getPrimitiveTypeInfo() };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+    this.minMaxEnabled = minMaxEnabled;
+  }
+
+  public VectorMapJoinHashSetResult createHashSetResult() {
+    return new VectorMapJoinFastHashSet.HashSetResult();
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    long key = VectorMapJoinFastLongHashUtil.deserializeLongKey(keyBinarySortableDeserializeRead, hashTableKeyType);
+    return HashCodeUtil.calculateLongHashCode(key);
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastLongHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  public JoinResult contains(long key, VectorMapJoinHashSetResult hashSetResult) {
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
+    return vectorMapJoinFastLongHashSets[(int) ((numThreads - 1) & hashCode)].contains(key, hashSetResult);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastLongHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastLongHashSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastLongHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMapContainer.java
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A multi-key value hash map optimized for vector map join.
+ *
+ * The key is stored as the provided bytes (uninterpreted).
+ */
+public class VectorMapJoinFastMultiKeyHashMapContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashMap {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashMapContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashMap[] vectorMapJoinFastMultiKeyHashMaps;
+  private BytesWritable testKeyBytesWritable;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashMapContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numThreads) {
+    this.vectorMapJoinFastMultiKeyHashMaps = new VectorMapJoinFastMultiKeyHashMap[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastMultiKeyHashMaps[i] =
+          new VectorMapJoinFastMultiKeyHashMap(isFullOuter, initialCapacity, loadFactor, writeBuffersSize,
+              estimatedKeyCount);
+    }
+    this.numThreads = numThreads;
+  }
+
+  public static class NonMatchedBytesHashMapParallelIterator extends VectorMapJoinFastNonMatchedIterator {
+
+    private VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[] hashMapIterators;
+    private int index;
+    private int numThreads;
+
+    NonMatchedBytesHashMapParallelIterator(MatchTracker matchTracker,
+        VectorMapJoinFastBytesHashMap[] hashMaps, int numThreads) {
+      super(matchTracker);
+      hashMapIterators = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[numThreads];
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i] = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator(matchTracker,
+            hashMaps[i]);
+      }
+      index = 0;
+      this.numThreads = numThreads;
+    }
+
+    @Override
+    public void init() {
+      for (int i = 0; i < numThreads; ++i) {
+        hashMapIterators[i].init();
+      }
+      index = 0;
+    }
+
+    @Override
+    public boolean findNextNonMatched() {
+      for (; index < numThreads; ++index) {
+        if (hashMapIterators[index].findNextNonMatched()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean readNonMatchedBytesKey() throws HiveException {
+      return hashMapIterators[index].readNonMatchedBytesKey();
+    }
+
+    @Override
+    public byte[] getNonMatchedBytes() {
+      return hashMapIterators[index].getNonMatchedBytes();
+    }
+
+    @Override
+    public int getNonMatchedBytesOffset() {
+      return hashMapIterators[index].getNonMatchedBytesOffset();
+    }
+
+    @Override
+    public int getNonMatchedBytesLength() {
+      return hashMapIterators[index].getNonMatchedBytesLength();
+    }
+
+    @Override
+    public VectorMapJoinHashMapResult getNonMatchedHashMapResult() {
+      return hashMapIterators[index].getNonMatchedHashMapResult();
+    }
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
+  }
+
+  @Override
+  public synchronized void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastMultiKeyHashMaps[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastMultiKeyHashMaps[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastMultiKeyHashMaps[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastMultiKeyHashMaps[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    return new NonMatchedBytesHashMapParallelIterator(matchTracker, vectorMapJoinFastMultiKeyHashMaps, numThreads);
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.
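
       One more observation on this file: `putRow` is declared `synchronized` here, while the long- and string-keyed containers in this patch leave `putRow` unsynchronized even though all of them route rows with the same `(numThreads - 1) & hashCode` partitioning. If each partition is only ever written by a single loader thread, the `synchronized` looks redundant; if not, the other containers look under-synchronized. Either way, this should be consistent.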

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
##########
@@ -59,10 +60,27 @@ public boolean adaptPutRow(VectorMapJoinFastBytesHashTable hashTable,
         keyBinarySortableDeserializeRead.currentBytes,
         keyBinarySortableDeserializeRead.currentBytesStart,
         keyBinarySortableDeserializeRead.currentBytesLength,
-        currentValue);
+        currentValue, hashCode);
     return true;
   }
 
+  public long calculateLongHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException(
+          "\nDeserializeRead details: " + keyBinarySortableDeserializeRead.getDetailedReadPositionString()
+              + "\nException: " + e.toString());

Review comment:
       Ditch the new-lines, and pass `e` as the cause to the `HiveException` constructor instead of concatenating `e.toString()` into the message.
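
       A sketch of the suggested shape (assuming the `HiveException(String, Throwable)` constructor):

           } catch (Exception e) {
             throw new HiveException("DeserializeRead details: "
                 + keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
           }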

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSetContainer.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A multi-key hash multi-set optimized for vector map join.
+ *
+ * The key is stored as the provided bytes (uninterpreted).
+ */
+public class VectorMapJoinFastMultiKeyHashMultiSetContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashMultiSet[] vectorMapJoinFastMultiKeyHashMultiSets;
+  private BytesWritable testKeyBytesWritable;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashMultiSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numThreads) {
+    this.vectorMapJoinFastMultiKeyHashMultiSets = new VectorMapJoinFastMultiKeyHashMultiSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      this.vectorMapJoinFastMultiKeyHashMultiSets[i] =
+          new VectorMapJoinFastMultiKeyHashMultiSet(isFullOuter, initialCapacity, loadFactor, writeBuffersSize,
+              estimatedKeyCount);
+    }
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
+  }
+
+  @Override
+  public synchronized void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastMultiKeyHashMultiSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey,
+        currentValue);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastMultiKeyHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastMultiKeyHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastMultiKeyHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSetContainer.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A multi-key hash multi-set optimized for vector map join.
+ *
+ * The key is stored as the provided bytes (uninterpreted).
+ */
+public class VectorMapJoinFastMultiKeyHashMultiSetContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashMultiSet[] vectorMapJoinFastMultiKeyHashMultiSets;
+  private BytesWritable testKeyBytesWritable;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashMultiSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numThreads) {
+    this.vectorMapJoinFastMultiKeyHashMultiSets = new VectorMapJoinFastMultiKeyHashMultiSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      this.vectorMapJoinFastMultiKeyHashMultiSets[i] =
+          new VectorMapJoinFastMultiKeyHashMultiSet(isFullOuter, initialCapacity, loadFactor, writeBuffersSize,
+              estimatedKeyCount);
+    }
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
+  }
+
+  @Override
+  public synchronized void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastMultiKeyHashMultiSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey,
+        currentValue);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastMultiKeyHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastMultiKeyHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastMultiKeyHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSetContainer.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single STRING key hash set optimized for vector map join.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */
+public class VectorMapJoinFastStringHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashSetContainer.class);
+
+  private final VectorMapJoinFastStringHashSet[] vectorMapJoinFastStringHashSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final int numThreads;
+
+  public VectorMapJoinFastStringHashSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.vectorMapJoinFastStringHashSets = new VectorMapJoinFastStringHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastStringHashSets[i] = new VectorMapJoinFastStringHashSet(
+          isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastStringHashSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastStringHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.
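
       For context on how `getHashCode` and `putRow` are meant to interact, a hypothetical loader loop (the `kvReader` and `container` names are illustrative, not from this patch):

           while (kvReader.next()) {
             BytesWritable key = (BytesWritable) kvReader.getCurrentKey();
             BytesWritable value = (BytesWritable) kvReader.getCurrentValue();
             long hashCode = container.getHashCode(key); // computed once per row
             container.putRow(hashCode, key, value);     // routed via (numThreads - 1) & hashCode
           }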

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSetContainer.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A multi-key hash set optimized for vector map join.
+ *
+ * The key is stored as the provided bytes (uninterpreted).
+ */
+public class VectorMapJoinFastMultiKeyHashSetContainer
+    extends VectorMapJoinFastHashTableContainerBase implements VectorMapJoinBytesHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastMultiKeyHashSetContainer.class);
+
+  private final VectorMapJoinFastMultiKeyHashSet[] vectorMapJoinFastMultiKeyHashSets;
+  private BytesWritable testKeyBytesWritable;
+  private final int numThreads;
+
+  public VectorMapJoinFastMultiKeyHashSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, int numThreads) {
+    this.vectorMapJoinFastMultiKeyHashSets = new VectorMapJoinFastMultiKeyHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      this.vectorMapJoinFastMultiKeyHashSets[i] =
+          new VectorMapJoinFastMultiKeyHashSet(isFullOuter, initialCapacity, loadFactor, writeBuffersSize,
+              estimatedKeyCount);
+    }
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastMultiKeyHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastMultiKeyHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastMultiKeyHashSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastMultiKeyHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSetContainer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single STRING key hash multi-set optimized for vector map join.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */
+public class VectorMapJoinFastStringHashMultiSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastStringHashMultiSet[] vectorMapJoinFastStringHashMultiSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final int numThreads;
+
+  public VectorMapJoinFastStringHashMultiSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    vectorMapJoinFastStringHashMultiSets = new VectorMapJoinFastStringHashMultiSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastStringHashMultiSets[i] = new VectorMapJoinFastStringHashMultiSet(
+          isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashMultiSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastStringHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastStringHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMapContainer.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single STRING key hash map optimized for vector map join.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */
+public class VectorMapJoinFastStringHashMapContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashMap {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashMapContainer.class);
+
+  private final VectorMapJoinFastStringHashMap[] vectorMapJoinFastStringHashMaps;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final int numThreads;
+
+  public VectorMapJoinFastStringHashMapContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.vectorMapJoinFastStringHashMaps = new VectorMapJoinFastStringHashMap[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastStringHashMaps[i] = new VectorMapJoinFastStringHashMap(isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+  }
+
+  private static class NonMatchedBytesHashMapIterator extends VectorMapJoinFastNonMatchedIterator {
+
+    private VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[] hashMapIterators;
+    private int index;
+    private int numThreads;
+
+    NonMatchedBytesHashMapIterator(MatchTracker matchTracker,
+        VectorMapJoinFastStringHashMap[] hashMaps, int numThreads) {
+      super(matchTracker);
+      hashMapIterators = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator[numThreads];
+      for (int i=0; i<numThreads; ++i) {
+        hashMapIterators[i] = new VectorMapJoinFastBytesHashMap.NonMatchedBytesHashMapIterator(matchTracker,
+            hashMaps[i]);
+      }
+      index = 0;
+      this.numThreads = numThreads;
+    }
+
+    @Override
+    public void init() {
+      for (int i=0; i<numThreads; ++i) {
+        hashMapIterators[i].init();
+      }
+      index = 0;
+    }
+
+    @Override
+    public boolean findNextNonMatched() {
+      for (; index < numThreads; ++index) {
+        if (hashMapIterators[index].findNextNonMatched()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean readNonMatchedBytesKey() throws HiveException {
+      return hashMapIterators[index].readNonMatchedBytesKey();
+    }
+
+    @Override
+    public byte[] getNonMatchedBytes() {
+      return hashMapIterators[index].getNonMatchedBytes();
+    }
+
+    @Override
+    public int getNonMatchedBytesOffset() {
+      return hashMapIterators[index].getNonMatchedBytesOffset();
+    }
+
+    @Override
+    public int getNonMatchedBytesLength() {
+      return hashMapIterators[index].getNonMatchedBytesLength();
+    }
+
+    @Override
+    public VectorMapJoinHashMapResult getNonMatchedHashMapResult() {
+      return hashMapIterators[index].getNonMatchedHashMapResult();
+    }
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashMaps[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashMaps[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastStringHashMaps[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i<numThreads; ++i) {
+      count += vectorMapJoinFastStringHashMaps[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    return new NonMatchedBytesHashMapIterator(matchTracker, vectorMapJoinFastStringHashMaps, numThreads);
+  }
+
+  @Override
+  public int spillPartitionId() {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSetContainer.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single STRING key hash set optimized for vector map join.
+ *
+ * The key will be deserialized and just the bytes will be stored.
+ */
+public class VectorMapJoinFastStringHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashSetContainer.class);
+
+  private final VectorMapJoinFastStringHashSet[] vectorMapJoinFastStringHashSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final int numThreads;
+
+  public VectorMapJoinFastStringHashSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.vectorMapJoinFastStringHashSets = new VectorMapJoinFastStringHashSet[numThreads];
+    for (int i=0; i<numThreads; ++i) {
+      LOG.info("HT Container {} ", i);
+      vectorMapJoinFastStringHashSets[i] = new VectorMapJoinFastStringHashSet(
+          isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i=0; i<numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i=0; i<numThreads; ++i) {
+      size += vectorMapJoinFastStringHashSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i=0; i < numThreads; ++i) {
+      count += vectorMapJoinFastStringHashSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");

Review comment:
       Throw `UnsupportedOperationException` here instead of a generic `RuntimeException`.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSetContainer.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single STRING key hash set optimized for vector map join.
+ *
+ * The key is deserialized and only its bytes are stored.
+ */
+public class VectorMapJoinFastStringHashSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashSetContainer.class);
+
+  private final VectorMapJoinFastStringHashSet[] vectorMapJoinFastStringHashSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final int numThreads;
+
+  public VectorMapJoinFastStringHashSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    this.vectorMapJoinFastStringHashSets = new VectorMapJoinFastStringHashSet[numThreads];
+    for (int i = 0; i < numThreads; ++i) {
+      LOG.info("HT Container {}", i);
+      vectorMapJoinFastStringHashSets[i] = new VectorMapJoinFastStringHashSet(
+          isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    this.keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);

Review comment:
       Ditch the new-lines, and pass `e` as the exception cause instead of concatenating it into the message.
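       For example, a minimal sketch of that rewrite, assuming the
       `HiveException(String, Throwable)` constructor so the original
       exception survives as the cause:

           } catch (Exception e) {
             // Single-line message; the caught exception is wrapped as the
             // cause so the full stack trace is preserved.
             throw new HiveException("DeserializeRead details: "
                 + keyBinarySortableDeserializeRead.getDetailedReadPositionString(), e);
           }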

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableContainerBase.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+
+// Base class for the multi-threaded (multi-HT) hash table container implementations.
+public abstract class VectorMapJoinFastHashTableContainerBase implements VectorMapJoinHashTable {
+
+  public abstract void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException;
+
+  public abstract long getHashCode(BytesWritable currentKey) throws HiveException, IOException;
+
+  public abstract long getEstimatedMemorySize();
+
+  public abstract int size();
+
+  // Method to be removed.
+  public boolean containsLongKey(long currentKey) {
+    throw new RuntimeException("Not supported!");
+  }

Review comment:
       Same here: `UnsupportedOperationException`.
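       While on this base class: every `putRow` implementation in the patch
       picks a shard with `(numThreads - 1) & hashCode`, which only works as
       a mask when `numThreads` is a power of two. A hypothetical guard a
       caller could apply when sizing the container array (helper name
       invented for illustration, not part of this patch):

           // Hypothetical helper: bitmask sharding needs a power-of-two shard count.
           private static int toPowerOfTwoShards(int requestedThreads) {
             if (Integer.bitCount(requestedThreads) == 1) {
               return requestedThreads;
             }
             // Round down so (shards - 1) stays a valid all-ones bitmask.
             return Integer.highestOneBit(requestedThreads);
           }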

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSetContainer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.persistence.MatchTracker;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinNonMatchedIterator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeserializeRead;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * A single STRING key hash multi-set optimized for vector map join.
+ *
+ * The key is deserialized and only its bytes are stored.
+ */
+public class VectorMapJoinFastStringHashMultiSetContainer extends VectorMapJoinFastHashTableContainerBase implements
+    VectorMapJoinBytesHashMultiSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastStringHashMultiSetContainer.class);
+
+  private final VectorMapJoinFastStringHashMultiSet[] vectorMapJoinFastStringHashMultiSets;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final int numThreads;
+
+  public VectorMapJoinFastStringHashMultiSetContainer(
+      boolean isFullOuter,
+      int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount, TableDesc tableDesc,
+      int numThreads) {
+    vectorMapJoinFastStringHashMultiSets = new VectorMapJoinFastStringHashMultiSet[numThreads];
+    for (int i = 0; i < numThreads; ++i) {
+      LOG.info("HT Container {}", i);
+      vectorMapJoinFastStringHashMultiSets[i] = new VectorMapJoinFastStringHashMultiSet(
+          isFullOuter,
+          initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount, tableDesc);
+    }
+    PrimitiveTypeInfo[] primitiveTypeInfos = { TypeInfoFactory.stringTypeInfo };
+    keyBinarySortableDeserializeRead =
+        BinarySortableDeserializeRead.with(primitiveTypeInfos, false, tableDesc.getProperties());
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void putRow(long hashCode, BytesWritable currentKey, BytesWritable currentValue)
+      throws HiveException, IOException {
+    vectorMapJoinFastStringHashMultiSets[(int) ((numThreads - 1) & hashCode)].putRow(hashCode, currentKey, currentValue);
+  }
+
+  @Override
+  public long getHashCode(BytesWritable currentKey) throws HiveException, IOException {
+    byte[] keyBytes = currentKey.getBytes();
+    int keyLength = currentKey.getLength();
+    keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
+    try {
+      if (!keyBinarySortableDeserializeRead.readNextField()) {
+        return 0;
+      }
+    } catch (Exception e) {
+      throw new HiveException("\nDeserializeRead details: " +
+          keyBinarySortableDeserializeRead.getDetailedReadPositionString() + "\nException: " + e);
+    }
+    return HashCodeUtil.murmurHash(
+        keyBinarySortableDeserializeRead.currentBytes,
+        keyBinarySortableDeserializeRead.currentBytesStart,
+        keyBinarySortableDeserializeRead.currentBytesLength);
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long estimatedMemorySize = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      estimatedMemorySize += vectorMapJoinFastStringHashMultiSets[i].getEstimatedMemorySize();
+    }
+    return estimatedMemorySize;
+  }
+
+  @Override
+  public int size() {
+    int size = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      size += vectorMapJoinFastStringHashMultiSets[i].size();
+    }
+    return size;
+  }
+
+  @Override
+  public MatchTracker createMatchTracker() {
+    int count = 0;
+    for (int i = 0; i < numThreads; ++i) {
+      count += vectorMapJoinFastStringHashMultiSets[i].logicalHashBucketCount;
+    }
+    return MatchTracker.create(count);
+  }
+
+  @Override
+  public VectorMapJoinNonMatchedIterator createNonMatchedIterator(MatchTracker matchTracker) {
+    throw new RuntimeException("Not implemented");

Review comment:
       Same as above: use `UnsupportedOperationException`.
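       Zooming out, the pattern shared by these containers is hash-sharded,
       single-writer loading: the loader calls `getHashCode` once per row and
       routes the row to exactly one of the `numThreads` inner tables, so no
       inner table is ever written by two threads. A rough, hypothetical
       sketch of that dispatch (queue and batch names invented; not the
       actual loader in this patch):

           // Hypothetical dispatch: shard i is drained by worker thread i only,
           // so each inner hash table sees strictly single-threaded putRow calls.
           long hashCode = container.getHashCode(currentKey);
           int shard = (int) ((numThreads - 1) & hashCode);
           shardQueues[shard].put(new KeyValueBatch(currentKey, currentValue));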






Issue Time Tracking
-------------------

            Worklog Id:     (was: 600508)
    Remaining Estimate: 0h
            Time Spent: 10m

> Support parallel load for Optimized HT implementations
> ------------------------------------------------------
>
>                 Key: HIVE-25149
>                 URL: https://issues.apache.org/jira/browse/HIVE-25149
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Panagiotis Garefalakis
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



