You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/24 20:24:02 UTC

svn commit: r1634116 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/SparkMapRecordHandler.java exec/spark/SparkPlanGenerator.java exec/spark/SparkUtilities.java io/HiveContextAwareRecordReader.java io/IOContext.java

Author: xuefu
Date: Fri Oct 24 18:24:01 2014
New Revision: 1634116

URL: http://svn.apache.org/r1634116
Log:
HIVE-8457: MapOperator initialization fails when multiple Spark threads are enabled [Spark Branch] (Chao via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Fri Oct 24 18:24:01 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -88,6 +89,14 @@ public class SparkMapRecordHandler exten
         mo = new MapOperator();
       }
       mo.setConf(mrwork);
+
+      // If the current thread's IOContext is not initialized (because it's reading from a
+      // cached input HadoopRDD), copy from the saved result.
+      IOContext ioContext = IOContext.get();
+      if (ioContext.getInputPath() == null) {
+        IOContext.copy(ioContext, IOContext.getMap().get(SparkUtilities.MAP_IO_CONTEXT));
+      }
+
       // initialize map operator
       mo.setChildren(job);
       l4j.info(mo.dump(0));
@@ -199,6 +208,10 @@ public class SparkMapRecordHandler exten
     } finally {
       MapredContext.close();
       Utilities.clearWorkMap();
+
+      // It's possible that a thread gets reused for different queries, so we need to
+      // reset the input path.
+      IOContext.get().setInputPath(null);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Fri Oct 24 18:24:01 2014
@@ -294,8 +294,7 @@ public class SparkPlanGenerator {
 
     JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    MapInput result = new MapInput(hadoopRDD,
-        false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
     return result;
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Fri Oct 24 18:24:01 2014
@@ -25,6 +25,9 @@ import org.apache.hadoop.io.BytesWritabl
  */
 public class SparkUtilities {
 
+  // Used to save and retrieve IOContext for multi-insertion.
+  public static final String MAP_IO_CONTEXT = "MAP_IO_CONTEXT";
+
   public static HiveKey copyHiveKey(HiveKey key) {
     HiveKey copy = new HiveKey();
     copy.setDistKeyLength(key.getDistKeyLength());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Fri Oct 24 18:24:01 2014
@@ -27,9 +27,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FooterBuffer;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -171,6 +173,18 @@ public abstract class HiveContextAwareRe
     ioCxtRef.isBlockPointer = isBlockPointer;
     ioCxtRef.inputPath = inputPath;
     LOG.info("Processing file " + inputPath);
+
+    // In Spark, in multi-insert an input HadoopRDD may be shared by multiple
+    // mappers, and if we cache it, only the first thread will have its thread-local
+    // IOContext initialized, while the rest will not.
+    // To solve this issue, we need to save a copy of the initialized IOContext, so that
+    // later it can be used for other threads.
+    if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      IOContext iocontext = new IOContext();
+      IOContext.copy(iocontext, ioCxtRef);
+      IOContext.getMap().put(SparkUtilities.MAP_IO_CONTEXT, iocontext);
+    }
+
     initDone = true;
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Fri Oct 24 18:24:01 2014
@@ -112,6 +112,27 @@ public class IOContext {
     this.ioExceptions = false;
   }
 
+  /**
+   * Copy all field values from orig to dest; all existing fields in dest will be overwritten.
+   *
+   * @param dest the IOContext to copy to
+   * @param orig the IOContext to copy from
+   */
+  public static void copy(IOContext dest, IOContext orig) {
+    dest.currentBlockStart = orig.currentBlockStart;
+    dest.nextBlockStart = orig.nextBlockStart;
+    dest.currentRow = orig.currentRow;
+    dest.isBlockPointer = orig.isBlockPointer;
+    dest.ioExceptions = orig.ioExceptions;
+    dest.useSorted = orig.useSorted;
+    dest.isBinarySearching = orig.isBinarySearching;
+    dest.endBinarySearch = orig.endBinarySearch;
+    dest.comparison = orig.comparison;
+    dest.genericUDFClassName = orig.genericUDFClassName;
+    dest.ri = orig.ri;
+    dest.inputPath = orig.inputPath;
+  }
+
   public long getCurrentBlockStart() {
     return currentBlockStart;
   }
@@ -224,4 +245,5 @@ public class IOContext {
     this.comparison = null;
     this.genericUDFClassName = null;
   }
+
 }