You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this format.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/03 19:42:38 UTC
hive git commit: HIVE-14408 : thread safety issue in fast hashtable
(Sergey Shelukhin, reviewed by Gopal V)
Repository: hive
Updated Branches:
refs/heads/branch-1 411038515 -> cb3945209
HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)
Conflicts:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb394520
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb394520
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb394520
Branch: refs/heads/branch-1
Commit: cb3945209a886556068b6291d6a94355cfacd909
Parents: 4110385
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 3 12:33:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 3 12:42:25 2016 -0700
----------------------------------------------------------------------
.../persistence/BytesBytesMultiHashMap.java | 34 ++++++-------
.../ql/exec/vector/VectorMapJoinOperator.java | 2 +-
.../ql/exec/vector/VectorizedBatchUtil.java | 13 ++---
.../mapjoin/VectorMapJoinCommonOperator.java | 1 +
.../VectorMapJoinGenerateResultOperator.java | 1 +
.../fast/VectorMapJoinFastBytesHashMap.java | 4 +-
.../VectorMapJoinFastBytesHashMultiSet.java | 4 +-
.../fast/VectorMapJoinFastBytesHashSet.java | 4 +-
.../fast/VectorMapJoinFastBytesHashTable.java | 10 ++--
.../fast/VectorMapJoinFastHashTable.java | 4 +-
.../mapjoin/fast/VectorMapJoinFastKeyStore.java | 18 ++++---
.../fast/VectorMapJoinFastLongHashTable.java | 10 ++--
.../fast/VectorMapJoinFastTableContainer.java | 12 ++---
.../hashtable/VectorMapJoinHashTableResult.java | 8 +++
.../apache/hadoop/hive/serde2/WriteBuffers.java | 51 +++++++++++---------
15 files changed, 99 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 573a7aa..080c509 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -458,7 +458,7 @@ public final class BytesBytesMultiHashMap {
kv.writeKey(writeBuffers);
int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset);
- int hashCode = (keyHashCode == -1) ? writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode;
+ int hashCode = (keyHashCode == -1) ? writeBuffers.unsafeHashCode(keyOffset, keyLength) : keyHashCode;
int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode);
// LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -699,8 +699,8 @@ public final class BytesBytesMultiHashMap {
if (!compareHashBits(ref, hashCode)) {
return false; // Hash bits in ref don't match.
}
- writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null));
- int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+ writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(ref, null));
+ int valueLength = (int)writeBuffers.unsafeReadVLong(), keyLength = (int)writeBuffers.unsafeReadVLong();
if (keyLength != cmpLength) {
return false;
}
@@ -746,7 +746,7 @@ public final class BytesBytesMultiHashMap {
private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) {
long tailOffset = Ref.getOffset(ref);
if (Ref.hasList(ref)) {
- long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5)
+ long relativeOffset = (readPos == null) ? writeBuffers.unsafeReadNByteLong(tailOffset, 5)
: writeBuffers.readNByteLong(tailOffset, 5, readPos);
tailOffset += relativeOffset;
}
@@ -779,10 +779,10 @@ public final class BytesBytesMultiHashMap {
// TODO: we could actually store a bit flag in ref indicating whether this is a hash
// match or a probe, and in the former case use hash bits (for a first few resizes).
// int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount);
- writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null));
+ writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(oldRef, null));
// Read the value and key length for the first record.
- int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef)
- - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4);
+ int hashCode = (int)writeBuffers.unsafeReadNByteLong(Ref.getOffset(oldRef)
+ - writeBuffers.unsafeReadVLong() - writeBuffers.unsafeReadVLong() - 4, 4);
int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode);
maxSteps = Math.max(probeSteps, maxSteps);
}
@@ -801,16 +801,16 @@ public final class BytesBytesMultiHashMap {
private long createOrGetListRecord(long ref) {
if (Ref.hasList(ref)) {
// LOG.info("Found list record at " + writeBuffers.getReadPoint());
- return writeBuffers.getReadPoint(); // Assumes we are here after key compare.
+ return writeBuffers.getUnsafeReadPoint(); // Assumes we are here after key compare.
}
long firstTailOffset = Ref.getOffset(ref);
// LOG.info("First tail offset to create list record is " + firstTailOffset);
// Determine the length of storage for value and key lengths of the first record.
- writeBuffers.setReadPoint(firstTailOffset);
- writeBuffers.skipVLong();
- writeBuffers.skipVLong();
- int lengthsLength = (int)(writeBuffers.getReadPoint() - firstTailOffset);
+ writeBuffers.setUnsafeReadPoint(firstTailOffset);
+ writeBuffers.unsafeSkipVLong();
+ writeBuffers.unsafeSkipVLong();
+ int lengthsLength = (int)(writeBuffers.getUnsafeReadPoint() - firstTailOffset);
// Create the list record, copy first record value/key lengths there.
writeBuffers.writeBytes(firstTailOffset, lengthsLength);
@@ -832,7 +832,7 @@ public final class BytesBytesMultiHashMap {
*/
private void addRecordToList(long lrPtrOffset, long tailOffset) {
// Now, insert this record into the list.
- long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5);
+ long prevHeadOffset = writeBuffers.unsafeReadNByteLong(lrPtrOffset, 5);
// LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset);
assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset.
writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset);
@@ -901,10 +901,10 @@ public final class BytesBytesMultiHashMap {
++examined;
long recOffset = getFirstRecordLengthsOffset(ref, null);
long tailOffset = Ref.getOffset(ref);
- writeBuffers.setReadPoint(recOffset);
- int valueLength = (int)writeBuffers.readVLong(),
- keyLength = (int)writeBuffers.readVLong();
- long ptrOffset = writeBuffers.getReadPoint();
+ writeBuffers.setUnsafeReadPoint(recOffset);
+ int valueLength = (int)writeBuffers.unsafeReadVLong(),
+ keyLength = (int)writeBuffers.unsafeReadVLong();
+ long ptrOffset = writeBuffers.getUnsafeReadPoint();
if (Ref.hasList(ref)) {
byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 9bd811c..1aad041 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -147,7 +147,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
// This is a vectorized aware evaluator
ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
- int columnIndex;;
+ int columnIndex;
int writerIndex;
public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 1482855..5d9410c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -650,14 +650,15 @@ public class VectorizedBatchUtil {
public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
StringBuilder sb = new StringBuilder();
+ LOG.info(debugFormatOneRow(batch, index, prefix, sb).toString());
+ }
+
+ public static StringBuilder debugFormatOneRow(VectorizedRowBatch batch,
+ int index, String prefix, StringBuilder sb) {
sb.append(prefix + " row " + index + " ");
for (int p = 0; p < batch.projectionSize; p++) {
int column = batch.projectedColumns[p];
- if (p == column) {
- sb.append("(col " + p + ") ");
- } else {
- sb.append("(proj col " + p + " col " + column + ") ");
- }
+ sb.append("(" + p + "," + column + ") ");
ColumnVector colVector = batch.cols[column];
if (colVector == null) {
sb.append("(null ColumnVector)");
@@ -700,7 +701,7 @@ public class VectorizedBatchUtil {
}
sb.append(" ");
}
- LOG.info(sb.toString());
+ return sb;
}
public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 45b52c4..164e6c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -639,6 +639,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
default:
throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());
}
+ LOG.info("Using " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
}
/*
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index e1c2f31..6b6d7ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -501,6 +501,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf,
smallTable);
needHashTableSetup = true;
+ LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
if (LOG.isDebugEnabled()) {
LOG.debug(CLASS_NAME + " reloadHashTable!");
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 6afaec3..c7c146d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -72,7 +72,7 @@ public abstract class VectorMapJoinFastBytesHashMap
optimizedHashMapResult.forget();
long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
- long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+ long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMapResult.getReadPos());
JoinUtil.JoinResult joinResult;
if (valueRefWord == -1) {
joinResult = JoinUtil.JoinResult.NOMATCH;
@@ -98,4 +98,4 @@ public abstract class VectorMapJoinFastBytesHashMap
// Share the same write buffers with our value store.
keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index dceb99c..dca486c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -68,7 +68,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
optimizedHashMultiSetResult.forget();
long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
- long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+ long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMultiSetResult.getReadPos());
JoinUtil.JoinResult joinResult;
if (count == -1) {
joinResult = JoinUtil.JoinResult.NOMATCH;
@@ -90,4 +90,4 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 9f122c4..46788d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -63,7 +63,7 @@ public abstract class VectorMapJoinFastBytesHashSet
optimizedHashSetResult.forget();
long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
- long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+ long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashSetResult.getReadPos());
JoinUtil.JoinResult joinResult;
if (existance == -1) {
joinResult = JoinUtil.JoinResult.NOMATCH;
@@ -82,4 +82,4 @@ public abstract class VectorMapJoinFastBytesHashSet
keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index 91d7fd6..5da3b21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.io.BytesWritable;
import com.google.common.annotations.VisibleForTesting;
@@ -84,7 +85,7 @@ public abstract class VectorMapJoinFastBytesHashTable
break;
}
if (hashCode == slotTriples[tripleIndex + 1] &&
- keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+ keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
// LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + slot + " tripleIndex " + tripleIndex + " existing");
isNewKey = false;
break;
@@ -173,7 +174,8 @@ public abstract class VectorMapJoinFastBytesHashTable
// LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + resizeThreshold + " metricExpands " + metricExpands);
}
- protected long findReadSlot(byte[] keyBytes, int keyStart, int keyLength, long hashCode) {
+ protected final long findReadSlot(
+ byte[] keyBytes, int keyStart, int keyLength, long hashCode, WriteBuffers.Position readPos) {
int intHashCode = (int) hashCode;
int slot = (intHashCode & logicalHashBucketMask);
@@ -185,7 +187,7 @@ public abstract class VectorMapJoinFastBytesHashTable
if (slotTriples[tripleIndex] != 0 && hashCode == slotTriples[tripleIndex + 1]) {
// Finally, verify the key bytes match.
- if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+ if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength, readPos)) {
return slotTriples[tripleIndex + 2];
}
}
@@ -218,4 +220,4 @@ public abstract class VectorMapJoinFastBytesHashTable
super(initialCapacity, loadFactor, writeBuffersSize);
allocateBucketArray();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index 666d666..38d750e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -29,7 +29,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
protected int logicalHashBucketMask;
protected float loadFactor;
- protected int writeBuffersSize;
+ protected final int writeBuffersSize;
protected int metricPutConflict;
protected int largestNumberOfSteps;
@@ -70,4 +70,4 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
public int size() {
return keysAssigned;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index ab3dac4..5bb22e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -30,7 +30,7 @@ public class VectorMapJoinFastKeyStore {
private WriteBuffers writeBuffers;
- private WriteBuffers.Position readPos;
+ private WriteBuffers.Position unsafeReadPos; // Thread-unsafe position used at write time.
/**
* A store for arbitrary length keys in memory.
@@ -115,7 +115,13 @@ public class VectorMapJoinFastKeyStore {
return keyRefWord;
}
- public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public boolean unsafeEqualKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+ return equalKey(keyRefWord, keyBytes, keyStart, keyLength, unsafeReadPos);
+ }
+
+ public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength,
+ WriteBuffers.Position readPos) {
int storedKeyLengthLength =
(int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift);
@@ -151,14 +157,12 @@ public class VectorMapJoinFastKeyStore {
public VectorMapJoinFastKeyStore(int writeBuffersSize) {
writeBuffers = new WriteBuffers(writeBuffersSize, AbsoluteKeyOffset.maxSize);
-
- readPos = new WriteBuffers.Position();
+ unsafeReadPos = new WriteBuffers.Position();
}
public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) {
// TODO: Check if maximum size compatible with AbsoluteKeyOffset.maxSize.
this.writeBuffers = writeBuffers;
-
- readPos = new WriteBuffers.Position();
+ unsafeReadPos = new WriteBuffers.Position();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index d90849e..843c7d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -45,13 +45,13 @@ public abstract class VectorMapJoinFastLongHashTable
public static final Log LOG = LogFactory.getLog(VectorMapJoinFastLongHashTable.class);
- private HashTableKeyType hashTableKeyType;
+ private final HashTableKeyType hashTableKeyType;
- private boolean isOuterJoin;
+ private final boolean isOuterJoin;
- private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+ private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
- private boolean useMinMax;
+ private final boolean useMinMax;
private long min;
private long max;
@@ -276,4 +276,4 @@ public abstract class VectorMapJoinFastLongHashTable
min = Long.MAX_VALUE;
max = Long.MIN_VALUE;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index cf6c0e3..0043044 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -58,7 +58,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
private long keyCount;
- private VectorMapJoinFastHashTable VectorMapJoinFastHashTable;
+ private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable;
public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf,
long keyCount) throws SerDeException {
@@ -83,12 +83,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
// LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold);
- VectorMapJoinFastHashTable = createHashTable(newThreshold);
+ vectorMapJoinFastHashTable = createHashTable(newThreshold);
}
@Override
public VectorMapJoinHashTable vectorMapJoinHashTable() {
- return (VectorMapJoinHashTable) VectorMapJoinFastHashTable;
+ return vectorMapJoinFastHashTable;
}
private VectorMapJoinFastHashTable createHashTable(int newThreshold) {
@@ -179,7 +179,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
Writable currentValue) throws SerDeException, HiveException, IOException {
// We are not using the key and value contexts, nor do we support a MapJoinKey.
- VectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
+ vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
return null;
}
@@ -215,7 +215,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
@Override
public int size() {
- return VectorMapJoinFastHashTable.size();
+ return vectorMapJoinFastHashTable.size();
}
/*
@@ -224,4 +224,4 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
throw new RuntimeException("Not applicable");
}
*/
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
index ce598e3..b51d6fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
/*
* Root abstract class for a hash table result.
@@ -29,9 +30,12 @@ public abstract class VectorMapJoinHashTableResult {
private int spillPartitionId;
+ private final WriteBuffers.Position readPos;
+
public VectorMapJoinHashTableResult() {
joinResult = JoinUtil.JoinResult.NOMATCH;
spillPartitionId = -1;
+ readPos = new WriteBuffers.Position();
}
/**
@@ -78,4 +82,8 @@ public abstract class VectorMapJoinHashTableResult {
sb.append("joinResult " + joinResult.name());
return sb.toString();
}
+
+ public WriteBuffers.Position getReadPos() {
+ return readPos;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb394520/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 8811aca..db20fca 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.hive.serde2;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.io.WritableUtils;
@@ -52,7 +49,7 @@ public final class WriteBuffers implements RandomAccessOutput {
}
Position writePos = new Position(); // Position where we'd write
- Position defaultReadPos = new Position(); // Position where we'd read (by default).
+ Position unsafeReadPos = new Position(); // Position where we'd read (unsafely at write time).
public WriteBuffers(int wbSize, long maxSize) {
@@ -63,16 +60,18 @@ public final class WriteBuffers implements RandomAccessOutput {
writePos.bufferIndex = -1;
}
- public int readVInt() {
- return (int) readVLong(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public int unsafeReadVInt() {
+ return (int) readVLong(unsafeReadPos);
}
public int readVInt(Position readPos) {
return (int) readVLong(readPos);
}
- public long readVLong() {
- return readVLong(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public long unsafeReadVLong() {
+ return readVLong(unsafeReadPos);
}
public long readVLong(Position readPos) {
@@ -96,8 +95,9 @@ public final class WriteBuffers implements RandomAccessOutput {
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
- public void skipVLong() {
- skipVLong(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public void unsafeSkipVLong() {
+ skipVLong(unsafeReadPos);
}
public void skipVLong(Position readPos) {
@@ -116,8 +116,9 @@ public final class WriteBuffers implements RandomAccessOutput {
}
}
- public void setReadPoint(long offset) {
- setReadPoint(offset, defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public void setUnsafeReadPoint(long offset) {
+ setReadPoint(offset, unsafeReadPos);
}
public void setReadPoint(long offset, Position readPos) {
@@ -126,8 +127,9 @@ public final class WriteBuffers implements RandomAccessOutput {
readPos.offset = getOffset(offset);
}
- public int hashCode(long offset, int length) {
- return hashCode(offset, length, defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public int unsafeHashCode(long offset, int length) {
+ return hashCode(offset, length, unsafeReadPos);
}
public int hashCode(long offset, int length, Position readPos) {
@@ -351,7 +353,7 @@ public final class WriteBuffers implements RandomAccessOutput {
private void clearState() {
writePos.clear();
- defaultReadPos.clear();
+ unsafeReadPos.clear();
}
@@ -362,8 +364,9 @@ public final class WriteBuffers implements RandomAccessOutput {
return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
}
- public long getReadPoint() {
- return getReadPoint(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public long getUnsafeReadPoint() {
+ return getReadPoint(unsafeReadPos);
}
public long getReadPoint(Position readPos) {
@@ -517,8 +520,9 @@ public final class WriteBuffers implements RandomAccessOutput {
clearState();
}
- public long readNByteLong(long offset, int bytes) {
- return readNByteLong(offset, bytes, defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public long unsafeReadNByteLong(long offset, int bytes) {
+ return readNByteLong(offset, bytes, unsafeReadPos);
}
public long readNByteLong(long offset, int bytes, Position readPos) {
@@ -560,7 +564,7 @@ public final class WriteBuffers implements RandomAccessOutput {
}
public int readInt(long offset) {
- return (int)readNByteLong(offset, 4);
+ return (int)unsafeReadNByteLong(offset, 4);
}
@Override
@@ -656,7 +660,8 @@ public final class WriteBuffers implements RandomAccessOutput {
return writeBuffers.size() * (long) wbSize;
}
- public Position getReadPosition() {
- return defaultReadPos;
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public Position getUnsafeReadPosition() {
+ return unsafeReadPos;
}
-}
\ No newline at end of file
+}