Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/23 21:24:56 UTC

svn commit: r1633931 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: HiveBaseFunctionResultList.java MapInput.java SparkPlanGenerator.java SparkTran.java SparkUtilities.java

Author: xuefu
Date: Thu Oct 23 19:24:55 2014
New Revision: 1633931

URL: http://svn.apache.org/r1633931
Log:
HIVE-8545: Exception when casting Text to BytesWritable [Spark Branch] (Chao via Xuefu)
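
For context, a minimal, hypothetical repro of the reported failure (class and
variable names are illustrative, not from the patch): a Hadoop input format is
free to produce value types other than BytesWritable, e.g. Text, so code that
hard-casts the value fails at runtime.

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class CastRepro {
      public static void main(String[] args) {
        Writable value = new Text("a row");         // what e.g. TextInputFormat yields
        BytesWritable bw = (BytesWritable) value;   // java.lang.ClassCastException
      }
    }

The patch below removes the BytesWritable assumption from the input path and
copies values generically instead.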

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Thu Oct 23 19:24:55 2014
@@ -63,15 +63,8 @@ public abstract class HiveBaseFunctionRe
 
   @Override
   public void collect(HiveKey key, BytesWritable value) throws IOException {
-    lastRecordOutput.add(copyHiveKey(key), SparkUtilities.copyBytesWritable(value));
-  }
-
-  private static HiveKey copyHiveKey(HiveKey key) {
-    HiveKey copy = new HiveKey();
-    copy.setDistKeyLength(key.getDistKeyLength());
-    copy.setHashCode(key.hashCode());
-    copy.set(key);
-    return copy;
+    lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
+        SparkUtilities.copyBytesWritable(value));
   }
 
   /** Process the given record. */
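
The copy in collect() matters because Hive and Hadoop reuse Writable instances
across records; buffering a reference without copying would leave every buffered
entry aliasing the same, later-mutated object. A minimal sketch of the hazard,
using the two SparkUtilities helpers this commit touches (hypothetical driver
class, assuming SparkUtilities is on the classpath):

    import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
    import org.apache.hadoop.io.BytesWritable;

    public class ReuseHazard {
      public static void main(String[] args) {
        byte[] first = "first".getBytes();
        byte[] second = "second".getBytes();

        BytesWritable reused = new BytesWritable();    // one instance, reused per record
        reused.set(first, 0, first.length);

        BytesWritable aliased = reused;                                   // buffering without a copy
        BytesWritable copied = SparkUtilities.copyBytesWritable(reused);  // independent deep copy

        reused.set(second, 0, second.length);          // next record overwrites the instance

        System.out.println(aliased.equals(copied));    // false: the alias changed, the copy did not
      }
    }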

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Thu Oct 23 19:24:55 2014
@@ -18,24 +18,26 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.spark.api.java.JavaPairRDD;
-
 import com.google.common.base.Preconditions;
 import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
-public class MapInput implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
-  private JavaPairRDD<HiveKey, BytesWritable> hadoopRDD;
+
+public class MapInput implements SparkTran<WritableComparable, Writable,
+    WritableComparable, Writable> {
+  private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
   private boolean toCache;
 
-  public MapInput(JavaPairRDD<HiveKey, BytesWritable> hadoopRDD) {
-    this.hadoopRDD = hadoopRDD;
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
+    this(hadoopRDD, false);
   }
 
-  public MapInput(JavaPairRDD<HiveKey, BytesWritable> hadoopRDD, boolean toCache) {
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
     this.hadoopRDD = hadoopRDD;
     this.toCache = toCache;
   }
@@ -45,28 +47,28 @@ public class MapInput implements SparkTr
   }
 
   @Override
-  public JavaPairRDD<HiveKey, BytesWritable> transform(
-      JavaPairRDD<BytesWritable, BytesWritable> input) {
+  public JavaPairRDD<WritableComparable, Writable> transform(
+      JavaPairRDD<WritableComparable, Writable> input) {
     Preconditions.checkArgument(input == null,
         "AssertionError: MapInput doesn't take any input");
-    JavaPairRDD result = hadoopRDD;
-    if (toCache) {
-      result = result.mapToPair(new CopyFunction());
-      return result.cache();
-    } else {
-      return result;
-    }
+    return toCache ? hadoopRDD.mapToPair(new CopyFunction()).cache() : hadoopRDD;
   }
 
-  private static class CopyFunction implements PairFunction<Tuple2<BytesWritable, BytesWritable>,
-        BytesWritable, BytesWritable> {
+  private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,
+    WritableComparable, Writable> {
+
+    private transient Configuration conf;
 
     @Override
-    public Tuple2<BytesWritable, BytesWritable> call(Tuple2<BytesWritable, BytesWritable> tup) throws Exception {
-      // no need to copy key since it never get used in HiveMapFunction
-      BytesWritable value = SparkUtilities.copyBytesWritable(tup._2());
-      return new Tuple2<BytesWritable, BytesWritable>(tup._1(), value);
+    public Tuple2<WritableComparable, Writable>
+    call(Tuple2<WritableComparable, Writable> tuple) throws Exception {
+      if (conf == null) {
+        conf = new Configuration();
+      }
+
+      return new Tuple2<WritableComparable, Writable>(tuple._1(),
+          WritableUtils.clone(tuple._2(), conf));
     }
-  }
 
+  }
 }
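
WritableUtils.clone round-trips the argument through its own write()/readFields()
into a fresh instance of the same runtime class, so it deep-copies Text,
BytesWritable, or any other Writable alike; the Configuration field is transient
and lazily created because Spark serializes CopyFunction to ship it to executors.
A small sketch of the clone semantics (illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class CloneSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Text original = new Text("hello");
        Text copy = WritableUtils.clone(original, conf);  // fresh Text with equal content
        original.set("mutated");
        System.out.println(copy);                         // prints "hello": independent of later mutation
      }
    }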

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Oct 23 19:24:55 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.io.merg
 import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
@@ -57,7 +56,6 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -294,9 +292,11 @@ public class SparkPlanGenerator {
     JobConf jobConf = cloneJobConf(mapWork);
     Class ifClass = getInputFormat(jobConf, mapWork);
 
-    JavaPairRDD<HiveKey, BytesWritable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
+    JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    return new MapInput(hadoopRDD, false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    MapInput result = new MapInput(hadoopRDD,
+        false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+    return result;
   }
 
   private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
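
With the key/value types widened, the RDD is created against the common Writable
interfaces rather than HiveKey/BytesWritable. A hedged sketch of the shape of the
call, assuming a JavaSparkContext and a raw input-format Class as in the method
above (TextInputFormat is an illustrative stand-in for whatever the MapWork
configures):

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class HadoopRDDSketch {
      @SuppressWarnings("unchecked")
      static JavaPairRDD<WritableComparable, Writable> makeRDD(JavaSparkContext sc, JobConf jobConf) {
        Class ifClass = TextInputFormat.class;  // stand-in; the real code derives this from the MapWork
        return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
      }
    }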

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Thu Oct 23 19:24:55 2014
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public interface SparkTran<KI extends BytesWritable, VI, KO extends BytesWritable, VO> {
+public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
   JavaPairRDD<KO, VO> transform(
       JavaPairRDD<KI, VI> input);
 }
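
The widened bounds let transformations be written over any WritableComparable
key rather than only BytesWritable subclasses. A hypothetical pass-through
implementation (not in the patch), just to show what now type-checks:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.spark.api.java.JavaPairRDD;

    class IdentityTran implements SparkTran<Text, Writable, Text, Writable> {
      @Override
      public JavaPairRDD<Text, Writable> transform(JavaPairRDD<Text, Writable> input) {
        return input;  // compiles because Text implements WritableComparable
      }
    }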

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Thu Oct 23 19:24:55 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 
 /**
@@ -24,6 +25,14 @@ import org.apache.hadoop.io.BytesWritabl
  */
 public class SparkUtilities {
 
+  public static HiveKey copyHiveKey(HiveKey key) {
+    HiveKey copy = new HiveKey();
+    copy.setDistKeyLength(key.getDistKeyLength());
+    copy.setHashCode(key.hashCode());
+    copy.set(key);
+    return copy;
+  }
+
   public static BytesWritable copyBytesWritable(BytesWritable bw) {
     BytesWritable copy = new BytesWritable();
     copy.set(bw);