You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/07 06:42:32 UTC
svn commit: r1649994 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/io/
test/org/apache/hadoop/hive/ql/io/
Author: xuefu
Date: Wed Jan 7 05:42:31 2015
New Revision: 1649994
URL: http://svn.apache.org/r1649994
Log:
HIVE-9243: Static Map in IOContext is not thread safe (Brock via Xuefu)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Jan 7 05:42:31 2015
@@ -583,15 +583,15 @@ public class MapOperator extends Operato
}
}
else if(vc.equals(VirtualColumn.ROWID)) {
- if(ctx.getIoCxt().ri == null) {
+ if(ctx.getIoCxt().getRecordIdentifier() == null) {
vcValues[i] = null;
}
else {
if(vcValues[i] == null) {
vcValues[i] = new Object[RecordIdentifier.Field.values().length];
}
- RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().ri, (Object[])vcValues[i]);
- ctx.getIoCxt().ri = null;//so we don't accidentally cache the value; shouldn't
+ RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().getRecordIdentifier(), (Object[])vcValues[i]);
+ ctx.getIoCxt().setRecordIdentifier(null);//so we don't accidentally cache the value; shouldn't
//happen since IO layer either knows how to produce ROW__ID or not - but to be safe
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Wed Jan 7 05:42:31 2015
@@ -117,11 +117,11 @@ public abstract class HiveContextAwareRe
if(retVal) {
if(key instanceof RecordIdentifier) {
//supports AcidInputFormat which uses the KEY pass ROW__ID info
- ioCxtRef.ri = (RecordIdentifier)key;
+ ioCxtRef.setRecordIdentifier((RecordIdentifier)key);
}
else if(recordReader instanceof AcidInputFormat.AcidRecordReader) {
//supports AcidInputFormat which do not use the KEY pass ROW__ID info
- ioCxtRef.ri = ((AcidInputFormat.AcidRecordReader) recordReader).getRecordIdentifier();
+ ioCxtRef.setRecordIdentifier(((AcidInputFormat.AcidRecordReader) recordReader).getRecordIdentifier());
}
}
return retVal;
@@ -134,30 +134,30 @@ public abstract class HiveContextAwareRe
protected void updateIOContext()
throws IOException {
long pointerPos = this.getPos();
- if (!ioCxtRef.isBlockPointer) {
- ioCxtRef.currentBlockStart = pointerPos;
- ioCxtRef.currentRow = 0;
+ if (!ioCxtRef.isBlockPointer()) {
+ ioCxtRef.setCurrentBlockStart(pointerPos);
+ ioCxtRef.setCurrentRow(0);
return;
}
- ioCxtRef.currentRow++;
+ ioCxtRef.setCurrentRow(ioCxtRef.getCurrentRow() + 1);
- if (ioCxtRef.nextBlockStart == -1) {
- ioCxtRef.nextBlockStart = pointerPos;
- ioCxtRef.currentRow = 0;
+ if (ioCxtRef.getNextBlockStart() == -1) {
+ ioCxtRef.setNextBlockStart(pointerPos);
+ ioCxtRef.setCurrentRow(0);
}
- if (pointerPos != ioCxtRef.nextBlockStart) {
+ if (pointerPos != ioCxtRef.getNextBlockStart()) {
// the reader pointer has moved to the end of next block, or the end of
// current record.
- ioCxtRef.currentRow = 0;
+ ioCxtRef.setCurrentRow(0);
- if (ioCxtRef.currentBlockStart == ioCxtRef.nextBlockStart) {
- ioCxtRef.currentRow = 1;
+ if (ioCxtRef.getCurrentBlockStart() == ioCxtRef.getNextBlockStart()) {
+ ioCxtRef.setCurrentRow(1);
}
- ioCxtRef.currentBlockStart = ioCxtRef.nextBlockStart;
- ioCxtRef.nextBlockStart = pointerPos;
+ ioCxtRef.setCurrentBlockStart(ioCxtRef.getNextBlockStart());
+ ioCxtRef.setNextBlockStart(pointerPos);
}
}
@@ -168,9 +168,9 @@ public abstract class HiveContextAwareRe
private void initIOContext(long startPos, boolean isBlockPointer,
Path inputPath) {
ioCxtRef = this.getIOContext();
- ioCxtRef.currentBlockStart = startPos;
- ioCxtRef.isBlockPointer = isBlockPointer;
- ioCxtRef.inputPath = inputPath;
+ ioCxtRef.setCurrentBlockStart(startPos);
+ ioCxtRef.setBlockPointer(isBlockPointer);
+ ioCxtRef.setInputPath(inputPath);
LOG.info("Processing file " + inputPath);
initDone = true;
}
@@ -223,7 +223,7 @@ public abstract class HiveContextAwareRe
// Binary search only works if we know the size of the split, and the recordReader is an
// RCFileRecordReader
this.getIOContext().setUseSorted(true);
- this.getIOContext().setIsBinarySearching(true);
+ this.getIOContext().setBinarySearching(true);
this.wasUsingSortedSearch = true;
} else {
// Use the defalut methods for next in the child class
@@ -285,7 +285,7 @@ public abstract class HiveContextAwareRe
// binary search, if the new position at least as big as the size of the split, any
// matching rows must be in the final block, so we can end the binary search.
if (newPosition == previousPosition || newPosition >= splitEnd) {
- this.getIOContext().setIsBinarySearching(false);
+ this.getIOContext().setBinarySearching(false);
sync(rangeStart);
}
@@ -405,7 +405,7 @@ public abstract class HiveContextAwareRe
*/
private void beginLinearSearch() throws IOException {
sync(rangeStart);
- this.getIOContext().setIsBinarySearching(false);
+ this.getIOContext().setBinarySearching(false);
this.wasUsingSortedSearch = false;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Wed Jan 7 05:42:31 2015
@@ -34,18 +34,21 @@ import org.apache.hadoop.fs.Path;
*/
public class IOContext {
- private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
+ /**
+ * Spark uses this thread-local so that each thread gets its own IOContext instance.
+ */
+ private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
@Override
protected synchronized IOContext initialValue() { return new IOContext(); }
};
- private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
- public static Map<String, IOContext> getMap() {
- return inputNameIOContextMap;
- }
+ /**
+ * Tez and MR use this map; they are single-threaded per JVM, so no synchronization is required.
+ */
+ private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
public static IOContext get(String inputName) {
- if (inputNameIOContextMap.containsKey(inputName) == false) {
+ if (!inputNameIOContextMap.containsKey(inputName)) {
IOContext ioContext = new IOContext();
inputNameIOContextMap.put(inputName, ioContext);
}
@@ -58,26 +61,26 @@ public class IOContext {
inputNameIOContextMap.clear();
}
- long currentBlockStart;
- long nextBlockStart;
- long currentRow;
- boolean isBlockPointer;
- boolean ioExceptions;
+ private long currentBlockStart;
+ private long nextBlockStart;
+ private long currentRow;
+ private boolean isBlockPointer;
+ private boolean ioExceptions;
// Are we using the fact the input is sorted
- boolean useSorted = false;
+ private boolean useSorted = false;
// Are we currently performing a binary search
- boolean isBinarySearching = false;
+ private boolean isBinarySearching = false;
// Do we want to end the binary search
- boolean endBinarySearch = false;
+ private boolean endBinarySearch = false;
// The result of the comparison of the last row processed
- Comparison comparison = null;
+ private Comparison comparison = null;
// The class name of the generic UDF being used by the filter
- String genericUDFClassName = null;
+ private String genericUDFClassName = null;
/**
* supports {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID}
*/
- public RecordIdentifier ri;
+ private RecordIdentifier ri;
public static enum Comparison {
GREATER,
@@ -86,7 +89,7 @@ public class IOContext {
UNKNOWN
}
- Path inputPath;
+ private Path inputPath;
public IOContext() {
this.currentBlockStart = 0;
@@ -156,7 +159,7 @@ public class IOContext {
return isBinarySearching;
}
- public void setIsBinarySearching(boolean isBinarySearching) {
+ public void setBinarySearching(boolean isBinarySearching) {
this.isBinarySearching = isBinarySearching;
}
@@ -197,6 +200,14 @@ public class IOContext {
this.genericUDFClassName = genericUDFClassName;
}
+ public RecordIdentifier getRecordIdentifier() {
+ return this.ri;
+ }
+
+ public void setRecordIdentifier(RecordIdentifier ri) {
+ this.ri = ri;
+ }
+
/**
* The thread local IOContext is static, we may need to restart the search if, for instance,
* multiple files are being searched as part of a CombinedHiveRecordReader
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java Wed Jan 7 05:42:31 2015
@@ -118,7 +118,7 @@ public class TestHiveBinarySearchRecordR
conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
ioContext = IOContext.get(conf.get(Utilities.INPUT_NAME));
ioContext.setUseSorted(false);
- ioContext.setIsBinarySearching(false);
+ ioContext.setBinarySearching(false);
ioContext.setEndBinarySearch(false);
ioContext.setComparison(null);
ioContext.setGenericUDFClassName(null);
@@ -252,7 +252,7 @@ public class TestHiveBinarySearchRecordR
ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
Assert.assertTrue(ioContext.isBinarySearching());
Assert.assertTrue(executeDoNext(hbsReader));
- ioContext.setIsBinarySearching(false);
+ ioContext.setBinarySearching(false);
ioContext.setComparison(-1);
Assert.assertTrue(executeDoNext(hbsReader));
ioContext.setComparison(0);
@@ -292,7 +292,7 @@ public class TestHiveBinarySearchRecordR
ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName());
Assert.assertTrue(ioContext.isBinarySearching());
Assert.assertTrue(executeDoNext(hbsReader));
- ioContext.setIsBinarySearching(false);
+ ioContext.setBinarySearching(false);
ioContext.setComparison(-1);
Assert.assertTrue(executeDoNext(hbsReader));
ioContext.setComparison(0);
@@ -306,7 +306,7 @@ public class TestHiveBinarySearchRecordR
ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName());
Assert.assertTrue(ioContext.isBinarySearching());
Assert.assertTrue(executeDoNext(hbsReader));
- ioContext.setIsBinarySearching(false);
+ ioContext.setBinarySearching(false);
ioContext.setComparison(-1);
Assert.assertTrue(executeDoNext(hbsReader));
ioContext.setComparison(0);