You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/11 07:10:39 UTC
svn commit: r1617202 - in /hive/branches/spark: pom.xml
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
Author: brock
Date: Mon Aug 11 05:10:38 2014
New Revision: 1617202
URL: http://svn.apache.org/r1617202
Log:
HIVE-7540 - NotSerializableException encountered when using sortByKey transformation (Rui Li via Brock) [Spark Branch]
Modified:
hive/branches/spark/pom.xml
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1617202&r1=1617201&r2=1617202&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Mon Aug 11 05:10:38 2014
@@ -147,7 +147,7 @@
<slf4j.version>1.7.5</slf4j.version>
<ST4.version>4.0.4</ST4.version>
<tez.version>0.4.0-incubating</tez.version>
- <spark.version>1.0.1</spark.version>
+ <spark.version>1.1.0-SNAPSHOT</spark.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
@@ -193,7 +193,7 @@
<snapshots>
<enabled>false</enabled>
</snapshots>
- </repository>
+ </repository>
<repository>
<id>sonatype-snapshot</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
@@ -204,6 +204,16 @@
<enabled>false</enabled>
</snapshots>
</repository>
+ <repository>
+ <id>spark-snapshot</id>
+ <url>http://ec2-50-18-79-139.us-west-1.compute.amazonaws.com/data/spark_2.10-1.1-SNAPSHOT/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
</repositories>
<!-- Hadoop dependency management is done at the bottom under profiles -->
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java?rev=1617202&r1=1617201&r2=1617202&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java Mon Aug 11 05:10:38 2014
@@ -18,23 +18,24 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import java.util.*;
-
-import com.google.common.collect.Ordering;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
import scala.Tuple2;
+import java.util.*;
+
public class SortByShuffler implements SparkShuffler {
@Override
public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
- Comparator comp = Ordering.<BytesWritable>natural();
- // Due to HIVE-7540, numPartitions must be set to 1
- JavaPairRDD<BytesWritable, BytesWritable> rdd = input.sortByKey(comp, true, 1);
+ JavaPairRDD<BytesWritable, BytesWritable> rdd;
+ if (numPartitions > 0) {
+ rdd = input.sortByKey(true, numPartitions);
+ } else {
+ rdd = input.sortByKey(true);
+ }
return rdd.mapPartitionsToPair(new ShuffleFunction());
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1617202&r1=1617201&r2=1617202&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Mon Aug 11 05:10:38 2014
@@ -28,7 +28,7 @@ public class SparkEdgeProperty {
private long edgeType;
private int numPartitions;
-
+
public SparkEdgeProperty(long edgeType, int numPartitions) {
this.edgeType = edgeType;
this.numPartitions = numPartitions;