Posted to commits@hive.apache.org by st...@apache.org on 2018/07/27 13:43:53 UTC

hive git commit: HIVE-20032: Don't serialize hashCode for repartitionAndSortWithinPartitions (Sahil Takiar, reviewed by Rui Li)

Repository: hive
Updated Branches:
  refs/heads/master 94ec36865 -> 1e437e2b1


HIVE-20032: Don't serialize hashCode for repartitionAndSortWithinPartitions (Sahil Takiar, reviewed by Rui Li)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e437e2b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e437e2b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e437e2b

Branch: refs/heads/master
Commit: 1e437e2b1e00aa1942781f0a9b5b2f0868644b81
Parents: 94ec368
Author: Sahil Takiar <ta...@gmail.com>
Authored: Thu Jun 28 17:50:41 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri Jul 27 08:38:28 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +-
 .../hive/ql/exec/spark/TestSparkStatistics.java | 12 ++--
 .../jdbc/TestJdbcWithLocalClusterSpark.java     | 10 +--
 .../jdbc/TestJdbcWithMiniHS2ErasureCoding.java  | 11 ++--
 ...stMultiSessionsHS2WithLocalClusterSpark.java | 10 +--
 .../apache/hive/spark/HiveKryoRegistrator.java  |  2 +-
 .../hive/spark/NoHashCodeKryoSerializer.java    | 65 ++++++++++++++++++++
 .../ql/exec/spark/ShuffleKryoSerializer.java    | 62 +++++++++++++++++++
 .../hive/ql/exec/spark/SortByShuffler.java      | 12 +++-
 .../hive/ql/exec/spark/SparkPlanGenerator.java  | 20 ++++--
 .../org/apache/hadoop/hive/ql/io/HiveKey.java   |  4 ++
 .../hive/ql/exec/spark/TestHiveSparkClient.java | 17 ++---
 .../hive/ql/exec/spark/TestSparkPlan.java       | 14 +++--
 .../hive/spark/client/TestSparkClient.java      |  9 ++-
 14 files changed, 208 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9f1da60..39c77b3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4239,7 +4239,7 @@ public class HiveConf extends Configuration {
         "If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
         "TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
         "operators of the Join operator."),
-    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
+    SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true,
         "If this is set to true, Hive on Spark will register custom serializers for data types\n" +
         "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
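
[Note on the hunk above: the patch flips hive.spark.optimize.shuffle.serde on by default. A minimal sketch (a hypothetical snippet, not part of the patch) of reading and overriding the flag from HiveConf, mirroring what the updated tests further down do when the hive-kryo-registrator jar cannot be placed on the classpath:

import org.apache.hadoop.hive.conf.HiveConf;

public class ShuffleSerdeFlagDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // After this patch the default is true, so the custom shuffle serializers
    // are registered unless a client explicitly opts out.
    boolean enabled = HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE);
    System.out.println("hive.spark.optimize.shuffle.serde = " + enabled);
    // Opt out, e.g. when the hive-kryo-registrator jar is not on the
    // executor classpath.
    conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
  }
}
]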

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
index d383873..191d5f5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
@@ -41,13 +44,10 @@ import java.util.stream.Collectors;
 public class TestSparkStatistics {
 
   @Test
-  public void testSparkStatistics() {
+  public void testSparkStatistics() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
-    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-            SQLStdHiveAuthorizerFactory.class.getName());
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
-    conf.set("spark.master", "local-cluster[1,2,1024]");
     conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
             "TestSparkStatistics-local-dir").toString());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
index fe8a32f..341da33 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
+import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -66,12 +68,10 @@ public class TestJdbcWithLocalClusterSpark {
   private Connection hs2Conn = null;
   private Statement stmt;
 
-  private static HiveConf createHiveConf() {
+  private static HiveConf createHiveConf() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
-    conf.set("hive.execution.engine", "spark");
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.master", "local-cluster[2,2,1024]");
-    conf.set("hive.spark.client.connect.timeout", "30000ms");
     // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
     // while spark2 is still using Hadoop2.
     // Spark requires Hive to support Hadoop3 first then Spark can start

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
index efb3759..b2ddff7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
@@ -18,9 +18,11 @@
 
 package org.apache.hive.jdbc;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.io.Writer;
+import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -32,7 +34,6 @@ import java.util.Collections;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.processors.ErasureProcessor;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim;
@@ -63,19 +64,17 @@ public class TestJdbcWithMiniHS2ErasureCoding {
   private static HiveConf conf;
   private Connection hs2Conn = null;
 
-  private static HiveConf createHiveOnSparkConf() {
+  private static HiveConf createHiveOnSparkConf() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf hiveConf = new HiveConf();
     // Tell dfs not to consider load when choosing a datanode as this can cause failure as
     // in a test we do not have spare datanode capacity.
     hiveConf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
-    hiveConf.set("hive.execution.engine", "spark");
-    hiveConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    hiveConf.set("spark.master", "local-cluster[2,2,1024]");
     hiveConf.set("hive.spark.client.connect.timeout", "30000ms");
     hiveConf.set("spark.local.dir",
         Paths.get(System.getProperty("test.tmp.dir"), "TestJdbcWithMiniHS2ErasureCoding-local-dir")
             .toString());
-    hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); // avoid ZK errors
     return hiveConf;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
index 79d56f5..f7586c1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
@@ -18,6 +18,8 @@
 
 package org.apache.hive.jdbc;
 
+import java.io.File;
+import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -73,14 +75,12 @@ public class TestMultiSessionsHS2WithLocalClusterSpark {
   private ExecutorService pool = null;
 
 
-  private static HiveConf createHiveConf() {
+  private static HiveConf createHiveConf() throws MalformedURLException {
+    String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
     conf.set("hive.exec.parallel", "true");
-    conf.set("hive.execution.engine", "spark");
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    conf.set("spark.master", "local-cluster[2,2,1024]");
     conf.set("spark.deploy.defaultCores", "2");
-    conf.set("hive.spark.client.connect.timeout", "30000ms");
     // FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
     // while spark2 is still using Hadoop2.
     // Spark requires Hive to support Hadoop3 first then Spark can start

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
index 838ad99..001ab8e 100644
--- a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
@@ -54,7 +54,7 @@ public class HiveKryoRegistrator implements KryoRegistrator {
     }
   }
 
-  private static class BytesWritableSerializer extends Serializer<BytesWritable> {
+  static class BytesWritableSerializer extends Serializer<BytesWritable> {
 
     public void write(Kryo kryo, Output output, BytesWritable object) {
       output.writeVarInt(object.getLength(), true);

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
----------------------------------------------------------------------
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
new file mode 100644
index 0000000..d4bcc5b
--- /dev/null
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+
+
+/**
+ * A {@link KryoSerializer} that does not serialize hash codes while serializing a
+ * {@link HiveKey}. This decreases the amount of data to be shuffled during a Spark shuffle.
+ */
+public class NoHashCodeKryoSerializer extends KryoSerializer {
+
+  private static final long serialVersionUID = 3350910170041648022L;
+
+  public NoHashCodeKryoSerializer(SparkConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Kryo newKryo() {
+    Kryo kryo = super.newKryo();
+    kryo.register(HiveKey.class, new HiveKeySerializer());
+    kryo.register(BytesWritable.class, new HiveKryoRegistrator.BytesWritableSerializer());
+    return kryo;
+  }
+
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes);
+    }
+  }
+}
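
[To make the saving concrete, a minimal sketch (a hypothetical harness, not part of the patch) of the wire format HiveKeySerializer produces: a varint length followed by the raw key bytes, with no hash code, reconstructed on the read side through the HiveKey(byte[]) constructor added later in this patch:

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hadoop.hive.ql.io.HiveKey;

public class HiveKeyWireFormatDemo {
  public static void main(String[] args) {
    // The hash code (42) is only used to route the record to a partition,
    // which has already happened by the time the key is serialized.
    HiveKey key = new HiveKey("row-key".getBytes(), 42);

    // What HiveKeySerializer.write does: varint length, then the raw bytes.
    Output output = new Output(64, -1);
    output.writeVarInt(key.getLength(), true);
    output.write(key.getBytes(), 0, key.getLength());
    byte[] wire = output.toBytes();
    // Prints 8: 1 varint byte + 7 payload bytes, and no 4-byte hash code.
    System.out.println("bytes on the wire: " + wire.length);

    // What HiveKeySerializer.read does: length, bytes, new HiveKey(byte[]).
    Input input = new Input(wire);
    byte[] bytes = new byte[input.readVarInt(true)];
    input.readBytes(bytes);
    HiveKey restored = new HiveKey(bytes);
    System.out.println(new String(restored.getBytes(), 0, restored.getLength()));
  }
}
]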

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
new file mode 100644
index 0000000..47a0f77
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.FileNotFoundException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+
+
+final class ShuffleKryoSerializer {
+
+  private static final String HIVE_SHUFFLE_KRYO_SERIALIZER = "org.apache.hive.spark.NoHashCodeKryoSerializer";
+
+  private static org.apache.spark.serializer.KryoSerializer INSTANCE;
+
+  private ShuffleKryoSerializer() {
+    // Don't create me
+  }
+
+  static org.apache.spark.serializer.KryoSerializer getInstance(JavaSparkContext sc,
+                                                                Configuration conf) {
+    if (INSTANCE == null) {
+      synchronized (ShuffleKryoSerializer.class) {
+        if (INSTANCE == null) {
+          try {
+            INSTANCE = (org.apache.spark.serializer.KryoSerializer) Thread.currentThread().getContextClassLoader().loadClass(
+                    HIVE_SHUFFLE_KRYO_SERIALIZER).getConstructor(SparkConf.class).newInstance(
+                    sc.getConf());
+            return INSTANCE;
+          } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
+            throw new IllegalStateException(
+                    "Unable to create kryo serializer for shuffle RDDs using " +
+                            "class " + HIVE_SHUFFLE_KRYO_SERIALIZER, e);
+          }
+        } else {
+          return INSTANCE;
+        }
+      }
+    }
+    return INSTANCE;
+  }
+}
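
[One property of getInstance above worth calling out: it is a double-checked locking singleton, and the classic form of that idiom declares the field volatile so that a fully constructed serializer is guaranteed visible to other threads. A minimal sketch of the conventional pattern (an illustration, not part of the patch):

import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;

final class VolatileSingletonSketch {
  // volatile ensures the write of the fully constructed object
  // happens-before any read that observes a non-null reference.
  private static volatile KryoSerializer instance;

  static KryoSerializer getInstance(SparkConf conf) {
    KryoSerializer local = instance; // one volatile read on the fast path
    if (local == null) {
      synchronized (VolatileSingletonSketch.class) {
        local = instance;
        if (local == null) {
          instance = local = new KryoSerializer(conf);
        }
      }
    }
    return local;
  }
}
]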

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 22b598f..1bf5a56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -23,19 +23,24 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.rdd.ShuffledRDD;
+import org.apache.spark.serializer.KryoSerializer;
 import org.apache.spark.storage.StorageLevel;
 
+
 public class SortByShuffler implements SparkShuffler<BytesWritable> {
 
   private final boolean totalOrder;
   private final SparkPlan sparkPlan;
+  private final KryoSerializer shuffleSerializer;
 
   /**
    * @param totalOrder whether this shuffler provides total order shuffle.
    */
-  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
+  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, KryoSerializer shuffleSerializer) {
     this.totalOrder = totalOrder;
     this.sparkPlan = sparkPlan;
+    this.shuffleSerializer = shuffleSerializer;
   }
 
   @Override
@@ -56,6 +61,11 @@ public class SortByShuffler implements SparkShuffler<BytesWritable> {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
+    if (shuffleSerializer != null) {
+      if (rdd.rdd() instanceof ShuffledRDD) {
+        ((ShuffledRDD) rdd.rdd()).setSerializer(shuffleSerializer);
+      }
+    }
     return rdd;
   }
 
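[The branch above relies on repartitionAndSortWithinPartitions returning a ShuffledRDD, whose serializer can be overridden for that one shuffle instead of globally via spark.serializer. A minimal standalone sketch (a hypothetical local job, not part of the patch) of the same override:

import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.ShuffledRDD;
import org.apache.spark.serializer.KryoSerializer;
import scala.Tuple2;

public class PerShuffleSerializerDemo {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf()
        .setMaster("local").setAppName("per-shuffle-serializer");
    try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
      JavaPairRDD<Integer, String> pairs = sc.parallelizePairs(
          Arrays.asList(new Tuple2<>(2, "b"), new Tuple2<>(1, "a")));
      JavaPairRDD<Integer, String> shuffled =
          pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2));
      // The sort shuffle produces a ShuffledRDD, so the serializer for this
      // one shuffle can be swapped without touching spark.serializer.
      if (shuffled.rdd() instanceof ShuffledRDD) {
        ((ShuffledRDD) shuffled.rdd()).setSerializer(new KryoSerializer(sparkConf));
      }
      System.out.println(shuffled.collect());
    }
  }
}
]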

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 001d0b0..806deb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.SparkConf;
 import org.apache.spark.util.CallSite;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,10 +74,11 @@ import com.google.common.base.Preconditions;
 
 @SuppressWarnings("rawtypes")
 public class SparkPlanGenerator {
+
   private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
-  private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final Logger LOG = LoggerFactory.getLogger(SparkPlanGenerator.class);
 
+  private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private final JavaSparkContext sc;
   private final JobConf jobConf;
   private final Context context;
@@ -82,6 +89,7 @@ public class SparkPlanGenerator {
   private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
   // a map from each BaseWork to its cloned JobConf
   private final Map<BaseWork, JobConf> workToJobConf;
+  private final org.apache.spark.serializer.KryoSerializer shuffleSerializer;
 
   public SparkPlanGenerator(
     JavaSparkContext sc,
@@ -98,6 +106,11 @@ public class SparkPlanGenerator {
     this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
     this.sparkReporter = sparkReporter;
     this.workToJobConf = new HashMap<BaseWork, JobConf>();
+    if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
+      this.shuffleSerializer = ShuffleKryoSerializer.getInstance(sc, jobConf);
+    } else {
+      this.shuffleSerializer = null;
+    }
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -251,9 +264,9 @@ public class SparkPlanGenerator {
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
     if (edge.isMRShuffle()) {
-      shuffler = new SortByShuffler(false, sparkPlan);
+      shuffler = new SortByShuffler(false, sparkPlan, shuffleSerializer);
     } else if (edge.isShuffleSort()) {
-      shuffler = new SortByShuffler(true, sparkPlan);
+      shuffler = new SortByShuffler(true, sparkPlan, shuffleSerializer);
     } else {
       shuffler = new GroupByShuffler();
     }
@@ -398,5 +411,4 @@ public class SparkPlanGenerator {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index a1f9446..b6f78d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -38,6 +38,10 @@ public class HiveKey extends BytesWritable {
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes) {
+    super(bytes);
+  }
+
   public HiveKey(byte[] bytes, int hashcode) {
     super(bytes);
     hashCode = hashcode;

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
index b960508..f42cffd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
@@ -37,6 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.nio.file.Paths;
 import java.util.List;
 
@@ -48,19 +49,19 @@ public class TestHiveSparkClient {
 
   @Test
   public void testSetJobGroupAndDescription() throws Exception {
-
+    String confDir = "../data/conf/spark/local/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
-    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-            SQLStdHiveAuthorizerFactory.class.getName());
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
-    conf.set("spark.master", "local");
+
+    // Set to false because we don't launch a job using LocalHiveSparkClient so the
+    // hive-kryo-registrator jar is never added to the classpath
+    conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
     conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
             "TestHiveSparkClient-local-dir").toString());
 
     SessionState.start(conf);
     FileSystem fs = FileSystem.getLocal(conf);
-    Path tmpDir = new Path("TestSparkPlan-tmp");
+    Path tmpDir = new Path("TestHiveSparkClient-tmp");
 
     IDriver driver = null;
     JavaSparkContext sc = null;
@@ -81,7 +82,7 @@ public class TestHiveSparkClient {
 
       SparkConf sparkConf = new SparkConf();
       sparkConf.setMaster("local");
-      sparkConf.setAppName("TestSparkPlan-app");
+      sparkConf.setAppName("TestHiveSparkClient-app");
       sc = new JavaSparkContext(sparkConf);
 
       byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
index 5f47bb4..ef02a29 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
@@ -46,6 +46,8 @@ import org.junit.Test;
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 
+import java.io.File;
+import java.nio.file.Paths;
 import java.util.List;
 
 
@@ -53,11 +55,15 @@ public class TestSparkPlan {
 
   @Test
   public void testSetRDDCallSite() throws Exception {
+    String confDir = "../data/conf/spark/local/hive-site.xml";
+    HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
     HiveConf conf = new HiveConf();
-    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-            SQLStdHiveAuthorizerFactory.class.getName());
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+
+    // Set to false because we don't launch a job using LocalHiveSparkClient so the
+    // hive-kryo-registrator jar is never added to the classpath
+    conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+            "TestSparkPlan-local-dir").toString());
 
     FileSystem fs = FileSystem.getLocal(conf);
     Path tmpDir = new Path("TestSparkPlan-tmp");

http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index 681463e..d738003 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -35,6 +35,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.Serializable;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.nio.file.Paths;
 import java.util.Arrays;
@@ -70,7 +71,13 @@ public class TestSparkClient {
   private static final HiveConf HIVECONF = new HiveConf();
 
   static {
-    HIVECONF.set("hive.spark.client.connect.timeout", "30000ms");
+    String confDir = "../data/conf/spark/standalone/hive-site.xml";
+    try {
+      HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+    HIVECONF.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
     HIVECONF.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
   }