You are viewing a plain-text version of this content. The canonical link for it appeared here as a hyperlink in the original page.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/03 19:34:51 UTC
[3/3] hive git commit: HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)
HIVE-14408 : thread safety issue in fast hashtable (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/74c46e84
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/74c46e84
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/74c46e84
Branch: refs/heads/branch-2.0
Commit: 74c46e8464c7de83a34993293aaf07adb0d225a8
Parents: 98c5594
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 3 12:33:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 3 12:34:13 2016 -0700
----------------------------------------------------------------------
.../persistence/BytesBytesMultiHashMap.java | 34 +++++++-------
.../ql/exec/vector/VectorMapJoinOperator.java | 2 +-
.../ql/exec/vector/VectorizedBatchUtil.java | 13 +++---
.../mapjoin/VectorMapJoinCommonOperator.java | 1 +
.../VectorMapJoinGenerateResultOperator.java | 1 +
.../fast/VectorMapJoinFastBytesHashMap.java | 2 +-
.../VectorMapJoinFastBytesHashMultiSet.java | 2 +-
.../fast/VectorMapJoinFastBytesHashSet.java | 2 +-
.../fast/VectorMapJoinFastBytesHashTable.java | 8 ++--
.../fast/VectorMapJoinFastHashTable.java | 2 +-
.../mapjoin/fast/VectorMapJoinFastKeyStore.java | 16 ++++---
.../fast/VectorMapJoinFastLongHashTable.java | 8 ++--
.../fast/VectorMapJoinFastTableContainer.java | 10 ++--
.../hashtable/VectorMapJoinHashTableResult.java | 8 ++++
.../apache/hadoop/hive/serde2/WriteBuffers.java | 49 +++++++++++---------
15 files changed, 90 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 51acae0..dd88461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -449,7 +449,7 @@ public final class BytesBytesMultiHashMap {
kv.writeKey(writeBuffers);
int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset);
- int hashCode = (keyHashCode == -1) ? writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode;
+ int hashCode = (keyHashCode == -1) ? writeBuffers.unsafeHashCode(keyOffset, keyLength) : keyHashCode;
int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode);
// LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -683,8 +683,8 @@ public final class BytesBytesMultiHashMap {
if (!compareHashBits(ref, hashCode)) {
return false; // Hash bits in ref don't match.
}
- writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null));
- int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+ writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(ref, null));
+ int valueLength = (int)writeBuffers.unsafeReadVLong(), keyLength = (int)writeBuffers.unsafeReadVLong();
if (keyLength != cmpLength) {
return false;
}
@@ -730,7 +730,7 @@ public final class BytesBytesMultiHashMap {
private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) {
long tailOffset = Ref.getOffset(ref);
if (Ref.hasList(ref)) {
- long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5)
+ long relativeOffset = (readPos == null) ? writeBuffers.unsafeReadNByteLong(tailOffset, 5)
: writeBuffers.readNByteLong(tailOffset, 5, readPos);
tailOffset += relativeOffset;
}
@@ -763,10 +763,10 @@ public final class BytesBytesMultiHashMap {
// TODO: we could actually store a bit flag in ref indicating whether this is a hash
// match or a probe, and in the former case use hash bits (for a first few resizes).
// int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount);
- writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null));
+ writeBuffers.setUnsafeReadPoint(getFirstRecordLengthsOffset(oldRef, null));
// Read the value and key length for the first record.
- int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef)
- - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4);
+ int hashCode = (int)writeBuffers.unsafeReadNByteLong(Ref.getOffset(oldRef)
+ - writeBuffers.unsafeReadVLong() - writeBuffers.unsafeReadVLong() - 4, 4);
int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode);
maxSteps = Math.max(probeSteps, maxSteps);
}
@@ -785,16 +785,16 @@ public final class BytesBytesMultiHashMap {
private long createOrGetListRecord(long ref) {
if (Ref.hasList(ref)) {
// LOG.info("Found list record at " + writeBuffers.getReadPoint());
- return writeBuffers.getReadPoint(); // Assumes we are here after key compare.
+ return writeBuffers.getUnsafeReadPoint(); // Assumes we are here after key compare.
}
long firstTailOffset = Ref.getOffset(ref);
// LOG.info("First tail offset to create list record is " + firstTailOffset);
// Determine the length of storage for value and key lengths of the first record.
- writeBuffers.setReadPoint(firstTailOffset);
- writeBuffers.skipVLong();
- writeBuffers.skipVLong();
- int lengthsLength = (int)(writeBuffers.getReadPoint() - firstTailOffset);
+ writeBuffers.setUnsafeReadPoint(firstTailOffset);
+ writeBuffers.unsafeSkipVLong();
+ writeBuffers.unsafeSkipVLong();
+ int lengthsLength = (int)(writeBuffers.getUnsafeReadPoint() - firstTailOffset);
// Create the list record, copy first record value/key lengths there.
writeBuffers.writeBytes(firstTailOffset, lengthsLength);
@@ -816,7 +816,7 @@ public final class BytesBytesMultiHashMap {
*/
private void addRecordToList(long lrPtrOffset, long tailOffset) {
// Now, insert this record into the list.
- long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5);
+ long prevHeadOffset = writeBuffers.unsafeReadNByteLong(lrPtrOffset, 5);
// LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset);
assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset.
writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset);
@@ -885,10 +885,10 @@ public final class BytesBytesMultiHashMap {
++examined;
long recOffset = getFirstRecordLengthsOffset(ref, null);
long tailOffset = Ref.getOffset(ref);
- writeBuffers.setReadPoint(recOffset);
- int valueLength = (int)writeBuffers.readVLong(),
- keyLength = (int)writeBuffers.readVLong();
- long ptrOffset = writeBuffers.getReadPoint();
+ writeBuffers.setUnsafeReadPoint(recOffset);
+ int valueLength = (int)writeBuffers.unsafeReadVLong(),
+ keyLength = (int)writeBuffers.unsafeReadVLong();
+ long ptrOffset = writeBuffers.getUnsafeReadPoint();
if (Ref.hasList(ref)) {
byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 622f777..a247bb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -153,7 +153,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
// This is a vectorized aware evaluator
ExprNodeEvaluator eval = new ExprNodeEvaluator<ExprNodeDesc>(desc) {
- int columnIndex;;
+ int columnIndex;
int writerIndex;
public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) {
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index be04da8..b0eee64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -653,14 +653,15 @@ public class VectorizedBatchUtil {
public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
StringBuilder sb = new StringBuilder();
+ LOG.info(debugFormatOneRow(batch, index, prefix, sb).toString());
+ }
+
+ public static StringBuilder debugFormatOneRow(VectorizedRowBatch batch,
+ int index, String prefix, StringBuilder sb) {
sb.append(prefix + " row " + index + " ");
for (int p = 0; p < batch.projectionSize; p++) {
int column = batch.projectedColumns[p];
- if (p == column) {
- sb.append("(col " + p + ") ");
- } else {
- sb.append("(proj col " + p + " col " + column + ") ");
- }
+ sb.append("(" + p + "," + column + ") ");
ColumnVector colVector = batch.cols[column];
if (colVector == null) {
sb.append("(null ColumnVector)");
@@ -703,7 +704,7 @@ public class VectorizedBatchUtil {
}
sb.append(" ");
}
- LOG.info(sb.toString());
+ return sb;
}
public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 8ad7ca4..24668f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -636,6 +636,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
default:
throw new RuntimeException("Unknown vector map join hash table implementation type " + hashTableImplementationType.name());
}
+ LOG.info("Using " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
}
/*
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 5cbace4..dee30f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -507,6 +507,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
vectorMapJoinHashTable = VectorMapJoinOptimizedCreateHashTable.createHashTable(conf,
smallTable);
needHashTableSetup = true;
+ LOG.info("Created " + vectorMapJoinHashTable.getClass().getSimpleName() + " from " + this.getClass().getSimpleName());
if (isLogDebugEnabled) {
LOG.debug(CLASS_NAME + " reloadHashTable!");
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 0ff98bd..a83a111 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -73,7 +73,7 @@ public abstract class VectorMapJoinFastBytesHashMap
optimizedHashMapResult.forget();
long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
- long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+ long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMapResult.getReadPos());
JoinUtil.JoinResult joinResult;
if (valueRefWord == -1) {
joinResult = JoinUtil.JoinResult.NOMATCH;
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index 5d8ed2d..02a8ed1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -69,7 +69,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
optimizedHashMultiSetResult.forget();
long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
- long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+ long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashMultiSetResult.getReadPos());
JoinUtil.JoinResult joinResult;
if (count == -1) {
joinResult = JoinUtil.JoinResult.NOMATCH;
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 990a2e5..2c442fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -64,7 +64,7 @@ public abstract class VectorMapJoinFastBytesHashSet
optimizedHashSetResult.forget();
long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
- long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
+ long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode, hashSetResult.getReadPos());
JoinUtil.JoinResult joinResult;
if (existance == -1) {
joinResult = JoinUtil.JoinResult.NOMATCH;
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index 6b536f0..7b77969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hive.common.util.HashCodeUtil;
@@ -87,7 +88,7 @@ public abstract class VectorMapJoinFastBytesHashTable
break;
}
if (hashCode == slotTriples[tripleIndex + 1] &&
- keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+ keyStore.unsafeEqualKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
// LOG.debug("VectorMapJoinFastBytesHashMap findWriteSlot slot " + slot + " tripleIndex " + tripleIndex + " existing");
isNewKey = false;
break;
@@ -176,7 +177,8 @@ public abstract class VectorMapJoinFastBytesHashTable
// LOG.debug("VectorMapJoinFastLongHashTable expandAndRehash new logicalHashBucketCount " + logicalHashBucketCount + " resizeThreshold " + resizeThreshold + " metricExpands " + metricExpands);
}
- protected long findReadSlot(byte[] keyBytes, int keyStart, int keyLength, long hashCode) {
+ protected final long findReadSlot(
+ byte[] keyBytes, int keyStart, int keyLength, long hashCode, WriteBuffers.Position readPos) {
int intHashCode = (int) hashCode;
int slot = (intHashCode & logicalHashBucketMask);
@@ -188,7 +190,7 @@ public abstract class VectorMapJoinFastBytesHashTable
if (slotTriples[tripleIndex] != 0 && hashCode == slotTriples[tripleIndex + 1]) {
// Finally, verify the key bytes match.
- if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength)) {
+ if (keyStore.equalKey(slotTriples[tripleIndex], keyBytes, keyStart, keyLength, readPos)) {
return slotTriples[tripleIndex + 2];
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index 099f38e..7df9eed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -29,7 +29,7 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
protected int logicalHashBucketMask;
protected float loadFactor;
- protected int writeBuffersSize;
+ protected final int writeBuffersSize;
protected int metricPutConflict;
protected int largestNumberOfSteps;
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index efdcd43..be51693 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -30,7 +30,7 @@ public class VectorMapJoinFastKeyStore {
private WriteBuffers writeBuffers;
- private WriteBuffers.Position readPos;
+ private WriteBuffers.Position unsafeReadPos; // Thread-unsafe position used at write time.
/**
* A store for arbitrary length keys in memory.
@@ -115,7 +115,13 @@ public class VectorMapJoinFastKeyStore {
return keyRefWord;
}
- public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public boolean unsafeEqualKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength) {
+ return equalKey(keyRefWord, keyBytes, keyStart, keyLength, unsafeReadPos);
+ }
+
+ public boolean equalKey(long keyRefWord, byte[] keyBytes, int keyStart, int keyLength,
+ WriteBuffers.Position readPos) {
int storedKeyLengthLength =
(int) ((keyRefWord & SmallKeyLength.bitMask) >> SmallKeyLength.bitShift);
@@ -151,14 +157,12 @@ public class VectorMapJoinFastKeyStore {
public VectorMapJoinFastKeyStore(int writeBuffersSize) {
writeBuffers = new WriteBuffers(writeBuffersSize, AbsoluteKeyOffset.maxSize);
-
- readPos = new WriteBuffers.Position();
+ unsafeReadPos = new WriteBuffers.Position();
}
public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) {
// TODO: Check if maximum size compatible with AbsoluteKeyOffset.maxSize.
this.writeBuffers = writeBuffers;
-
- readPos = new WriteBuffers.Position();
+ unsafeReadPos = new WriteBuffers.Position();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index f37f056..23f8315 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -48,13 +48,13 @@ public abstract class VectorMapJoinFastLongHashTable
private transient final boolean isLogDebugEnabled = LOG.isDebugEnabled();
- private HashTableKeyType hashTableKeyType;
+ private final HashTableKeyType hashTableKeyType;
- private boolean isOuterJoin;
+ private final boolean isOuterJoin;
- private BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
+ private final BinarySortableDeserializeRead keyBinarySortableDeserializeRead;
- private boolean useMinMax;
+ private final boolean useMinMax;
private long min;
private long max;
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 3b73f7d..9f3b107 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -58,7 +58,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
private final long keyCount;
- private final VectorMapJoinFastHashTable VectorMapJoinFastHashTable;
+ private final VectorMapJoinFastHashTable vectorMapJoinFastHashTable;
public VectorMapJoinFastTableContainer(MapJoinDesc desc, Configuration hconf,
long keyCount) throws SerDeException {
@@ -83,12 +83,12 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
// LOG.debug("VectorMapJoinFastTableContainer load newThreshold " + newThreshold);
- VectorMapJoinFastHashTable = createHashTable(newThreshold);
+ vectorMapJoinFastHashTable = createHashTable(newThreshold);
}
@Override
public VectorMapJoinHashTable vectorMapJoinHashTable() {
- return VectorMapJoinFastHashTable;
+ return vectorMapJoinFastHashTable;
}
private VectorMapJoinFastHashTable createHashTable(int newThreshold) {
@@ -178,7 +178,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
throws SerDeException, HiveException, IOException {
// We are not using the key and value contexts, nor do we support a MapJoinKey.
- VectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
+ vectorMapJoinFastHashTable.putRow((BytesWritable) currentKey, (BytesWritable) currentValue);
return null;
}
@@ -214,7 +214,7 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
@Override
public int size() {
- return VectorMapJoinFastHashTable.size();
+ return vectorMapJoinFastHashTable.size();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
index ce598e3..b51d6fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTableResult.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
/*
* Root abstract class for a hash table result.
@@ -29,9 +30,12 @@ public abstract class VectorMapJoinHashTableResult {
private int spillPartitionId;
+ private final WriteBuffers.Position readPos;
+
public VectorMapJoinHashTableResult() {
joinResult = JoinUtil.JoinResult.NOMATCH;
spillPartitionId = -1;
+ readPos = new WriteBuffers.Position();
}
/**
@@ -78,4 +82,8 @@ public abstract class VectorMapJoinHashTableResult {
sb.append("joinResult " + joinResult.name());
return sb.toString();
}
+
+ public WriteBuffers.Position getReadPos() {
+ return readPos;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/74c46e84/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 5900428..a4ecd9f 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -21,11 +21,8 @@ package org.apache.hadoop.hive.serde2;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hive.common.util.HashCodeUtil;
@@ -53,7 +50,7 @@ public final class WriteBuffers implements RandomAccessOutput {
}
Position writePos = new Position(); // Position where we'd write
- Position defaultReadPos = new Position(); // Position where we'd read (by default).
+ Position unsafeReadPos = new Position(); // Position where we'd read (unsafely at write time).
public WriteBuffers(int wbSize, long maxSize) {
@@ -64,16 +61,18 @@ public final class WriteBuffers implements RandomAccessOutput {
writePos.bufferIndex = -1;
}
- public int readVInt() {
- return (int) readVLong(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public int unsafeReadVInt() {
+ return (int) readVLong(unsafeReadPos);
}
public int readVInt(Position readPos) {
return (int) readVLong(readPos);
}
- public long readVLong() {
- return readVLong(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public long unsafeReadVLong() {
+ return readVLong(unsafeReadPos);
}
public long readVLong(Position readPos) {
@@ -97,8 +96,9 @@ public final class WriteBuffers implements RandomAccessOutput {
return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
- public void skipVLong() {
- skipVLong(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public void unsafeSkipVLong() {
+ skipVLong(unsafeReadPos);
}
public void skipVLong(Position readPos) {
@@ -117,8 +117,9 @@ public final class WriteBuffers implements RandomAccessOutput {
}
}
- public void setReadPoint(long offset) {
- setReadPoint(offset, defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public void setUnsafeReadPoint(long offset) {
+ setReadPoint(offset, unsafeReadPos);
}
public void setReadPoint(long offset, Position readPos) {
@@ -127,8 +128,9 @@ public final class WriteBuffers implements RandomAccessOutput {
readPos.offset = getOffset(offset);
}
- public int hashCode(long offset, int length) {
- return hashCode(offset, length, defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public int unsafeHashCode(long offset, int length) {
+ return hashCode(offset, length, unsafeReadPos);
}
public int hashCode(long offset, int length, Position readPos) {
@@ -352,7 +354,7 @@ public final class WriteBuffers implements RandomAccessOutput {
private void clearState() {
writePos.clear();
- defaultReadPos.clear();
+ unsafeReadPos.clear();
}
@@ -363,8 +365,9 @@ public final class WriteBuffers implements RandomAccessOutput {
return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
}
- public long getReadPoint() {
- return getReadPoint(defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public long getUnsafeReadPoint() {
+ return getReadPoint(unsafeReadPos);
}
public long getReadPoint(Position readPos) {
@@ -518,8 +521,9 @@ public final class WriteBuffers implements RandomAccessOutput {
clearState();
}
- public long readNByteLong(long offset, int bytes) {
- return readNByteLong(offset, bytes, defaultReadPos);
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public long unsafeReadNByteLong(long offset, int bytes) {
+ return readNByteLong(offset, bytes, unsafeReadPos);
}
public long readNByteLong(long offset, int bytes, Position readPos) {
@@ -561,7 +565,7 @@ public final class WriteBuffers implements RandomAccessOutput {
}
public int readInt(long offset) {
- return (int)readNByteLong(offset, 4);
+ return (int)unsafeReadNByteLong(offset, 4);
}
@Override
@@ -606,7 +610,8 @@ public final class WriteBuffers implements RandomAccessOutput {
return writeBuffers.size() * (long) wbSize;
}
- public Position getReadPosition() {
- return defaultReadPos;
+ /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */
+ public Position getUnsafeReadPosition() {
+ return unsafeReadPos;
}
}