You are viewing a plain text version of this content. The canonical version is available at the original link, which is not preserved in this plain-text view.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/03 19:34:49 UTC

[1/3] hive git commit: HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/branch-2.0 98c559474 -> 74c46e846
  refs/heads/branch-2.1 f7fdd4e88 -> 18c679722
  refs/heads/master 4b6ac735f -> f008a38b4


HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f008a38b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f008a38b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f008a38b

Branch: refs/heads/master
Commit: f008a38b433fc287a233c7131b1d7e45f9465557
Parents: 4b6ac73
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 3 12:33:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 3 12:33:35 2016 -0700

----------------------------------------------------------------------
 .../persistence/BytesBytesMultiHashMap.java     | 34 +++++++-------
 .../ql/exec/vector/VectorMapJoinOperator.java   |  2 +-
 .../ql/exec/vector/VectorizedBatchUtil.java     | 13 +++---
 .../mapjoin/VectorMapJoinCommonOperator.java    |  1 +
 .../VectorMapJoinGenerateResultOperator.java    |  1 +
 .../fast/VectorMapJoinFastBytesHashMap.java     |  2 +-
 .../VectorMapJoinFastBytesHashMultiSet.java     |  2 +-
 .../fast/VectorMapJoinFastBytesHashSet.java     |  2 +-
 .../fast/VectorMapJoinFastBytesHashTable.java   |  8 ++--
 .../fast/VectorMapJoinFastHashTable.java        |  2 +-
 .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 16 ++++---
 .../fast/VectorMapJoinFastLongHashTable.java    |  8 ++--
 .../fast/VectorMapJoinFastTableContainer.java   | 10 ++--
 .../hashtable/VectorMapJoinHashTableResult.java |  8 ++++
 .../apache/hadoop/hive/serde2/WriteBuffers.java | 49 +++++++++++---------
 15 files changed, 90 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 51acae0..dd88461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -449,7 +449,7 @@ public final class BytesBytesMultiHashMap {
 
     kv.writeKey(writeBuffers);
     int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset);
-    int hashCode = (keyHashCode == -1) ? writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode;
+    int hashCode = (keyHashCode == -1) ? writeBuffers.unsafeHashCode(keyOffset, keyLength) : keyHashCode;
 
     int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode);
     // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -683,8 +683,8 @@ public final class BytesBytesMultiHashMap {
     if (!compareHashBits(ref, hashCode)) {
       return false; // Hash bits in ref don't match.
     }
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null));
-    int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+    writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(ref, null));
+    int valueLength = (int)writeBuffers.unsafeReadVLong(), keyLength = (int)writeBuffers.unsafeReadVLong();
     if (keyLength != cmpLength) {
       return false;
     }
@@ -730,7 +730,7 @@ public final class BytesBytesMultiHashMap {
   private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) {
     long tailOffset = Ref.getOffset(ref);
     if (Ref.hasList(ref)) {
-      long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5)
+      long relativeOffset = (readPos == null) ? writeBuffers.unsafeReadNByteLong(tailOffset, 5)
           : writeBuffers.readNByteLong(tailOffset, 5, readPos);
       tailOffset += relativeOffset;
     }
@@ -763,10 +763,10 @@ public final class BytesBytesMultiHashMap {
       // TODO: we could actually store a bit flag in ref indicating whether this is a hash
       //       match or a probe, and in the former case use hash bits (for a first few resizes).
       // int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount);
-      writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null));
+      writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(oldRef, null));
       // Read the value and key length for the first record.
-      int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef)
-          - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4);
+      int hashCode = (int)writeBuffers.unsafeReadNByteLong(Ref.getOffset(oldRef)
+          - writeBuffers.unsafeReadVLong() - writeBuffers.unsafeReadVLong() - 4, 4);
       int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode);
       maxSteps = Math.max(probeSteps, maxSteps);
     }
@@ -785,16 +785,16 @@ public final class BytesBytesMultiHashMap {
   private long createOrGetListRecord(long ref) {
     if (Ref.hasList(ref)) {
       // LOG.info("Found list record at " + writeBuffers.getReadPoint());
-      return writeBuffers.getReadPoint(); // Assumes we are here after key compare.
+      return writeBuffers.getUnsafeReadPoint(); // Assumes we are here after key compare.
     }
     long firstTailOffset = Ref.getOffset(ref);
     // LOG.info("First tail offset to create list record is " + firstTailOffset);
 
     // Determine the length of storage for value and key lengths of the first record.
-    writeBuffers.setReadPoint(firstTailOffset);
-    writeBuffers.skipVLong();
-    writeBuffers.skipVLong();
-    int lengthsLength = (int)(writeBuffers.getReadPoint() - firstTailOffset);
+    writeBuffers.setUnsafeReadPoint(firstTailOffset);
+    writeBuffers.unsafeSkipVLong();
+    writeBuffers.unsafeSkipVLong();
+    int lengthsLength = (int)(writeBuffers.getUnsafeReadPoint() - firstTailOffset);
 
     // Create the list record, copy first record value/key lengths there.
     writeBuffers.writeBytes(firstTailOffset, lengthsLength);
@@ -816,7 +816,7 @@ public final class BytesBytesMultiHashMap {
    */
   private void addRecordToList(long lrPtrOffset, long tailOffset) {
     // Now, insert this record into the list.
-    long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5);
+    long prevHeadOffset = writeBuffers.unsafeReadNByteLong(lrPtrOffset, 5);
     // LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset);
     assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset.
     writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset);
@@ -885,10 +885,10 @@ public final class BytesBytesMultiHashMap {
       ++examined;
       long recOffset = getFirstRecordLengthsOffset(ref, null);
       long tailOffset = Ref.getOffset(ref);
-      writeBuffers.setReadPoint(recOffset);
-      int valueLength = (int)writeBuffers.readVLong(),
-          keyLength = (int)writeBuffers.readVLong();
-      long ptrOffset = writeBuffers.getReadPoint();
+      writeBuffers.setUnsafeReadPoint(recOffset);
+      int valueLength = (int)writeBuffers.unsafeReadVLong(),
+          keyLength = (int)writeBuffers.unsafeReadVLong();
+      long ptrOffset = writeBuffers.getUnsafeReadPoint();
       if (Ref.hasList(ref)) {
         byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 3323df3..0cb6c8a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -154,7 +154,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
 
       // This is a vectorized aware evaluator
       ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
-        int columnIndex;;
+        int columnIndex;
         int writerIndex;
 
         public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 9471e66..990e896 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -702,14 +702,15 @@ public class VectorizedBatchUtil {
 
   public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
     StringBuilder sb = new StringBuilder();
+    LOG.info(debugFormatOneRow(batch, index, prefix, sb).toString());
+  }
+
+  public static StringBuilder debugFormatOneRow(VectorizedRowBatch batch,
+      int index, String prefix, StringBuilder sb) {
     sb.append(prefix + " row " + index + " ");
     for (int p = 0; p < batch.projectionSize; p++) {
       int column = batch.projectedColumns[p];
-      if (p == column) {
-        sb.append("(col " + p + ") ");
-      } else {
-        sb.append("(proj col " + p + " col " + column + ") ");
-      }
+      sb.append("(" + p + "," + column + ") ");
       ColumnVector colVector = batch.cols[column];
       if (colVector == null) {
         sb.append("(null ColumnVector)");
@@ -752,7 +753,7 @@ public class VectorizedBatchUtil {
       }
       sb.append(" ");
     }
-    LOG.info(sb.toString());
+    return sb;
   }
 
   public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 8ad7ca4..24668f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -636,6 +636,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
     default:
       throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());
     }
+    LOG.info("Using " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 6a3d64b..22eb07e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -507,6 +507,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf,
         smallTable);
     needHashTableSetup = true;
+    LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
 
     if (isLogDebugEnabled) {
       LOG.debug(CLASS_NAME + " reloadHashTable!");

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index a4bc188..d878f65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -81,7 +81,7 @@ public abstract class VectorMapJoinFastBytesHashMap
     optimizedHashMapResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMapResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (valueRefWord == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index aaf3497..b328efd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -75,7 +75,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
     optimizedHashMultiSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMultiSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (count == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 841183e..c9b23bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -65,7 +65,7 @@ public abstract class VectorMapJoinFastBytesHashSet
     optimizedHashSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (existance == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index d6e107b..7987723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hive.common.util.HashCodeUtil;
 
@@ -75,7 +76,7 @@ public abstract class VectorMapJoinFastBytesHashTable
         break;
       }
       if (hashCode == slotTriples[tripleIndex + 1] &&
-          keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+          keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
         // LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + slot + " tripleIndex " + tripleIndex + " existing");
         isNewKey = false;
         break;
@@ -164,7 +165,8 @@ public abstract class VectorMapJoinFastBytesHashTable
     // LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + resizeThreshold + " metricExpands " + metricExpands);
   }
 
-  protected long findReadSlot(byte[] keyBytes, int keyStart, int keyLength, long hashCode) {
+  protected final long findReadSlot(
+      byte[] keyBytes, int keyStart, int keyLength, long hashCode, WriteBuffers.Position readPos) {
 
     int intHashCode = (int) hashCode;
     int slot = (intHashCode & logicalHashBucketMask);
@@ -176,7 +178,7 @@ public abstract class VectorMapJoinFastBytesHashTable
       if (slotTriples[tripleIndex] != 0 && hashCode == slotTriples[tripleIndex + 1]) {
         // Finally, verify the key bytes match.
 
-        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength, readPos)) {
           return slotTriples[tripleIndex + 2];
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index 099f38e..7df9eed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -29,7 +29,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
   protected int logicalHashBucketMask;
 
   protected float loadFactor;
-  protected int writeBuffersSize;
+  protected final int writeBuffersSize;
 
   protected int metricPutConflict;
   protected int largestNumberOfSteps;

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index efdcd43..be51693 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -30,7 +30,7 @@ public class VectorMapJoinFastKeyStore {
 
   private WriteBuffers writeBuffers;
 
-  private WriteBuffers.Position readPos;
+  private WriteBuffers.Position unsafeReadPos; // Thread-unsafe position used at write time.
 
   /**
    * A store for arbitrary length keys in memory.
@@ -115,7 +115,13 @@ public class VectorMapJoinFastKeyStore {
     return keyRefWord;
   }
 
-  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public boolean unsafeEqualKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+    return equalKey(keyRefWord, keyBytes, keyStart, keyLength, unsafeReadPos);
+  }
+
+  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength,
+      WriteBuffers.Position readPos) {
 
     int storedKeyLengthLength =
         (int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift);
@@ -151,14 +157,12 @@ public class VectorMapJoinFastKeyStore {
 
   public VectorMapJoinFastKeyStore(int writeBuffersSize) {
     writeBuffers = new WriteBuffers(writeBuffersSize, AbsoluteKeyOffset.maxSize);
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
 
   public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) {
     // TODO: Check if maximum size compatible with AbsoluteKeyOffset.maxSize.
     this.writeBuffers = writeBuffers;
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 78b55a1..5373aad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -48,13 +48,13 @@ public abstract class VectorMapJoinFastLongHashTable
 
   private transient final boolean isLogDebugEnabled = LOG.isDebugEnabled();
 
-  private HashTableKeyType hashTableKeyType;
+  private final HashTableKeyType hashTableKeyType;
 
-  private boolean isOuterJoin;
+  private final boolean isOuterJoin;
 
-  private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
 
-  private boolean useMinMax;
+  private final boolean useMinMax;
   private long min;
   private long max;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 3b73f7d..9f3b107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -58,7 +58,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
   private final long keyCount;
 
 
-  private final VectorMapJoinFastHashTable VectorMapJoinFastHashTable;
+  private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable;
 
   public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf,
       long keyCount) throws SerDeException {
@@ -83,12 +83,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
     // LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold);
 
-    VectorMapJoinFastHashTable = createHashTable(newThreshold);
+    vectorMapJoinFastHashTable = createHashTable(newThreshold);
   }
 
   @Override
   public VectorMapJoinHashTable vectorMapJoinHashTable() {
-    return VectorMapJoinFastHashTable;
+    return vectorMapJoinFastHashTable;
   }
 
   private VectorMapJoinFastHashTable createHashTable(int newThreshold) {
@@ -178,7 +178,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
       throws SerDeException, HiveException, IOException {
 
     // We are not using the key and value contexts, nor do we support a MapJoinKey.
-    VectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
+    vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
     return null;
   }
 
@@ -214,7 +214,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
   @Override
   public int size() {
-    return VectorMapJoinFastHashTable.size();
+    return vectorMapJoinFastHashTable.size();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
index ce598e3..b51d6fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
 
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 
 /*
  * Root abstract class for a hash table result.
@@ -29,9 +30,12 @@ public abstract class VectorMapJoinHashTableResult {
 
   private int spillPartitionId;
 
+  private final WriteBuffers.Position readPos;
+
   public VectorMapJoinHashTableResult() {
     joinResult = JoinUtil.JoinResult.NOMATCH;
     spillPartitionId = -1;
+    readPos = new WriteBuffers.Position();
   }
 
   /**
@@ -78,4 +82,8 @@ public abstract class VectorMapJoinHashTableResult {
     sb.append("joinResult " + joinResult.name());
     return sb.toString();
   }
+
+  public WriteBuffers.Position getReadPos() {
+    return readPos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f008a38b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 5900428..a4ecd9f 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.hive.serde2;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hive.common.util.HashCodeUtil;
 
@@ -53,7 +50,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   Position writePos = new Position(); // Position where we'd write
-  Position defaultReadPos = new Position(); // Position where we'd read (by default).
+  Position unsafeReadPos = new Position(); // Position where we'd read (unsafely at write time).
 
 
   public WriteBuffers(int wbSize, long maxSize) {
@@ -64,16 +61,18 @@ public final class WriteBuffers implements RandomAccessOutput {
     writePos.bufferIndex = -1;
   }
 
-  public int readVInt() {
-    return (int) readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeReadVInt() {
+    return (int) readVLong(unsafeReadPos);
   }
 
   public int readVInt(Position readPos) {
     return (int) readVLong(readPos);
   }
 
-  public long readVLong() {
-    return readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadVLong() {
+    return readVLong(unsafeReadPos);
   }
 
   public long readVLong(Position readPos) {
@@ -97,8 +96,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
   }
 
-  public void skipVLong() {
-    skipVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void unsafeSkipVLong() {
+    skipVLong(unsafeReadPos);
   }
 
   public void skipVLong(Position readPos) {
@@ -117,8 +117,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     }
   }
 
-  public void setReadPoint(long offset) {
-    setReadPoint(offset, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void setUnsafeReadPoint(long offset) {
+    setReadPoint(offset, unsafeReadPos);
   }
 
   public void setReadPoint(long offset, Position readPos) {
@@ -127,8 +128,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     readPos.offset = getOffset(offset);
   }
 
-  public int hashCode(long offset, int length) {
-    return hashCode(offset, length, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeHashCode(long offset, int length) {
+    return hashCode(offset, length, unsafeReadPos);
   }
 
   public int hashCode(long offset, int length, Position readPos) {
@@ -352,7 +354,7 @@ public final class WriteBuffers implements RandomAccessOutput {
  
   private void clearState() {
     writePos.clear();
-    defaultReadPos.clear();
+    unsafeReadPos.clear();
   }
 
 
@@ -363,8 +365,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
   }
 
-  public long getReadPoint() {
-    return getReadPoint(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long getUnsafeReadPoint() {
+    return getReadPoint(unsafeReadPos);
   }
 
   public long getReadPoint(Position readPos) {
@@ -518,8 +521,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     clearState();
   }
 
-  public long readNByteLong(long offset, int bytes) {
-    return readNByteLong(offset, bytes, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadNByteLong(long offset, int bytes) {
+    return readNByteLong(offset, bytes, unsafeReadPos);
   }
 
   public long readNByteLong(long offset, int bytes, Position readPos) {
@@ -561,7 +565,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   public int readInt(long offset) {
-    return (int)readNByteLong(offset, 4);
+    return (int)unsafeReadNByteLong(offset, 4);
   }
 
   @Override
@@ -606,7 +610,8 @@ public final class WriteBuffers implements RandomAccessOutput {
     return writeBuffers.size() * (long) wbSize;
   }
 
-  public Position getReadPosition() {
-    return defaultReadPos;
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public Position getUnsafeReadPosition() {
+    return unsafeReadPos;
   }
 }


[3/3] hive git commit: HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)

Posted by se...@apache.org.
HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/74c46e84
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/74c46e84
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/74c46e84

Branch: refs/heads/branch-2.0
Commit: 74c46e8464c7de83a34993293aaf07adb0d225a8
Parents: 98c5594
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 3 12:33:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 3 12:34:13 2016 -0700

----------------------------------------------------------------------
 .../persistence/BytesBytesMultiHashMap.java     | 34 +++++++-------
 .../ql/exec/vector/VectorMapJoinOperator.java   |  2 +-
 .../ql/exec/vector/VectorizedBatchUtil.java     | 13 +++---
 .../mapjoin/VectorMapJoinCommonOperator.java    |  1 +
 .../VectorMapJoinGenerateResultOperator.java    |  1 +
 .../fast/VectorMapJoinFastBytesHashMap.java     |  2 +-
 .../VectorMapJoinFastBytesHashMultiSet.java     |  2 +-
 .../fast/VectorMapJoinFastBytesHashSet.java     |  2 +-
 .../fast/VectorMapJoinFastBytesHashTable.java   |  8 ++--
 .../fast/VectorMapJoinFastHashTable.java        |  2 +-
 .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 16 ++++---
 .../fast/VectorMapJoinFastLongHashTable.java    |  8 ++--
 .../fast/VectorMapJoinFastTableContainer.java   | 10 ++--
 .../hashtable/VectorMapJoinHashTableResult.java |  8 ++++
 .../apache/hadoop/hive/serde2/WriteBuffers.java | 49 +++++++++++---------
 15 files changed, 90 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 51acae0..dd88461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -449,7 +449,7 @@ public final class BytesBytesMultiHashMap {
 
     kv.writeKey(writeBuffers);
     int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset);
-    int hashCode = (keyHashCode == -1) ? writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode;
+    int hashCode = (keyHashCode == -1) ? writeBuffers.unsafeHashCode(keyOffset, keyLength) : keyHashCode;
 
     int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode);
     // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -683,8 +683,8 @@ public final class BytesBytesMultiHashMap {
     if (!compareHashBits(ref, hashCode)) {
       return false; // Hash bits in ref don't match.
     }
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null));
-    int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+    writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(ref, null));
+    int valueLength = (int)writeBuffers.unsafeReadVLong(), keyLength = (int)writeBuffers.unsafeReadVLong();
     if (keyLength != cmpLength) {
       return false;
     }
@@ -730,7 +730,7 @@ public final class BytesBytesMultiHashMap {
   private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) {
     long tailOffset = Ref.getOffset(ref);
     if (Ref.hasList(ref)) {
-      long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5)
+      long relativeOffset = (readPos == null) ? writeBuffers.unsafeReadNByteLong(tailOffset, 5)
           : writeBuffers.readNByteLong(tailOffset, 5, readPos);
       tailOffset += relativeOffset;
     }
@@ -763,10 +763,10 @@ public final class BytesBytesMultiHashMap {
       // TODO: we could actually store a bit flag in ref indicating whether this is a hash
       //       match or a probe, and in the former case use hash bits (for a first few resizes).
       // int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount);
-      writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null));
+      writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(oldRef, null));
       // Read the value and key length for the first record.
-      int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef)
-          - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4);
+      int hashCode = (int)writeBuffers.unsafeReadNByteLong(Ref.getOffset(oldRef)
+          - writeBuffers.unsafeReadVLong() - writeBuffers.unsafeReadVLong() - 4, 4);
       int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode);
       maxSteps = Math.max(probeSteps, maxSteps);
     }
@@ -785,16 +785,16 @@ public final class BytesBytesMultiHashMap {
   private long createOrGetListRecord(long ref) {
     if (Ref.hasList(ref)) {
       // LOG.info("Found list record at " + writeBuffers.getReadPoint());
-      return writeBuffers.getReadPoint(); // Assumes we are here after key compare.
+      return writeBuffers.getUnsafeReadPoint(); // Assumes we are here after key compare.
     }
     long firstTailOffset = Ref.getOffset(ref);
     // LOG.info("First tail offset to create list record is " + firstTailOffset);
 
     // Determine the length of storage for value and key lengths of the first record.
-    writeBuffers.setReadPoint(firstTailOffset);
-    writeBuffers.skipVLong();
-    writeBuffers.skipVLong();
-    int lengthsLength = (int)(writeBuffers.getReadPoint() - firstTailOffset);
+    writeBuffers.setUnsafeReadPoint(firstTailOffset);
+    writeBuffers.unsafeSkipVLong();
+    writeBuffers.unsafeSkipVLong();
+    int lengthsLength = (int)(writeBuffers.getUnsafeReadPoint() - firstTailOffset);
 
     // Create the list record, copy first record value/key lengths there.
     writeBuffers.writeBytes(firstTailOffset, lengthsLength);
@@ -816,7 +816,7 @@ public final class BytesBytesMultiHashMap {
    */
   private void addRecordToList(long lrPtrOffset, long tailOffset) {
     // Now, insert this record into the list.
-    long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5);
+    long prevHeadOffset = writeBuffers.unsafeReadNByteLong(lrPtrOffset, 5);
     // LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset);
     assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset.
     writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset);
@@ -885,10 +885,10 @@ public final class BytesBytesMultiHashMap {
       ++examined;
       long recOffset = getFirstRecordLengthsOffset(ref, null);
       long tailOffset = Ref.getOffset(ref);
-      writeBuffers.setReadPoint(recOffset);
-      int valueLength = (int)writeBuffers.readVLong(),
-          keyLength = (int)writeBuffers.readVLong();
-      long ptrOffset = writeBuffers.getReadPoint();
+      writeBuffers.setUnsafeReadPoint(recOffset);
+      int valueLength = (int)writeBuffers.unsafeReadVLong(),
+          keyLength = (int)writeBuffers.unsafeReadVLong();
+      long ptrOffset = writeBuffers.getUnsafeReadPoint();
       if (Ref.hasList(ref)) {
         byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 622f777..a247bb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -153,7 +153,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
 
       // This is a vectorized aware evaluator
       ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
-        int columnIndex;;
+        int columnIndex;
         int writerIndex;
 
         public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) {

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index be04da8..b0eee64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -653,14 +653,15 @@ public class VectorizedBatchUtil {
 
   public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
     StringBuilder sb = new StringBuilder();
+    LOG.info(debugFormatOneRow(batch, index, prefix, sb).toString());
+  }
+
+  public static StringBuilder debugFormatOneRow(VectorizedRowBatch batch,
+      int index, String prefix, StringBuilder sb) {
     sb.append(prefix + " row " + index + " ");
     for (int p = 0; p < batch.projectionSize; p++) {
       int column = batch.projectedColumns[p];
-      if (p == column) {
-        sb.append("(col " + p + ") ");
-      } else {
-        sb.append("(proj col " + p + " col " + column + ") ");
-      }
+      sb.append("(" + p + "," + column + ") ");
       ColumnVector colVector = batch.cols[column];
       if (colVector == null) {
         sb.append("(null ColumnVector)");
@@ -703,7 +704,7 @@ public class VectorizedBatchUtil {
       }
       sb.append(" ");
     }
-    LOG.info(sb.toString());
+    return sb;
   }
 
   public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 8ad7ca4..24668f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -636,6 +636,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
     default:
       throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());
     }
+    LOG.info("Using " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 5cbace4..dee30f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -507,6 +507,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf,
         smallTable);
     needHashTableSetup = true;
+    LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
 
     if (isLogDebugEnabled) {
       LOG.debug(CLASS_NAME + " reloadHashTable!");

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 0ff98bd..a83a111 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -73,7 +73,7 @@ public abstract class VectorMapJoinFastBytesHashMap
     optimizedHashMapResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMapResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (valueRefWord == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index 5d8ed2d..02a8ed1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -69,7 +69,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
     optimizedHashMultiSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMultiSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (count == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 990a2e5..2c442fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -64,7 +64,7 @@ public abstract class VectorMapJoinFastBytesHashSet
     optimizedHashSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (existance == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index 6b536f0..7b77969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hive.common.util.HashCodeUtil;
 
@@ -87,7 +88,7 @@ public abstract class VectorMapJoinFastBytesHashTable
         break;
       }
       if (hashCode == slotTriples[tripleIndex + 1] &&
-          keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+          keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
         // LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + slot + " tripleIndex " + tripleIndex + " existing");
         isNewKey = false;
         break;
@@ -176,7 +177,8 @@ public abstract class VectorMapJoinFastBytesHashTable
     // LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + resizeThreshold + " metricExpands " + metricExpands);
   }
 
-  protected long findReadSlot(byte[] keyBytes, int keyStart, int keyLength, long hashCode) {
+  protected final long findReadSlot(
+      byte[] keyBytes, int keyStart, int keyLength, long hashCode, WriteBuffers.Position readPos) {
 
     int intHashCode = (int) hashCode;
     int slot = (intHashCode & logicalHashBucketMask);
@@ -188,7 +190,7 @@ public abstract class VectorMapJoinFastBytesHashTable
       if (slotTriples[tripleIndex] != 0 && hashCode == slotTriples[tripleIndex + 1]) {
         // Finally, verify the key bytes match.
 
-        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength, readPos)) {
           return slotTriples[tripleIndex + 2];
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index 099f38e..7df9eed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -29,7 +29,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
   protected int logicalHashBucketMask;
 
   protected float loadFactor;
-  protected int writeBuffersSize;
+  protected final int writeBuffersSize;
 
   protected int metricPutConflict;
   protected int largestNumberOfSteps;

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index efdcd43..be51693 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -30,7 +30,7 @@ public class VectorMapJoinFastKeyStore {
 
   private WriteBuffers writeBuffers;
 
-  private WriteBuffers.Position readPos;
+  private WriteBuffers.Position unsafeReadPos; // Thread-unsafe position used at write time.
 
   /**
    * A store for arbitrary length keys in memory.
@@ -115,7 +115,13 @@ public class VectorMapJoinFastKeyStore {
     return keyRefWord;
   }
 
-  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public boolean unsafeEqualKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+    return equalKey(keyRefWord, keyBytes, keyStart, keyLength, unsafeReadPos);
+  }
+
+  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength,
+      WriteBuffers.Position readPos) {
 
     int storedKeyLengthLength =
         (int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift);
@@ -151,14 +157,12 @@ public class VectorMapJoinFastKeyStore {
 
   public VectorMapJoinFastKeyStore(int writeBuffersSize) {
     writeBuffers = new WriteBuffers(writeBuffersSize, AbsoluteKeyOffset.maxSize);
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
 
   public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) {
     // TODO: Check if maximum size compatible with AbsoluteKeyOffset.maxSize.
     this.writeBuffers = writeBuffers;
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index f37f056..23f8315 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -48,13 +48,13 @@ public abstract class VectorMapJoinFastLongHashTable
 
   private transient final boolean isLogDebugEnabled = LOG.isDebugEnabled();
 
-  private HashTableKeyType hashTableKeyType;
+  private final HashTableKeyType hashTableKeyType;
 
-  private boolean isOuterJoin;
+  private final boolean isOuterJoin;
 
-  private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
 
-  private boolean useMinMax;
+  private final boolean useMinMax;
   private long min;
   private long max;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 3b73f7d..9f3b107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -58,7 +58,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
   private final long keyCount;
 
 
-  private final VectorMapJoinFastHashTable VectorMapJoinFastHashTable;
+  private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable;
 
   public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf,
       long keyCount) throws SerDeException {
@@ -83,12 +83,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
     // LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold);
 
-    VectorMapJoinFastHashTable = createHashTable(newThreshold);
+    vectorMapJoinFastHashTable = createHashTable(newThreshold);
   }
 
   @Override
   public VectorMapJoinHashTable vectorMapJoinHashTable() {
-    return VectorMapJoinFastHashTable;
+    return vectorMapJoinFastHashTable;
   }
 
   private VectorMapJoinFastHashTable createHashTable(int newThreshold) {
@@ -178,7 +178,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
       throws SerDeException, HiveException, IOException {
 
     // We are not using the key and value contexts, nor do we support a MapJoinKey.
-    VectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
+    vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
     return null;
   }
 
@@ -214,7 +214,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
   @Override
   public int size() {
-    return VectorMapJoinFastHashTable.size();
+    return vectorMapJoinFastHashTable.size();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
index ce598e3..b51d6fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
 
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 
 /*
  * Root abstract class for a hash table result.
@@ -29,9 +30,12 @@ public abstract class VectorMapJoinHashTableResult {
 
   private int spillPartitionId;
 
+  private final WriteBuffers.Position readPos;
+
   public VectorMapJoinHashTableResult() {
     joinResult = JoinUtil.JoinResult.NOMATCH;
     spillPartitionId = -1;
+    readPos = new WriteBuffers.Position();
   }
 
   /**
@@ -78,4 +82,8 @@ public abstract class VectorMapJoinHashTableResult {
     sb.append("joinResult " + joinResult.name());
     return sb.toString();
   }
+
+  public WriteBuffers.Position getReadPos() {
+    return readPos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 5900428..a4ecd9f 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.hive.serde2;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hive.common.util.HashCodeUtil;
 
@@ -53,7 +50,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   Position writePos = new Position(); // Position where we'd write
-  Position defaultReadPos = new Position(); // Position where we'd read (by default).
+  Position unsafeReadPos = new Position(); // Position where we'd read (unsafely at write time).
 
 
   public WriteBuffers(int wbSize, long maxSize) {
@@ -64,16 +61,18 @@ public final class WriteBuffers implements RandomAccessOutput {
     writePos.bufferIndex = -1;
   }
 
-  public int readVInt() {
-    return (int) readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeReadVInt() {
+    return (int) readVLong(unsafeReadPos);
   }
 
   public int readVInt(Position readPos) {
     return (int) readVLong(readPos);
   }
 
-  public long readVLong() {
-    return readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadVLong() {
+    return readVLong(unsafeReadPos);
   }
 
   public long readVLong(Position readPos) {
@@ -97,8 +96,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
   }
 
-  public void skipVLong() {
-    skipVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void unsafeSkipVLong() {
+    skipVLong(unsafeReadPos);
   }
 
   public void skipVLong(Position readPos) {
@@ -117,8 +117,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     }
   }
 
-  public void setReadPoint(long offset) {
-    setReadPoint(offset, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void setUnsafeReadPoint(long offset) {
+    setReadPoint(offset, unsafeReadPos);
   }
 
   public void setReadPoint(long offset, Position readPos) {
@@ -127,8 +128,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     readPos.offset = getOffset(offset);
   }
 
-  public int hashCode(long offset, int length) {
-    return hashCode(offset, length, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeHashCode(long offset, int length) {
+    return hashCode(offset, length, unsafeReadPos);
   }
 
   public int hashCode(long offset, int length, Position readPos) {
@@ -352,7 +354,7 @@ public final class WriteBuffers implements RandomAccessOutput {
  
   private void clearState() {
     writePos.clear();
-    defaultReadPos.clear();
+    unsafeReadPos.clear();
   }
 
 
@@ -363,8 +365,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
   }
 
-  public long getReadPoint() {
-    return getReadPoint(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long getUnsafeReadPoint() {
+    return getReadPoint(unsafeReadPos);
   }
 
   public long getReadPoint(Position readPos) {
@@ -518,8 +521,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     clearState();
   }
 
-  public long readNByteLong(long offset, int bytes) {
-    return readNByteLong(offset, bytes, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadNByteLong(long offset, int bytes) {
+    return readNByteLong(offset, bytes, unsafeReadPos);
   }
 
   public long readNByteLong(long offset, int bytes, Position readPos) {
@@ -561,7 +565,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   public int readInt(long offset) {
-    return (int)readNByteLong(offset, 4);
+    return (int)unsafeReadNByteLong(offset, 4);
   }
 
   @Override
@@ -606,7 +610,8 @@ public final class WriteBuffers implements RandomAccessOutput {
     return writeBuffers.size() * (long) wbSize;
   }
 
-  public Position getReadPosition() {
-    return defaultReadPos;
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public Position getUnsafeReadPosition() {
+    return unsafeReadPos;
   }
 }


[2/3] hive git commit: HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)

Posted by se...@apache.org.
HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/18c67972
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/18c67972
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/18c67972

Branch: refs/heads/branch-2.1
Commit: 18c679722c9ab68e3e82cf332fe47a65f962c95b
Parents: f7fdd4e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 3 12:33:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 3 12:34:02 2016 -0700

----------------------------------------------------------------------
 .../persistence/BytesBytesMultiHashMap.java     | 34 +++++++-------
 .../ql/exec/vector/VectorMapJoinOperator.java   |  2 +-
 .../ql/exec/vector/VectorizedBatchUtil.java     | 13 +++---
 .../mapjoin/VectorMapJoinCommonOperator.java    |  1 +
 .../VectorMapJoinGenerateResultOperator.java    |  1 +
 .../fast/VectorMapJoinFastBytesHashMap.java     |  2 +-
 .../VectorMapJoinFastBytesHashMultiSet.java     |  2 +-
 .../fast/VectorMapJoinFastBytesHashSet.java     |  2 +-
 .../fast/VectorMapJoinFastBytesHashTable.java   |  8 ++--
 .../fast/VectorMapJoinFastHashTable.java        |  2 +-
 .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 16 ++++---
 .../fast/VectorMapJoinFastLongHashTable.java    |  8 ++--
 .../fast/VectorMapJoinFastTableContainer.java   | 10 ++--
 .../hashtable/VectorMapJoinHashTableResult.java |  8 ++++
 .../apache/hadoop/hive/serde2/WriteBuffers.java | 49 +++++++++++---------
 15 files changed, 90 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 51acae0..dd88461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -449,7 +449,7 @@ public final class BytesBytesMultiHashMap {
 
     kv.writeKey(writeBuffers);
     int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset);
-    int hashCode = (keyHashCode == -1) ? writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode;
+    int hashCode = (keyHashCode == -1) ? writeBuffers.unsafeHashCode(keyOffset, keyLength) : keyHashCode;
 
     int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode);
     // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -683,8 +683,8 @@ public final class BytesBytesMultiHashMap {
     if (!compareHashBits(ref, hashCode)) {
       return false; // Hash bits in ref don't match.
     }
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null));
-    int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+    writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(ref, null));
+    int valueLength = (int)writeBuffers.unsafeReadVLong(), keyLength = (int)writeBuffers.unsafeReadVLong();
     if (keyLength != cmpLength) {
       return false;
     }
@@ -730,7 +730,7 @@ public final class BytesBytesMultiHashMap {
   private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) {
     long tailOffset = Ref.getOffset(ref);
     if (Ref.hasList(ref)) {
-      long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5)
+      long relativeOffset = (readPos == null) ? writeBuffers.unsafeReadNByteLong(tailOffset, 5)
           : writeBuffers.readNByteLong(tailOffset, 5, readPos);
       tailOffset += relativeOffset;
     }
@@ -763,10 +763,10 @@ public final class BytesBytesMultiHashMap {
       // TODO: we could actually store a bit flag in ref indicating whether this is a hash
       //       match or a probe, and in the former case use hash bits (for a first few resizes).
       // int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount);
-      writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null));
+      writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(oldRef, null));
       // Read the value and key length for the first record.
-      int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef)
-          - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4);
+      int hashCode = (int)writeBuffers.unsafeReadNByteLong(Ref.getOffset(oldRef)
+          - writeBuffers.unsafeReadVLong() - writeBuffers.unsafeReadVLong() - 4, 4);
       int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode);
       maxSteps = Math.max(probeSteps, maxSteps);
     }
@@ -785,16 +785,16 @@ public final class BytesBytesMultiHashMap {
   private long createOrGetListRecord(long ref) {
     if (Ref.hasList(ref)) {
       // LOG.info("Found list record at " + writeBuffers.getReadPoint());
-      return writeBuffers.getReadPoint(); // Assumes we are here after key compare.
+      return writeBuffers.getUnsafeReadPoint(); // Assumes we are here after key compare.
     }
     long firstTailOffset = Ref.getOffset(ref);
     // LOG.info("First tail offset to create list record is " + firstTailOffset);
 
     // Determine the length of storage for value and key lengths of the first record.
-    writeBuffers.setReadPoint(firstTailOffset);
-    writeBuffers.skipVLong();
-    writeBuffers.skipVLong();
-    int lengthsLength = (int)(writeBuffers.getReadPoint() - firstTailOffset);
+    writeBuffers.setUnsafeReadPoint(firstTailOffset);
+    writeBuffers.unsafeSkipVLong();
+    writeBuffers.unsafeSkipVLong();
+    int lengthsLength = (int)(writeBuffers.getUnsafeReadPoint() - firstTailOffset);
 
     // Create the list record, copy first record value/key lengths there.
     writeBuffers.writeBytes(firstTailOffset, lengthsLength);
@@ -816,7 +816,7 @@ public final class BytesBytesMultiHashMap {
    */
   private void addRecordToList(long lrPtrOffset, long tailOffset) {
     // Now, insert this record into the list.
-    long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5);
+    long prevHeadOffset = writeBuffers.unsafeReadNByteLong(lrPtrOffset, 5);
     // LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset);
     assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset.
     writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset);
@@ -885,10 +885,10 @@ public final class BytesBytesMultiHashMap {
       ++examined;
       long recOffset = getFirstRecordLengthsOffset(ref, null);
       long tailOffset = Ref.getOffset(ref);
-      writeBuffers.setReadPoint(recOffset);
-      int valueLength = (int)writeBuffers.readVLong(),
-          keyLength = (int)writeBuffers.readVLong();
-      long ptrOffset = writeBuffers.getReadPoint();
+      writeBuffers.setUnsafeReadPoint(recOffset);
+      int valueLength = (int)writeBuffers.unsafeReadVLong(),
+          keyLength = (int)writeBuffers.unsafeReadVLong();
+      long ptrOffset = writeBuffers.getUnsafeReadPoint();
       if (Ref.hasList(ref)) {
         byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 3323df3..0cb6c8a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -154,7 +154,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
 
       // This is a vectorized aware evaluator
       ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
-        int columnIndex;;
+        int columnIndex;
         int writerIndex;
 
         public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) {

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 9471e66..990e896 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -702,14 +702,15 @@ public class VectorizedBatchUtil {
 
   public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
     StringBuilder sb = new StringBuilder();
+    LOG.info(debugFormatOneRow(batch, index, prefix, sb).toString());
+  }
+
+  public static StringBuilder debugFormatOneRow(VectorizedRowBatch batch,
+      int index, String prefix, StringBuilder sb) {
     sb.append(prefix + " row " + index + " ");
     for (int p = 0; p < batch.projectionSize; p++) {
       int column = batch.projectedColumns[p];
-      if (p == column) {
-        sb.append("(col " + p + ") ");
-      } else {
-        sb.append("(proj col " + p + " col " + column + ") ");
-      }
+      sb.append("(" + p + "," + column + ") ");
       ColumnVector colVector = batch.cols[column];
       if (colVector == null) {
         sb.append("(null ColumnVector)");
@@ -752,7 +753,7 @@ public class VectorizedBatchUtil {
       }
       sb.append(" ");
     }
-    LOG.info(sb.toString());
+    return sb;
   }
 
   public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 8ad7ca4..24668f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -636,6 +636,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
     default:
       throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());
     }
+    LOG.info("Using " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 6a3d64b..22eb07e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -507,6 +507,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf,
         smallTable);
     needHashTableSetup = true;
+    LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
 
     if (isLogDebugEnabled) {
       LOG.debug(CLASS_NAME + " reloadHashTable!");

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index a4bc188..d878f65 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -81,7 +81,7 @@ public abstract class VectorMapJoinFastBytesHashMap
     optimizedHashMapResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMapResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (valueRefWord == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index aaf3497..b328efd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -75,7 +75,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
     optimizedHashMultiSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMultiSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (count == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 841183e..c9b23bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -65,7 +65,7 @@ public abstract class VectorMapJoinFastBytesHashSet
     optimizedHashSetResult.forget();
 
     long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
-    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (existance == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index d6e107b..7987723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hive.common.util.HashCodeUtil;
 
@@ -75,7 +76,7 @@ public abstract class VectorMapJoinFastBytesHashTable
         break;
       }
       if (hashCode == slotTriples[tripleIndex + 1] &&
-          keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+          keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
         // LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + slot + " tripleIndex " + tripleIndex + " existing");
         isNewKey = false;
         break;
@@ -164,7 +165,8 @@ public abstract class VectorMapJoinFastBytesHashTable
     // LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + resizeThreshold + " metricExpands " + metricExpands);
   }
 
-  protected long findReadSlot(byte[] keyBytes, int keyStart, int keyLength, long hashCode) {
+  protected final long findReadSlot(
+      byte[] keyBytes, int keyStart, int keyLength, long hashCode, WriteBuffers.Position readPos) {
 
     int intHashCode = (int) hashCode;
     int slot = (intHashCode & logicalHashBucketMask);
@@ -176,7 +178,7 @@ public abstract class VectorMapJoinFastBytesHashTable
       if (slotTriples[tripleIndex] != 0 && hashCode == slotTriples[tripleIndex + 1]) {
         // Finally, verify the key bytes match.
 
-        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength, readPos)) {
           return slotTriples[tripleIndex + 2];
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index 099f38e..7df9eed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -29,7 +29,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
   protected int logicalHashBucketMask;
 
   protected float loadFactor;
-  protected int writeBuffersSize;
+  protected final int writeBuffersSize;
 
   protected int metricPutConflict;
   protected int largestNumberOfSteps;

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index efdcd43..be51693 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -30,7 +30,7 @@ public class VectorMapJoinFastKeyStore {
 
   private WriteBuffers writeBuffers;
 
-  private WriteBuffers.Position readPos;
+  private WriteBuffers.Position unsafeReadPos; // Thread-unsafe position used at write time.
 
   /**
    * A store for arbitrary length keys in memory.
@@ -115,7 +115,13 @@ public class VectorMapJoinFastKeyStore {
     return keyRefWord;
   }
 
-  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public boolean unsafeEqualKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+    return equalKey(keyRefWord, keyBytes, keyStart, keyLength, unsafeReadPos);
+  }
+
+  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength,
+      WriteBuffers.Position readPos) {
 
     int storedKeyLengthLength =
         (int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift);
@@ -151,14 +157,12 @@ public class VectorMapJoinFastKeyStore {
 
   public VectorMapJoinFastKeyStore(int writeBuffersSize) {
     writeBuffers = new WriteBuffers(writeBuffersSize, AbsoluteKeyOffset.maxSize);
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
 
   public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) {
     // TODO: Check if maximum size compatible with AbsoluteKeyOffset.maxSize.
     this.writeBuffers = writeBuffers;
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 78b55a1..5373aad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -48,13 +48,13 @@ public abstract class VectorMapJoinFastLongHashTable
 
   private transient final boolean isLogDebugEnabled = LOG.isDebugEnabled();
 
-  private HashTableKeyType hashTableKeyType;
+  private final HashTableKeyType hashTableKeyType;
 
-  private boolean isOuterJoin;
+  private final boolean isOuterJoin;
 
-  private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
 
-  private boolean useMinMax;
+  private final boolean useMinMax;
   private long min;
   private long max;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 3b73f7d..9f3b107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -58,7 +58,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
   private final long keyCount;
 
 
-  private final VectorMapJoinFastHashTable VectorMapJoinFastHashTable;
+  private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable;
 
   public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf,
       long keyCount) throws SerDeException {
@@ -83,12 +83,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
     // LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold);
 
-    VectorMapJoinFastHashTable = createHashTable(newThreshold);
+    vectorMapJoinFastHashTable = createHashTable(newThreshold);
   }
 
   @Override
   public VectorMapJoinHashTable vectorMapJoinHashTable() {
-    return VectorMapJoinFastHashTable;
+    return vectorMapJoinFastHashTable;
   }
 
   private VectorMapJoinFastHashTable createHashTable(int newThreshold) {
@@ -178,7 +178,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
       throws SerDeException, HiveException, IOException {
 
     // We are not using the key and value contexts, nor do we support a MapJoinKey.
-    VectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
+    vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
     return null;
   }
 
@@ -214,7 +214,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
   @Override
   public int size() {
-    return VectorMapJoinFastHashTable.size();
+    return vectorMapJoinFastHashTable.size();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
index ce598e3..b51d6fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
 
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 
 /*
  * Root abstract class for a hash table result.
@@ -29,9 +30,12 @@ public abstract class VectorMapJoinHashTableResult {
 
   private int spillPartitionId;
 
+  private final WriteBuffers.Position readPos;
+
   public VectorMapJoinHashTableResult() {
     joinResult = JoinUtil.JoinResult.NOMATCH;
     spillPartitionId = -1;
+    readPos = new WriteBuffers.Position();
   }
 
   /**
@@ -78,4 +82,8 @@ public abstract class VectorMapJoinHashTableResult {
     sb.append("joinResult " + joinResult.name());
     return sb.toString();
   }
+
+  public WriteBuffers.Position getReadPos() {
+    return readPos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/18c67972/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 5900428..a4ecd9f 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.hive.serde2;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hive.common.util.HashCodeUtil;
 
@@ -53,7 +50,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   Position writePos = new Position(); // Position where we'd write
-  Position defaultReadPos = new Position(); // Position where we'd read (by default).
+  Position unsafeReadPos = new Position(); // Position where we'd read (unsafely at write time).
 
 
   public WriteBuffers(int wbSize, long maxSize) {
@@ -64,16 +61,18 @@ public final class WriteBuffers implements RandomAccessOutput {
     writePos.bufferIndex = -1;
   }
 
-  public int readVInt() {
-    return (int) readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeReadVInt() {
+    return (int) readVLong(unsafeReadPos);
   }
 
   public int readVInt(Position readPos) {
     return (int) readVLong(readPos);
   }
 
-  public long readVLong() {
-    return readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadVLong() {
+    return readVLong(unsafeReadPos);
   }
 
   public long readVLong(Position readPos) {
@@ -97,8 +96,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
   }
 
-  public void skipVLong() {
-    skipVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void unsafeSkipVLong() {
+    skipVLong(unsafeReadPos);
   }
 
   public void skipVLong(Position readPos) {
@@ -117,8 +117,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     }
   }
 
-  public void setReadPoint(long offset) {
-    setReadPoint(offset, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void setUnsafeReadPoint(long offset) {
+    setReadPoint(offset, unsafeReadPos);
   }
 
   public void setReadPoint(long offset, Position readPos) {
@@ -127,8 +128,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     readPos.offset = getOffset(offset);
   }
 
-  public int hashCode(long offset, int length) {
-    return hashCode(offset, length, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeHashCode(long offset, int length) {
+    return hashCode(offset, length, unsafeReadPos);
   }
 
   public int hashCode(long offset, int length, Position readPos) {
@@ -352,7 +354,7 @@ public final class WriteBuffers implements RandomAccessOutput {
  
   private void clearState() {
     writePos.clear();
-    defaultReadPos.clear();
+    unsafeReadPos.clear();
   }
 
 
@@ -363,8 +365,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
   }
 
-  public long getReadPoint() {
-    return getReadPoint(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long getUnsafeReadPoint() {
+    return getReadPoint(unsafeReadPos);
   }
 
   public long getReadPoint(Position readPos) {
@@ -518,8 +521,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     clearState();
   }
 
-  public long readNByteLong(long offset, int bytes) {
-    return readNByteLong(offset, bytes, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadNByteLong(long offset, int bytes) {
+    return readNByteLong(offset, bytes, unsafeReadPos);
   }
 
   public long readNByteLong(long offset, int bytes, Position readPos) {
@@ -561,7 +565,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   public int readInt(long offset) {
-    return (int)readNByteLong(offset, 4);
+    return (int)unsafeReadNByteLong(offset, 4);
   }
 
   @Override
@@ -606,7 +610,8 @@ public final class WriteBuffers implements RandomAccessOutput {
     return writeBuffers.size() * (long) wbSize;
   }
 
-  public Position getReadPosition() {
-    return defaultReadPos;
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public Position getUnsafeReadPosition() {
+    return unsafeReadPos;
   }
 }