You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/03 19:42:38 UTC

hive git commit: HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/branch-1 411038515 -> cb3945209


HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)

Conflicts:
	ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
	ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
	ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
	ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb394520
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb394520
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb394520

Branch: refs/heads/branch-1
Commit: cb3945209a886556068b6291d6a94355cfacd909
Parents: 4110385
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 3 12:33:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 3 12:42:25 2016 -0700

----------------------------------------------------------------------
 .../persistence/BytesBytesMultiHashMap.java     | 34 ++++++-------
 .../ql/exec/vector/VectorMapJoinOperator.java   |  2 +-
 .../ql/exec/vector/VectorizedBatchUtil.java     | 13 ++---
 .../mapjoin/VectorMapJoinCommonOperator.java    |  1 +
 .../VectorMapJoinGenerateResultOperator.java    |  1 +
 .../fast/VectorMapJoinFastBytesHashMap.java     |  4 +-
 .../VectorMapJoinFastBytesHashMultiSet.java     |  4 +-
 .../fast/VectorMapJoinFastBytesHashSet.java     |  4 +-
 .../fast/VectorMapJoinFastBytesHashTable.java   | 10 ++--
 .../fast/VectorMapJoinFastHashTable.java        |  4 +-
 .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 18 ++++---
 .../fast/VectorMapJoinFastLongHashTable.java    | 10 ++--
 .../fast/VectorMapJoinFastTableContainer.java   | 12 ++---
 .../hashtable/VectorMapJoinHashTableResult.java |  8 +++
 .../apache/hadoop/hive/serde2/WriteBuffers.java | 51 +++++++++++---------
 15 files changed, 99 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 573a7aa..080c509 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -458,7 +458,7 @@ public final class BytesBytesMultiHashMap {
 
     kv.writeKey(writeBuffers);
     int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset);
-    int hashCode = (keyHashCode == -1) ? writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode;
+    int hashCode = (keyHashCode == -1) ? writeBuffers.unsafeHashCode(keyOffset, keyLength) : keyHashCode;
 
     int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode);
     // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -699,8 +699,8 @@ public final class BytesBytesMultiHashMap {
     if (!compareHashBits(ref, hashCode)) {
       return false; // Hash bits in ref don't match.
     }
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null));
-    int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+    writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(ref, null));
+    int valueLength = (int)writeBuffers.unsafeReadVLong(), keyLength = (int)writeBuffers.unsafeReadVLong();
     if (keyLength != cmpLength) {
       return false;
     }
@@ -746,7 +746,7 @@ public final class BytesBytesMultiHashMap {
   private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) {
     long tailOffset = Ref.getOffset(ref);
     if (Ref.hasList(ref)) {
-      long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5)
+      long relativeOffset = (readPos == null) ? writeBuffers.unsafeReadNByteLong(tailOffset, 5)
           : writeBuffers.readNByteLong(tailOffset, 5, readPos);
       tailOffset += relativeOffset;
     }
@@ -779,10 +779,10 @@ public final class BytesBytesMultiHashMap {
       // TODO: we could actually store a bit flag in ref indicating whether this is a hash
       //       match or a probe, and in the former case use hash bits (for a first few resizes).
       // int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount);
-      writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null));
+      writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(oldRef, null));
       // Read the value and key length for the first record.
-      int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef)
-          - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4);
+      int hashCode = (int)writeBuffers.unsafeReadNByteLong(Ref.getOffset(oldRef)
+          - writeBuffers.unsafeReadVLong() - writeBuffers.unsafeReadVLong() - 4, 4);
       int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode);
       maxSteps = Math.max(probeSteps, maxSteps);
     }
@@ -801,16 +801,16 @@ public final class BytesBytesMultiHashMap {
   private long createOrGetListRecord(long ref) {
     if (Ref.hasList(ref)) {
       // LOG.info("Found list record at " + writeBuffers.getReadPoint());
-      return writeBuffers.getReadPoint(); // Assumes we are here after key compare.
+      return writeBuffers.getUnsafeReadPoint(); // Assumes we are here after key compare.
     }
     long firstTailOffset = Ref.getOffset(ref);
     // LOG.info("First tail offset to create list record is " + firstTailOffset);
 
     // Determine the length of storage for value and key lengths of the first record.
-    writeBuffers.setReadPoint(firstTailOffset);
-    writeBuffers.skipVLong();
-    writeBuffers.skipVLong();
-    int lengthsLength = (int)(writeBuffers.getReadPoint() - firstTailOffset);
+    writeBuffers.setUnsafeReadPoint(firstTailOffset);
+    writeBuffers.unsafeSkipVLong();
+    writeBuffers.unsafeSkipVLong();
+    int lengthsLength = (int)(writeBuffers.getUnsafeReadPoint() - firstTailOffset);
 
     // Create the list record, copy first record value/key lengths there.
     writeBuffers.writeBytes(firstTailOffset, lengthsLength);
@@ -832,7 +832,7 @@ public final class BytesBytesMultiHashMap {
    */
   private void addRecordToList(long lrPtrOffset, long tailOffset) {
     // Now, insert this record into the list.
-    long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5);
+    long prevHeadOffset = writeBuffers.unsafeReadNByteLong(lrPtrOffset, 5);
     // LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset);
     assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset.
     writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset);
@@ -901,10 +901,10 @@ public final class BytesBytesMultiHashMap {
       ++examined;
       long recOffset = getFirstRecordLengthsOffset(ref, null);
       long tailOffset = Ref.getOffset(ref);
-      writeBuffers.setReadPoint(recOffset);
-      int valueLength = (int)writeBuffers.readVLong(),
-          keyLength = (int)writeBuffers.readVLong();
-      long ptrOffset = writeBuffers.getReadPoint();
+      writeBuffers.setUnsafeReadPoint(recOffset);
+      int valueLength = (int)writeBuffers.unsafeReadVLong(),
+          keyLength = (int)writeBuffers.unsafeReadVLong();
+      long ptrOffset = writeBuffers.getUnsafeReadPoint();
       if (Ref.hasList(ref)) {
         byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 9bd811c..1aad041 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -147,7 +147,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
 
       // This is a vectorized aware evaluator
       ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
-        int columnIndex;;
+        int columnIndex;
         int writerIndex;
 
         public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 1482855..5d9410c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -650,14 +650,15 @@ public class VectorizedBatchUtil {
 
   public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
     StringBuilder sb = new StringBuilder();
+    LOG.info(debugFormatOneRow(batch, index, prefix, sb).toString());
+  }
+
+  public static StringBuilder debugFormatOneRow(VectorizedRowBatch batch,
+      int index, String prefix, StringBuilder sb) {
     sb.append(prefix + " row " + index + " ");
     for (int p = 0; p < batch.projectionSize; p++) {
       int column = batch.projectedColumns[p];
-      if (p == column) {
-        sb.append("(col " + p + ") ");
-      } else {
-        sb.append("(proj col " + p + " col " + column + ") ");
-      }
+      sb.append("(" + p + "," + column + ") ");
       ColumnVector colVector = batch.cols[column];
       if (colVector == null) {
         sb.append("(null ColumnVector)");
@@ -700,7 +701,7 @@ public class VectorizedBatchUtil {
       }
       sb.append(" ");
     }
-    LOG.info(sb.toString());
+    return sb;
   }
 
   public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 45b52c4..164e6c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -639,6 +639,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
     default:
       throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());
     }
+    LOG.info("Using " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index e1c2f31..6b6d7ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -501,6 +501,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf,
         smallTable);
     needHashTableSetup = true;
+    LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(CLASS_NAME + " reloadHashTable!");

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 6afaec3..c7c146d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -72,7 +72,7 @@ public abstract class VectorMapJoinFastBytesHashMap
     optimizedHashMapResult.forget();
 
     long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
-    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMapResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (valueRefWord == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;
@@ -98,4 +98,4 @@ public abstract class VectorMapJoinFastBytesHashMap
     // Share the same write buffers with our value store.
     keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index dceb99c..dca486c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -68,7 +68,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
     optimizedHashMultiSetResult.forget();
 
     long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
-    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMultiSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (count == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;
@@ -90,4 +90,4 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
 
     keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 9f122c4..46788d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -63,7 +63,7 @@ public abstract class VectorMapJoinFastBytesHashSet
     optimizedHashSetResult.forget();
 
     long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
-    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+    long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashSetResult.getReadPos());
     JoinUtil.JoinResult joinResult;
     if (existance == -1) {
       joinResult = JoinUtil.JoinResult.NOMATCH;
@@ -82,4 +82,4 @@ public abstract class VectorMapJoinFastBytesHashSet
 
     keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index 91d7fd6..5da3b21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.io.BytesWritable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -84,7 +85,7 @@ public abstract class VectorMapJoinFastBytesHashTable
         break;
       }
       if (hashCode == slotTriples[tripleIndex + 1] &&
-          keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+          keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
         // LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + slot + " tripleIndex " + tripleIndex + " existing");
         isNewKey = false;
         break;
@@ -173,7 +174,8 @@ public abstract class VectorMapJoinFastBytesHashTable
     // LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + resizeThreshold + " metricExpands " + metricExpands);
   }
 
-  protected long findReadSlot(byte[] keyBytes, int keyStart, int keyLength, long hashCode) {
+  protected final long findReadSlot(
+      byte[] keyBytes, int keyStart, int keyLength, long hashCode, WriteBuffers.Position readPos) {
 
     int intHashCode = (int) hashCode;
     int slot = (intHashCode & logicalHashBucketMask);
@@ -185,7 +187,7 @@ public abstract class VectorMapJoinFastBytesHashTable
       if (slotTriples[tripleIndex] != 0 && hashCode == slotTriples[tripleIndex + 1]) {
         // Finally, verify the key bytes match.
 
-        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+        if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength, readPos)) {
           return slotTriples[tripleIndex + 2];
         }
       }
@@ -218,4 +220,4 @@ public abstract class VectorMapJoinFastBytesHashTable
     super(initialCapacity, loadFactor, writeBuffersSize);
     allocateBucketArray();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index 666d666..38d750e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -29,7 +29,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
   protected int logicalHashBucketMask;
 
   protected float loadFactor;
-  protected int writeBuffersSize;
+  protected final int writeBuffersSize;
 
   protected int metricPutConflict;
   protected int largestNumberOfSteps;
@@ -70,4 +70,4 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
   public int size() {
     return keysAssigned;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index ab3dac4..5bb22e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -30,7 +30,7 @@ public class VectorMapJoinFastKeyStore {
 
   private WriteBuffers writeBuffers;
 
-  private WriteBuffers.Position readPos;
+  private WriteBuffers.Position unsafeReadPos; // Thread-unsafe position used at write time.
 
   /**
    * A store for arbitrary length keys in memory.
@@ -115,7 +115,13 @@ public class VectorMapJoinFastKeyStore {
     return keyRefWord;
   }
 
-  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public boolean unsafeEqualKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+    return equalKey(keyRefWord, keyBytes, keyStart, keyLength, unsafeReadPos);
+  }
+
+  public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength,
+      WriteBuffers.Position readPos) {
 
     int storedKeyLengthLength =
         (int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift);
@@ -151,14 +157,12 @@ public class VectorMapJoinFastKeyStore {
 
   public VectorMapJoinFastKeyStore(int writeBuffersSize) {
     writeBuffers = new WriteBuffers(writeBuffersSize, AbsoluteKeyOffset.maxSize);
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
 
   public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) {
     // TODO: Check if maximum size compatible with AbsoluteKeyOffset.maxSize.
     this.writeBuffers = writeBuffers;
-
-    readPos = new WriteBuffers.Position();
+    unsafeReadPos = new WriteBuffers.Position();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index d90849e..843c7d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -45,13 +45,13 @@ public abstract class VectorMapJoinFastLongHashTable
 
   public static final Log LOG = LogFactory.getLog(VectorMapJoinFastLongHashTable.class);
 
-  private HashTableKeyType hashTableKeyType;
+  private final HashTableKeyType hashTableKeyType;
 
-  private boolean isOuterJoin;
+  private final boolean isOuterJoin;
 
-  private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+  private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
 
-  private boolean useMinMax;
+  private final boolean useMinMax;
   private long min;
   private long max;
 
@@ -276,4 +276,4 @@ public abstract class VectorMapJoinFastLongHashTable
     min = Long.MAX_VALUE;
     max = Long.MIN_VALUE;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index cf6c0e3..0043044 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -58,7 +58,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
   private long keyCount;
 
 
-  private VectorMapJoinFastHashTable VectorMapJoinFastHashTable;
+  private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable;
 
   public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf,
       long keyCount) throws SerDeException {
@@ -83,12 +83,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
     // LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold);
 
-    VectorMapJoinFastHashTable = createHashTable(newThreshold);
+    vectorMapJoinFastHashTable = createHashTable(newThreshold);
   }
 
   @Override
   public VectorMapJoinHashTable vectorMapJoinHashTable() {
-    return (VectorMapJoinHashTable) VectorMapJoinFastHashTable;
+    return vectorMapJoinFastHashTable;
   }
 
   private VectorMapJoinFastHashTable createHashTable(int newThreshold) {
@@ -179,7 +179,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
       Writable currentValue) throws SerDeException, HiveException, IOException {
 
     // We are not using the key and value contexts, nor do we support a MapJoinKey.
-    VectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
+    vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
     return null;
   }
 
@@ -215,7 +215,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
 
   @Override
   public int size() {
-    return VectorMapJoinFastHashTable.size();
+    return vectorMapJoinFastHashTable.size();
   }
 
   /*
@@ -224,4 +224,4 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
     throw new RuntimeException("Not applicable");
   }
   */
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
index ce598e3..b51d6fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
 
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 
 /*
  * Root abstract class for a hash table result.
@@ -29,9 +30,12 @@ public abstract class VectorMapJoinHashTableResult {
 
   private int spillPartitionId;
 
+  private final WriteBuffers.Position readPos;
+
   public VectorMapJoinHashTableResult() {
     joinResult = JoinUtil.JoinResult.NOMATCH;
     spillPartitionId = -1;
+    readPos = new WriteBuffers.Position();
   }
 
   /**
@@ -78,4 +82,8 @@ public abstract class VectorMapJoinHashTableResult {
     sb.append("joinResult " + joinResult.name());
     return sb.toString();
   }
+
+  public WriteBuffers.Position getReadPos() {
+    return readPos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 8811aca..db20fca 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.hive.serde2;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.io.WritableUtils;
 
 
@@ -52,7 +49,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   Position writePos = new Position(); // Position where we'd write
-  Position defaultReadPos = new Position(); // Position where we'd read (by default).
+  Position unsafeReadPos = new Position(); // Position where we'd read (unsafely at write time).
 
 
   public WriteBuffers(int wbSize, long maxSize) {
@@ -63,16 +60,18 @@ public final class WriteBuffers implements RandomAccessOutput {
     writePos.bufferIndex = -1;
   }
 
-  public int readVInt() {
-    return (int) readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeReadVInt() {
+    return (int) readVLong(unsafeReadPos);
   }
 
   public int readVInt(Position readPos) {
     return (int) readVLong(readPos);
   }
 
-  public long readVLong() {
-    return readVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadVLong() {
+    return readVLong(unsafeReadPos);
   }
 
   public long readVLong(Position readPos) {
@@ -96,8 +95,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
   }
 
-  public void skipVLong() {
-    skipVLong(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void unsafeSkipVLong() {
+    skipVLong(unsafeReadPos);
   }
 
   public void skipVLong(Position readPos) {
@@ -116,8 +116,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     }
   }
 
-  public void setReadPoint(long offset) {
-    setReadPoint(offset, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public void setUnsafeReadPoint(long offset) {
+    setReadPoint(offset, unsafeReadPos);
   }
 
   public void setReadPoint(long offset, Position readPos) {
@@ -126,8 +127,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     readPos.offset = getOffset(offset);
   }
 
-  public int hashCode(long offset, int length) {
-    return hashCode(offset, length, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public int unsafeHashCode(long offset, int length) {
+    return hashCode(offset, length, unsafeReadPos);
   }
 
   public int hashCode(long offset, int length, Position readPos) {
@@ -351,7 +353,7 @@ public final class WriteBuffers implements RandomAccessOutput {
 
   private void clearState() {
     writePos.clear();
-    defaultReadPos.clear();
+    unsafeReadPos.clear();
   }
 
 
@@ -362,8 +364,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
   }
 
-  public long getReadPoint() {
-    return getReadPoint(defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long getUnsafeReadPoint() {
+    return getReadPoint(unsafeReadPos);
   }
 
   public long getReadPoint(Position readPos) {
@@ -517,8 +520,9 @@ public final class WriteBuffers implements RandomAccessOutput {
     clearState();
   }
 
-  public long readNByteLong(long offset, int bytes) {
-    return readNByteLong(offset, bytes, defaultReadPos);
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public long unsafeReadNByteLong(long offset, int bytes) {
+    return readNByteLong(offset, bytes, unsafeReadPos);
   }
 
   public long readNByteLong(long offset, int bytes, Position readPos) {
@@ -560,7 +564,7 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   public int readInt(long offset) {
-    return (int)readNByteLong(offset, 4);
+    return (int)unsafeReadNByteLong(offset, 4);
   }
 
   @Override
@@ -656,7 +660,8 @@ public final class WriteBuffers implements RandomAccessOutput {
     return writeBuffers.size() * (long) wbSize;
   }
 
-  public Position getReadPosition() {
-    return defaultReadPos;
+  /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+  public Position getUnsafeReadPosition() {
+    return unsafeReadPos;
   }
-}
\ No newline at end of file
+}