You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/23 21:24:56 UTC
svn commit: r1633931 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
HiveBaseFunctionResultList.java MapInput.java SparkPlanGenerator.java
SparkTran.java SparkUtilities.java
Author: xuefu
Date: Thu Oct 23 19:24:55 2014
New Revision: 1633931
URL: http://svn.apache.org/r1633931
Log:
HIVE-8545: Exception when casting Text to BytesWritable [Spark Branch] (Chao via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Thu Oct 23 19:24:55 2014
@@ -63,15 +63,8 @@ public abstract class HiveBaseFunctionRe
@Override
public void collect(HiveKey key, BytesWritable value) throws IOException {
- lastRecordOutput.add(copyHiveKey(key), SparkUtilities.copyBytesWritable(value));
- }
-
- private static HiveKey copyHiveKey(HiveKey key) {
- HiveKey copy = new HiveKey();
- copy.setDistKeyLength(key.getDistKeyLength());
- copy.setHashCode(key.hashCode());
- copy.set(key);
- return copy;
+ lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
+ SparkUtilities.copyBytesWritable(value));
}
/** Process the given record. */
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Thu Oct 23 19:24:55 2014
@@ -18,24 +18,26 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.spark.api.java.JavaPairRDD;
-
import com.google.common.base.Preconditions;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
-public class MapInput implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
- private JavaPairRDD<HiveKey, BytesWritable> hadoopRDD;
+
+public class MapInput implements SparkTran<WritableComparable, Writable,
+ WritableComparable, Writable> {
+ private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
private boolean toCache;
- public MapInput(JavaPairRDD<HiveKey, BytesWritable> hadoopRDD) {
- this.hadoopRDD = hadoopRDD;
+ public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
+ this(hadoopRDD, false);
}
- public MapInput(JavaPairRDD<HiveKey, BytesWritable> hadoopRDD, boolean toCache) {
+ public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
this.hadoopRDD = hadoopRDD;
this.toCache = toCache;
}
@@ -45,28 +47,28 @@ public class MapInput implements SparkTr
}
@Override
- public JavaPairRDD<HiveKey, BytesWritable> transform(
- JavaPairRDD<BytesWritable, BytesWritable> input) {
+ public JavaPairRDD<WritableComparable, Writable> transform(
+ JavaPairRDD<WritableComparable, Writable> input) {
Preconditions.checkArgument(input == null,
"AssertionError: MapInput doesn't take any input");
- JavaPairRDD result = hadoopRDD;
- if (toCache) {
- result = result.mapToPair(new CopyFunction());
- return result.cache();
- } else {
- return result;
- }
+ return toCache ? hadoopRDD.mapToPair(new CopyFunction()).cache() : hadoopRDD;
}
- private static class CopyFunction implements PairFunction<Tuple2<BytesWritable, BytesWritable>,
- BytesWritable, BytesWritable> {
+ private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,
+ WritableComparable, Writable> {
+
+ private transient Configuration conf;
@Override
- public Tuple2<BytesWritable, BytesWritable> call(Tuple2<BytesWritable, BytesWritable> tup) throws Exception {
- // no need to copy key since it never get used in HiveMapFunction
- BytesWritable value = SparkUtilities.copyBytesWritable(tup._2());
- return new Tuple2<BytesWritable, BytesWritable>(tup._1(), value);
+ public Tuple2<WritableComparable, Writable>
+ call(Tuple2<WritableComparable, Writable> tuple) throws Exception {
+ if (conf == null) {
+ conf = new Configuration();
+ }
+
+ return new Tuple2<WritableComparable, Writable>(tuple._1(),
+ WritableUtils.clone(tuple._2(), conf));
}
- }
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Oct 23 19:24:55 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.io.merg
import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
@@ -57,7 +56,6 @@ import org.apache.hadoop.hive.ql.plan.Sp
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
@@ -294,9 +292,11 @@ public class SparkPlanGenerator {
JobConf jobConf = cloneJobConf(mapWork);
Class ifClass = getInputFormat(jobConf, mapWork);
- JavaPairRDD<HiveKey, BytesWritable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
+ JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
WritableComparable.class, Writable.class);
- return new MapInput(hadoopRDD, false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+ MapInput result = new MapInput(hadoopRDD,
+ false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
+ return result;
}
private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Thu Oct 23 19:24:55 2014
@@ -18,10 +18,10 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.spark.api.java.JavaPairRDD;
-public interface SparkTran<KI extends BytesWritable, VI, KO extends BytesWritable, VO> {
+public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
JavaPairRDD<KO, VO> transform(
JavaPairRDD<KI, VI> input);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1633931&r1=1633930&r2=1633931&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Thu Oct 23 19:24:55 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
/**
@@ -24,6 +25,14 @@ import org.apache.hadoop.io.BytesWritabl
*/
public class SparkUtilities {
+ public static HiveKey copyHiveKey(HiveKey key) {
+ HiveKey copy = new HiveKey();
+ copy.setDistKeyLength(key.getDistKeyLength());
+ copy.setHashCode(key.hashCode());
+ copy.set(key);
+ return copy;
+ }
+
public static BytesWritable copyBytesWritable(BytesWritable bw) {
BytesWritable copy = new BytesWritable();
copy.set(bw);