You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2018/07/27 13:43:53 UTC
hive git commit: HIVE-20032: Don't serialize hashCode for
repartitionAndSortWithinPartitions (Sahil Takiar, reviewed by Rui Li)
Repository: hive
Updated Branches:
refs/heads/master 94ec36865 -> 1e437e2b1
HIVE-20032: Don't serialize hashCode for repartitionAndSortWithinPartitions (Sahil Takiar, reviewed by Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e437e2b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e437e2b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e437e2b
Branch: refs/heads/master
Commit: 1e437e2b1e00aa1942781f0a9b5b2f0868644b81
Parents: 94ec368
Author: Sahil Takiar <ta...@gmail.com>
Authored: Thu Jun 28 17:50:41 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri Jul 27 08:38:28 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/ql/exec/spark/TestSparkStatistics.java | 12 ++--
.../jdbc/TestJdbcWithLocalClusterSpark.java | 10 +--
.../jdbc/TestJdbcWithMiniHS2ErasureCoding.java | 11 ++--
...stMultiSessionsHS2WithLocalClusterSpark.java | 10 +--
.../apache/hive/spark/HiveKryoRegistrator.java | 2 +-
.../hive/spark/NoHashCodeKryoSerializer.java | 65 ++++++++++++++++++++
.../ql/exec/spark/ShuffleKryoSerializer.java | 62 +++++++++++++++++++
.../hive/ql/exec/spark/SortByShuffler.java | 12 +++-
.../hive/ql/exec/spark/SparkPlanGenerator.java | 20 ++++--
.../org/apache/hadoop/hive/ql/io/HiveKey.java | 4 ++
.../hive/ql/exec/spark/TestHiveSparkClient.java | 17 ++---
.../hive/ql/exec/spark/TestSparkPlan.java | 14 +++--
.../hive/spark/client/TestSparkClient.java | 9 ++-
14 files changed, 208 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9f1da60..39c77b3 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4239,7 +4239,7 @@ public class HiveConf extends Configuration {
"If this is set to true, mapjoin optimization in Hive/Spark will use statistics from\n" +
"TableScan operators at the root of operator tree, instead of parent ReduceSink\n" +
"operators of the Join operator."),
- SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", false,
+ SPARK_OPTIMIZE_SHUFFLE_SERDE("hive.spark.optimize.shuffle.serde", true,
"If this is set to true, Hive on Spark will register custom serializers for data types\n" +
"in shuffle. This should result in less shuffled data."),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
index d383873..191d5f5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
@@ -41,13 +44,10 @@ import java.util.stream.Collectors;
public class TestSparkStatistics {
@Test
- public void testSparkStatistics() {
+ public void testSparkStatistics() throws MalformedURLException {
+ String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
- conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- SQLStdHiveAuthorizerFactory.class.getName());
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
- conf.set("spark.master", "local-cluster[1,2,1024]");
conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
"TestSparkStatistics-local-dir").toString());
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
index fe8a32f..341da33 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -66,12 +68,10 @@ public class TestJdbcWithLocalClusterSpark {
private Connection hs2Conn = null;
private Statement stmt;
- private static HiveConf createHiveConf() {
+ private static HiveConf createHiveConf() throws MalformedURLException {
+ String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
- conf.set("hive.execution.engine", "spark");
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.set("spark.master", "local-cluster[2,2,1024]");
- conf.set("hive.spark.client.connect.timeout", "30000ms");
// FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
// while spark2 is still using Hadoop2.
// Spark requires Hive to support Hadoop3 first then Spark can start
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
index efb3759..b2ddff7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java
@@ -18,9 +18,11 @@
package org.apache.hive.jdbc;
+import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
+import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -32,7 +34,6 @@ import java.util.Collections;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.processors.ErasureProcessor;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim;
@@ -63,19 +64,17 @@ public class TestJdbcWithMiniHS2ErasureCoding {
private static HiveConf conf;
private Connection hs2Conn = null;
- private static HiveConf createHiveOnSparkConf() {
+ private static HiveConf createHiveOnSparkConf() throws MalformedURLException {
+ String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf hiveConf = new HiveConf();
// Tell dfs not to consider load when choosing a datanode as this can cause failure as
// in a test we do not have spare datanode capacity.
hiveConf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
- hiveConf.set("hive.execution.engine", "spark");
- hiveConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- hiveConf.set("spark.master", "local-cluster[2,2,1024]");
hiveConf.set("hive.spark.client.connect.timeout", "30000ms");
hiveConf.set("spark.local.dir",
Paths.get(System.getProperty("test.tmp.dir"), "TestJdbcWithMiniHS2ErasureCoding-local-dir")
.toString());
- hiveConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); // avoid ZK errors
return hiveConf;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
index 79d56f5..f7586c1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestMultiSessionsHS2WithLocalClusterSpark.java
@@ -18,6 +18,8 @@
package org.apache.hive.jdbc;
+import java.io.File;
+import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -73,14 +75,12 @@ public class TestMultiSessionsHS2WithLocalClusterSpark {
private ExecutorService pool = null;
- private static HiveConf createHiveConf() {
+ private static HiveConf createHiveConf() throws MalformedURLException {
+ String confDir = "../../data/conf/spark/standalone/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
conf.set("hive.exec.parallel", "true");
- conf.set("hive.execution.engine", "spark");
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf.set("spark.master", "local-cluster[2,2,1024]");
conf.set("spark.deploy.defaultCores", "2");
- conf.set("hive.spark.client.connect.timeout", "30000ms");
// FIXME: Hadoop3 made the incompatible change for dfs.client.datanode-restart.timeout
// while spark2 is still using Hadoop2.
// Spark requires Hive to support Hadoop3 first then Spark can start
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
index 838ad99..001ab8e 100644
--- a/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/HiveKryoRegistrator.java
@@ -54,7 +54,7 @@ public class HiveKryoRegistrator implements KryoRegistrator {
}
}
- private static class BytesWritableSerializer extends Serializer<BytesWritable> {
+ static class BytesWritableSerializer extends Serializer<BytesWritable> {
public void write(Kryo kryo, Output output, BytesWritable object) {
output.writeVarInt(object.getLength(), true);
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
----------------------------------------------------------------------
diff --git a/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
new file mode 100644
index 0000000..d4bcc5b
--- /dev/null
+++ b/kryo-registrator/src/main/java/org/apache/hive/spark/NoHashCodeKryoSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.KryoSerializer;
+
+
+/**
+ * A {@link KryoSerializer} that does not serialize hash codes while serializing a
+ * {@link HiveKey}. This decreases the amount of data to be shuffled during a Spark shuffle.
+ *
+ * <p>Keys read back through this serializer are created with the bytes-only
+ * {@code HiveKey(byte[])} constructor, so no hash code is restored on them.
+ */
+public class NoHashCodeKryoSerializer extends KryoSerializer {
+
+  private static final long serialVersionUID = 3350910170041648022L;
+
+  /**
+   * @param conf Spark configuration, handed unchanged to the {@link KryoSerializer} constructor
+   */
+  public NoHashCodeKryoSerializer(SparkConf conf) {
+    super(conf);
+  }
+
+  /**
+   * Obtains a {@link Kryo} from the parent serializer and registers the Hive-specific
+   * serializers for {@link HiveKey} and {@link BytesWritable} on it.
+   *
+   * @return the configured {@link Kryo} instance
+   */
+  @Override
+  public Kryo newKryo() {
+    Kryo kryo = super.newKryo();
+    kryo.register(HiveKey.class, new HiveKeySerializer());
+    kryo.register(BytesWritable.class, new HiveKryoRegistrator.BytesWritableSerializer());
+    return kryo;
+  }
+
+  /**
+   * Serializes a {@link HiveKey} as a positive-optimized var-int length followed by exactly that
+   * many payload bytes. The key's hash code is never written.
+   */
+  private static class HiveKeySerializer extends Serializer<HiveKey> {
+
+    /**
+     * Writes the key length and then the first {@code getLength()} bytes of the key's buffer.
+     */
+    @Override
+    public void write(Kryo kryo, Output output, HiveKey object) {
+      output.writeVarInt(object.getLength(), true);
+      // Only the first getLength() bytes are written, not the whole array returned by getBytes().
+      output.write(object.getBytes(), 0, object.getLength());
+    }
+
+    /**
+     * Reads the length prefix, then that many bytes, and wraps them in a new {@link HiveKey}
+     * without a hash code.
+     */
+    @Override
+    public HiveKey read(Kryo kryo, Input input, Class<HiveKey> type) {
+      int len = input.readVarInt(true);
+      byte[] bytes = new byte[len];
+      input.readBytes(bytes);
+      return new HiveKey(bytes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
new file mode 100644
index 0000000..47a0f77
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleKryoSerializer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.FileNotFoundException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+
+
+final class ShuffleKryoSerializer {
+
+ private static final String HIVE_SHUFFLE_KRYO_SERIALIZER = "org.apache.hive.spark.NoHashCodeKryoSerializer";
+
+ private static org.apache.spark.serializer.KryoSerializer INSTANCE;
+
+ private ShuffleKryoSerializer() {
+ // Don't create me
+ }
+
+ static org.apache.spark.serializer.KryoSerializer getInstance(JavaSparkContext sc,
+ Configuration conf) {
+ if (INSTANCE == null) {
+ synchronized (ShuffleKryoSerializer.class) {
+ if (INSTANCE == null) {
+ try {
+ INSTANCE = (org.apache.spark.serializer.KryoSerializer) Thread.currentThread().getContextClassLoader().loadClass(
+ HIVE_SHUFFLE_KRYO_SERIALIZER).getConstructor(SparkConf.class).newInstance(
+ sc.getConf());
+ return INSTANCE;
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
+ throw new IllegalStateException(
+ "Unable to create kryo serializer for shuffle RDDs using " +
+ "class " + HIVE_SHUFFLE_KRYO_SERIALIZER, e);
+ }
+ } else {
+ return INSTANCE;
+ }
+ }
+ }
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 22b598f..1bf5a56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -23,19 +23,24 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.rdd.ShuffledRDD;
+import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
+
public class SortByShuffler implements SparkShuffler<BytesWritable> {
private final boolean totalOrder;
private final SparkPlan sparkPlan;
+ private final KryoSerializer shuffleSerializer;
/**
* @param totalOrder whether this shuffler provides total order shuffle.
*/
- public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
+ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan, KryoSerializer shuffleSerializer) {
this.totalOrder = totalOrder;
this.sparkPlan = sparkPlan;
+ this.shuffleSerializer = shuffleSerializer;
}
@Override
@@ -56,6 +61,11 @@ public class SortByShuffler implements SparkShuffler<BytesWritable> {
Partitioner partitioner = new HashPartitioner(numPartitions);
rdd = input.repartitionAndSortWithinPartitions(partitioner);
}
+ if (shuffleSerializer != null) {
+ if (rdd.rdd() instanceof ShuffledRDD) {
+ ((ShuffledRDD) rdd.rdd()).setSerializer(shuffleSerializer);
+ }
+ }
return rdd;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 001d0b0..806deb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -18,13 +18,19 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hive.spark.client.SparkClientUtilities;
+import org.apache.spark.SparkConf;
import org.apache.spark.util.CallSite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,10 +74,11 @@ import com.google.common.base.Preconditions;
@SuppressWarnings("rawtypes")
public class SparkPlanGenerator {
+
private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
- private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final Logger LOG = LoggerFactory.getLogger(SparkPlanGenerator.class);
+ private final PerfLogger perfLogger = SessionState.getPerfLogger();
private final JavaSparkContext sc;
private final JobConf jobConf;
private final Context context;
@@ -82,6 +89,7 @@ public class SparkPlanGenerator {
private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
// a map from each BaseWork to its cloned JobConf
private final Map<BaseWork, JobConf> workToJobConf;
+ private final org.apache.spark.serializer.KryoSerializer shuffleSerializer;
public SparkPlanGenerator(
JavaSparkContext sc,
@@ -98,6 +106,11 @@ public class SparkPlanGenerator {
this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
this.sparkReporter = sparkReporter;
this.workToJobConf = new HashMap<BaseWork, JobConf>();
+ if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE)) {
+ this.shuffleSerializer = ShuffleKryoSerializer.getInstance(sc, jobConf);
+ } else {
+ this.shuffleSerializer = null;
+ }
}
public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -251,9 +264,9 @@ public class SparkPlanGenerator {
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
SparkShuffler shuffler;
if (edge.isMRShuffle()) {
- shuffler = new SortByShuffler(false, sparkPlan);
+ shuffler = new SortByShuffler(false, sparkPlan, shuffleSerializer);
} else if (edge.isShuffleSort()) {
- shuffler = new SortByShuffler(true, sparkPlan);
+ shuffler = new SortByShuffler(true, sparkPlan, shuffleSerializer);
} else {
shuffler = new GroupByShuffler();
}
@@ -398,5 +411,4 @@ public class SparkPlanGenerator {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index a1f9446..b6f78d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -38,6 +38,10 @@ public class HiveKey extends BytesWritable {
hashCodeValid = false;
}
+ public HiveKey(byte[] bytes) { // wraps the given bytes only; this constructor assigns no hash code
+ super(bytes);
+ }
+
public HiveKey(byte[] bytes, int hashcode) {
super(bytes);
hashCode = hashcode;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
index b960508..f42cffd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
@@ -37,6 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
import java.nio.file.Paths;
import java.util.List;
@@ -48,19 +49,19 @@ public class TestHiveSparkClient {
@Test
public void testSetJobGroupAndDescription() throws Exception {
-
+ String confDir = "../data/conf/spark/local/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
- conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- SQLStdHiveAuthorizerFactory.class.getName());
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
- conf.set("spark.master", "local");
+
+ // Set to false because we don't launch a job using LocalHiveSparkClient so the
+ // hive-kryo-registrator jar is never added to the classpath
+ conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
"TestHiveSparkClient-local-dir").toString());
SessionState.start(conf);
FileSystem fs = FileSystem.getLocal(conf);
- Path tmpDir = new Path("TestSparkPlan-tmp");
+ Path tmpDir = new Path("TestHiveSparkClient-tmp");
IDriver driver = null;
JavaSparkContext sc = null;
@@ -81,7 +82,7 @@ public class TestHiveSparkClient {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
- sparkConf.setAppName("TestSparkPlan-app");
+ sparkConf.setAppName("TestHiveSparkClient-app");
sc = new JavaSparkContext(sparkConf);
byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
index 5f47bb4..ef02a29 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkPlan.java
@@ -46,6 +46,8 @@ import org.junit.Test;
import scala.Tuple2;
import scala.collection.JavaConversions;
+import java.io.File;
+import java.nio.file.Paths;
import java.util.List;
@@ -53,11 +55,15 @@ public class TestSparkPlan {
@Test
public void testSetRDDCallSite() throws Exception {
+ String confDir = "../data/conf/spark/local/hive-site.xml";
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
HiveConf conf = new HiveConf();
- conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- SQLStdHiveAuthorizerFactory.class.getName());
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+
+ // Set to false because we don't launch a job using LocalHiveSparkClient so the
+ // hive-kryo-registrator jar is never added to the classpath
+ conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
+ conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+ "TestSparkPlan-local-dir").toString());
FileSystem fs = FileSystem.getLocal(conf);
Path tmpDir = new Path("TestSparkPlan-tmp");
http://git-wip-us.apache.org/repos/asf/hive/blob/1e437e2b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index 681463e..d738003 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -35,6 +35,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.Serializable;
+import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -70,7 +71,13 @@ public class TestSparkClient {
private static final HiveConf HIVECONF = new HiveConf();
static {
- HIVECONF.set("hive.spark.client.connect.timeout", "30000ms");
+ String confDir = "../data/conf/spark/standalone/hive-site.xml";
+ try {
+ HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ HIVECONF.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
HIVECONF.setVar(HiveConf.ConfVars.SPARK_CLIENT_TYPE, HiveConf.HIVE_SPARK_LAUNCHER_CLIENT);
}