You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/07 06:42:32 UTC

svn commit: r1649994 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/io/ test/org/apache/hadoop/hive/ql/io/

Author: xuefu
Date: Wed Jan  7 05:42:31 2015
New Revision: 1649994

URL: http://svn.apache.org/r1649994
Log:
HIVE-9243: Static Map in IOContext is not thread safe (Brock via Xuefu)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Jan  7 05:42:31 2015
@@ -583,15 +583,15 @@ public class MapOperator extends Operato
         }
       }
       else if(vc.equals(VirtualColumn.ROWID)) {
-        if(ctx.getIoCxt().ri == null) {
+        if(ctx.getIoCxt().getRecordIdentifier() == null) {
           vcValues[i] = null;
         }
         else {
           if(vcValues[i] == null) {
             vcValues[i] = new Object[RecordIdentifier.Field.values().length];
           }
-          RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().ri, (Object[])vcValues[i]);
-          ctx.getIoCxt().ri = null;//so we don't accidentally cache the value; shouldn't
+          RecordIdentifier.StructInfo.toArray(ctx.getIoCxt().getRecordIdentifier(), (Object[])vcValues[i]);
+          ctx.getIoCxt().setRecordIdentifier(null);//so we don't accidentally cache the value; shouldn't
           //happen since IO layer either knows how to produce ROW__ID or not - but to be safe
         }
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Wed Jan  7 05:42:31 2015
@@ -117,11 +117,11 @@ public abstract class HiveContextAwareRe
       if(retVal) {
         if(key instanceof RecordIdentifier) {
           //supports AcidInputFormat which uses the KEY to pass ROW__ID info
-          ioCxtRef.ri = (RecordIdentifier)key;
+          ioCxtRef.setRecordIdentifier((RecordIdentifier)key);
         }
         else if(recordReader instanceof AcidInputFormat.AcidRecordReader) {
           //supports AcidInputFormat which does not use the KEY to pass ROW__ID info
-          ioCxtRef.ri = ((AcidInputFormat.AcidRecordReader) recordReader).getRecordIdentifier();
+          ioCxtRef.setRecordIdentifier(((AcidInputFormat.AcidRecordReader) recordReader).getRecordIdentifier());
         }
       }
       return retVal;
@@ -134,30 +134,30 @@ public abstract class HiveContextAwareRe
   protected void updateIOContext()
       throws IOException {
     long pointerPos = this.getPos();
-    if (!ioCxtRef.isBlockPointer) {
-      ioCxtRef.currentBlockStart = pointerPos;
-      ioCxtRef.currentRow = 0;
+    if (!ioCxtRef.isBlockPointer()) {
+      ioCxtRef.setCurrentBlockStart(pointerPos);
+      ioCxtRef.setCurrentRow(0);
       return;
     }
 
-    ioCxtRef.currentRow++;
+    ioCxtRef.setCurrentRow(ioCxtRef.getCurrentRow() + 1);
 
-    if (ioCxtRef.nextBlockStart == -1) {
-      ioCxtRef.nextBlockStart = pointerPos;
-      ioCxtRef.currentRow = 0;
+    if (ioCxtRef.getNextBlockStart() == -1) {
+      ioCxtRef.setNextBlockStart(pointerPos);
+      ioCxtRef.setCurrentRow(0);
     }
-    if (pointerPos != ioCxtRef.nextBlockStart) {
+    if (pointerPos != ioCxtRef.getNextBlockStart()) {
       // the reader pointer has moved to the end of next block, or the end of
       // current record.
 
-      ioCxtRef.currentRow = 0;
+      ioCxtRef.setCurrentRow(0);
 
-      if (ioCxtRef.currentBlockStart == ioCxtRef.nextBlockStart) {
-        ioCxtRef.currentRow = 1;
+      if (ioCxtRef.getCurrentBlockStart() == ioCxtRef.getNextBlockStart()) {
+        ioCxtRef.setCurrentRow(1);
       }
 
-      ioCxtRef.currentBlockStart = ioCxtRef.nextBlockStart;
-      ioCxtRef.nextBlockStart = pointerPos;
+      ioCxtRef.setCurrentBlockStart(ioCxtRef.getNextBlockStart());
+      ioCxtRef.setNextBlockStart(pointerPos);
     }
   }
 
@@ -168,9 +168,9 @@ public abstract class HiveContextAwareRe
   private void initIOContext(long startPos, boolean isBlockPointer,
       Path inputPath) {
     ioCxtRef = this.getIOContext();
-    ioCxtRef.currentBlockStart = startPos;
-    ioCxtRef.isBlockPointer = isBlockPointer;
-    ioCxtRef.inputPath = inputPath;
+    ioCxtRef.setCurrentBlockStart(startPos);
+    ioCxtRef.setBlockPointer(isBlockPointer);
+    ioCxtRef.setInputPath(inputPath);
     LOG.info("Processing file " + inputPath);
     initDone = true;
   }
@@ -223,7 +223,7 @@ public abstract class HiveContextAwareRe
       // Binary search only works if we know the size of the split, and the recordReader is an
       // RCFileRecordReader
       this.getIOContext().setUseSorted(true);
-      this.getIOContext().setIsBinarySearching(true);
+      this.getIOContext().setBinarySearching(true);
       this.wasUsingSortedSearch = true;
     } else {
       // Use the default methods for next in the child class
@@ -285,7 +285,7 @@ public abstract class HiveContextAwareRe
           // binary search, if the new position at least as big as the size of the split, any
           // matching rows must be in the final block, so we can end the binary search.
           if (newPosition == previousPosition || newPosition >= splitEnd) {
-            this.getIOContext().setIsBinarySearching(false);
+            this.getIOContext().setBinarySearching(false);
             sync(rangeStart);
           }
 
@@ -405,7 +405,7 @@ public abstract class HiveContextAwareRe
    */
   private void beginLinearSearch() throws IOException {
     sync(rangeStart);
-    this.getIOContext().setIsBinarySearching(false);
+    this.getIOContext().setBinarySearching(false);
     this.wasUsingSortedSearch = false;
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Wed Jan  7 05:42:31 2015
@@ -34,18 +34,21 @@ import org.apache.hadoop.fs.Path;
  */
 public class IOContext {
 
-  private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
+  /**
+   * Spark uses this thread local
+   */
+  private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
     @Override
     protected synchronized IOContext initialValue() { return new IOContext(); }
  };
 
-  private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-  public static Map<String, IOContext> getMap() {
-    return inputNameIOContextMap;
-  }
+  /**
+   * Tez and MR use this map but are single threaded per JVM thus no synchronization is required.
+   */
+  private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
 
   public static IOContext get(String inputName) {
-    if (inputNameIOContextMap.containsKey(inputName) == false) {
+    if (!inputNameIOContextMap.containsKey(inputName)) {
       IOContext ioContext = new IOContext();
       inputNameIOContextMap.put(inputName, ioContext);
     }
@@ -58,26 +61,26 @@ public class IOContext {
     inputNameIOContextMap.clear();
   }
 
-  long currentBlockStart;
-  long nextBlockStart;
-  long currentRow;
-  boolean isBlockPointer;
-  boolean ioExceptions;
+  private long currentBlockStart;
+  private long nextBlockStart;
+  private long currentRow;
+  private boolean isBlockPointer;
+  private boolean ioExceptions;
 
   // Are we using the fact the input is sorted
-  boolean useSorted = false;
+  private boolean useSorted = false;
   // Are we currently performing a binary search
-  boolean isBinarySearching = false;
+  private boolean isBinarySearching = false;
   // Do we want to end the binary search
-  boolean endBinarySearch = false;
+  private boolean endBinarySearch = false;
   // The result of the comparison of the last row processed
-  Comparison comparison = null;
+  private Comparison comparison = null;
   // The class name of the generic UDF being used by the filter
-  String genericUDFClassName = null;
+  private String genericUDFClassName = null;
   /**
    * supports {@link org.apache.hadoop.hive.ql.metadata.VirtualColumn#ROWID}
    */
-  public RecordIdentifier ri;
+  private  RecordIdentifier ri;
 
   public static enum Comparison {
     GREATER,
@@ -86,7 +89,7 @@ public class IOContext {
     UNKNOWN
   }
 
-  Path inputPath;
+  private Path inputPath;
 
   public IOContext() {
     this.currentBlockStart = 0;
@@ -156,7 +159,7 @@ public class IOContext {
     return isBinarySearching;
   }
 
-  public void setIsBinarySearching(boolean isBinarySearching) {
+  public void setBinarySearching(boolean isBinarySearching) {
     this.isBinarySearching = isBinarySearching;
   }
 
@@ -197,6 +200,14 @@ public class IOContext {
     this.genericUDFClassName = genericUDFClassName;
   }
 
+  public RecordIdentifier getRecordIdentifier() {
+    return this.ri;
+  }
+
+  public void setRecordIdentifier(RecordIdentifier ri) {
+    this.ri = ri;
+  }
+
   /**
    * The thread local IOContext is static, we may need to restart the search if, for instance,
    * multiple files are being searched as part of a CombinedHiveRecordReader

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java?rev=1649994&r1=1649993&r2=1649994&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java Wed Jan  7 05:42:31 2015
@@ -118,7 +118,7 @@ public class TestHiveBinarySearchRecordR
     conf.set(Utilities.INPUT_NAME, "TestHiveBinarySearchRecordReader");
     ioContext = IOContext.get(conf.get(Utilities.INPUT_NAME));
     ioContext.setUseSorted(false);
-    ioContext.setIsBinarySearching(false);
+    ioContext.setBinarySearching(false);
     ioContext.setEndBinarySearch(false);
     ioContext.setComparison(null);
     ioContext.setGenericUDFClassName(null);
@@ -252,7 +252,7 @@ public class TestHiveBinarySearchRecordR
     ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
     Assert.assertTrue(ioContext.isBinarySearching());
     Assert.assertTrue(executeDoNext(hbsReader));
-    ioContext.setIsBinarySearching(false);
+    ioContext.setBinarySearching(false);
     ioContext.setComparison(-1);
     Assert.assertTrue(executeDoNext(hbsReader));
     ioContext.setComparison(0);
@@ -292,7 +292,7 @@ public class TestHiveBinarySearchRecordR
     ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName());
     Assert.assertTrue(ioContext.isBinarySearching());
     Assert.assertTrue(executeDoNext(hbsReader));
-    ioContext.setIsBinarySearching(false);
+    ioContext.setBinarySearching(false);
     ioContext.setComparison(-1);
     Assert.assertTrue(executeDoNext(hbsReader));
     ioContext.setComparison(0);
@@ -306,7 +306,7 @@ public class TestHiveBinarySearchRecordR
     ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName());
     Assert.assertTrue(ioContext.isBinarySearching());
     Assert.assertTrue(executeDoNext(hbsReader));
-    ioContext.setIsBinarySearching(false);
+    ioContext.setBinarySearching(false);
     ioContext.setComparison(-1);
     Assert.assertTrue(executeDoNext(hbsReader));
     ioContext.setComparison(0);