You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/24 20:24:02 UTC
svn commit: r1634116 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql:
exec/spark/SparkMapRecordHandler.java exec/spark/SparkPlanGenerator.java
exec/spark/SparkUtilities.java io/HiveContextAwareRecordReader.java
io/IOContext.java
Author: xuefu
Date: Fri Oct 24 18:24:01 2014
New Revision: 1634116
URL: http://svn.apache.org/r1634116
Log:
HIVE-8457: MapOperator initialization fails when multiple Spark threads are enabled [Spark Branch] (Chao via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Fri Oct 24 18:24:01 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -88,6 +89,14 @@ public class SparkMapRecordHandler exten
mo = new MapOperator();
}
mo.setConf(mrwork);
+
+ // If the current thread's IOContext is not initialized (because it's reading from a
+ // cached input HadoopRDD), copy from the saved result.
+ IOContext ioContext = IOContext.get();
+ if (ioContext.getInputPath() == null) {
+ IOContext.copy(ioContext, IOContext.getMap().get(SparkUtilities.MAP_IO_CONTEXT));
+ }
+
// initialize map operator
mo.setChildren(job);
l4j.info(mo.dump(0));
@@ -199,6 +208,10 @@ public class SparkMapRecordHandler exten
} finally {
MapredContext.close();
Utilities.clearWorkMap();
+
+ // It's possible that a thread gets reused for different queries, so we need to
+ // reset the input path.
+ IOContext.get().setInputPath(null);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Fri Oct 24 18:24:01 2014
@@ -294,8 +294,7 @@ public class SparkPlanGenerator {
JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
WritableComparable.class, Writable.class);
- MapInput result = new MapInput(hadoopRDD,
- false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+ MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
return result;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Fri Oct 24 18:24:01 2014
@@ -25,6 +25,9 @@ import org.apache.hadoop.io.BytesWritabl
*/
public class SparkUtilities {
+ // Used to save and retrieve IOContext for multi-insertion.
+ public static final String MAP_IO_CONTEXT = "MAP_IO_CONTEXT";
+
public static HiveKey copyHiveKey(HiveKey key) {
HiveKey copy = new HiveKey();
copy.setDistKeyLength(key.getDistKeyLength());
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Fri Oct 24 18:24:01 2014
@@ -27,9 +27,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.FooterBuffer;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -171,6 +173,18 @@ public abstract class HiveContextAwareRe
ioCxtRef.isBlockPointer = isBlockPointer;
ioCxtRef.inputPath = inputPath;
LOG.info("Processing file " + inputPath);
+
+ // In Spark, in multi-insert an input HadoopRDD may be shared by multiple
+ // mappers, and if we cache it, only the first thread will have its thread-local
+ // IOContext initialized, while the rest will not.
+ // To solve this issue, we need to save a copy of the initialized IOContext, so that
+ // later it can be used for other threads.
+ if (HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ IOContext iocontext = new IOContext();
+ IOContext.copy(iocontext, ioCxtRef);
+ IOContext.getMap().put(SparkUtilities.MAP_IO_CONTEXT, iocontext);
+ }
+
initDone = true;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1634116&r1=1634115&r2=1634116&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Fri Oct 24 18:24:01 2014
@@ -112,6 +112,27 @@ public class IOContext {
this.ioExceptions = false;
}
+ /**
+ * Copy all field values from orig to dest; all existing fields in dest will be overwritten.
+ *
+ * @param dest the IOContext to copy to
+ * @param orig the IOContext to copy from
+ */
+ public static void copy(IOContext dest, IOContext orig) {
+ dest.currentBlockStart = orig.currentBlockStart;
+ dest.nextBlockStart = orig.nextBlockStart;
+ dest.currentRow = orig.currentRow;
+ dest.isBlockPointer = orig.isBlockPointer;
+ dest.ioExceptions = orig.ioExceptions;
+ dest.useSorted = orig.useSorted;
+ dest.isBinarySearching = orig.isBinarySearching;
+ dest.endBinarySearch = orig.endBinarySearch;
+ dest.comparison = orig.comparison;
+ dest.genericUDFClassName = orig.genericUDFClassName;
+ dest.ri = orig.ri;
+ dest.inputPath = orig.inputPath;
+ }
+
public long getCurrentBlockStart() {
return currentBlockStart;
}
@@ -224,4 +245,5 @@ public class IOContext {
this.comparison = null;
this.genericUDFClassName = null;
}
+
}