You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/11 07:10:39 UTC

svn commit: r1617202 - in /hive/branches/spark: pom.xml ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java

Author: brock
Date: Mon Aug 11 05:10:38 2014
New Revision: 1617202

URL: http://svn.apache.org/r1617202
Log:
HIVE-7540 - NotSerializableException encountered when using sortByKey transformation (Rui Li via Brock) [Spark Branch]

Modified:
    hive/branches/spark/pom.xml
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java

Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1617202&r1=1617201&r2=1617202&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Mon Aug 11 05:10:38 2014
@@ -147,7 +147,7 @@
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
     <tez.version>0.4.0-incubating</tez.version>
-    <spark.version>1.0.1</spark.version>
+    <spark.version>1.1.0-SNAPSHOT</spark.version>
     <scala.binary.version>2.10</scala.binary.version>
     <scala.version>2.10.4</scala.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
@@ -193,7 +193,7 @@
       <snapshots>
         <enabled>false</enabled>
       </snapshots>
-    </repository>
+     </repository>
      <repository>
        <id>sonatype-snapshot</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
@@ -204,6 +204,16 @@
          <enabled>false</enabled>
        </snapshots>
      </repository>
+     <repository>
+       <id>spark-snapshot</id>
+       <url>http://ec2-50-18-79-139.us-west-1.compute.amazonaws.com/data/spark_2.10-1.1-SNAPSHOT/</url>
+       <releases>
+         <enabled>false</enabled>
+       </releases>
+       <snapshots>
+         <enabled>true</enabled>
+       </snapshots>
+     </repository>
   </repositories>
 
   <!-- Hadoop dependency management is done at the bottom under profiles -->

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java?rev=1617202&r1=1617201&r2=1617202&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java Mon Aug 11 05:10:38 2014
@@ -18,23 +18,24 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.util.*;
-
-import com.google.common.collect.Ordering;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import scala.Tuple2;
 
+import java.util.*;
+
 public class SortByShuffler implements SparkShuffler {
 
   @Override
   public JavaPairRDD<BytesWritable, Iterable<BytesWritable>> shuffle(
       JavaPairRDD<BytesWritable, BytesWritable> input, int numPartitions) {
-    Comparator comp = Ordering.<BytesWritable>natural();
-    // Due to HIVE-7540, numPartitions must be to 1
-    JavaPairRDD<BytesWritable, BytesWritable> rdd = input.sortByKey(comp, true, 1);
+    JavaPairRDD<BytesWritable, BytesWritable> rdd;
+    if (numPartitions > 0) {
+      rdd = input.sortByKey(true, numPartitions);
+    } else {
+      rdd = input.sortByKey(true);
+    }
     return rdd.mapPartitionsToPair(new ShuffleFunction());
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1617202&r1=1617201&r2=1617202&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Mon Aug 11 05:10:38 2014
@@ -28,7 +28,7 @@ public class SparkEdgeProperty {
   private long edgeType;
   
   private int numPartitions;
-  
+
   public SparkEdgeProperty(long edgeType, int numPartitions) {
     this.edgeType = edgeType;
     this.numPartitions = numPartitions;