You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by st...@apache.org on 2017/10/16 00:17:32 UTC
hive git commit: HIVE-16395: ConcurrentModificationException on config object in HoS (Andrew Sherman via Sahil Takiar)
Repository: hive
Updated Branches:
refs/heads/master 133d3c473 -> e33126281
HIVE-16395: ConcurrentModificationException on config object in HoS (Andrew Sherman via Sahil Takiar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e3312628
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e3312628
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e3312628
Branch: refs/heads/master
Commit: e331262813027ca2a1aae7fedcd1c8863ed6b751
Parents: 133d3c4
Author: Andrew Sherman <as...@cloudera.com>
Authored: Sun Oct 15 17:16:35 2017 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Sun Oct 15 17:16:35 2017 -0700
----------------------------------------------------------------------
.../ql/exec/spark/HiveSparkClientFactory.java | 8 ++-
.../ql/exec/spark/session/SparkSessionImpl.java | 6 +++
.../session/TestSparkSessionManagerImpl.java | 51 ++++++++++++++++++++
3 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 194585e..597fcab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -26,10 +26,10 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.compress.utils.CharsetNames;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.slf4j.Logger;
@@ -60,6 +60,8 @@ public class HiveSparkClientFactory {
private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
+ @VisibleForTesting
+ public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";
public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
Map<String, String> sparkConf = initiateSparkConf(hiveconf);
@@ -222,6 +224,10 @@ public class HiveSparkClientFactory {
sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
}
+ // Force Spark configs to be cloned by default
+ sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true");
+
+
// Set the credential provider passwords if found, if there is job specific password
// the credential provider location is set directly in the execute method of LocalSparkClient
// and submit method of RemoteHiveSparkClient when the job config is created
http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 54d2cec..8d79dd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.spark.session;
import java.io.IOException;
import java.util.UUID;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -174,4 +175,9 @@ public class SparkSessionImpl implements SparkSession {
public static String makeSessionId() {
return UUID.randomUUID().toString();
}
+
+ @VisibleForTesting
+ HiveSparkClient getHiveSparkClient() {
+ return hiveSparkClient;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 489383b..47d2437 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -27,7 +27,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.spark.SparkConf;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -97,6 +103,51 @@ public class TestSparkSessionManagerImpl {
sessionManagerHS2.shutdown();
}
+ /**
+ * Test HIVE-16395 - by default we force cloning of Configurations for Spark jobs
+ */
+ @Test
+ public void testForceConfCloning() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.set("spark.master", "local");
+ String sparkCloneConfiguration = HiveSparkClientFactory.SPARK_CLONE_CONFIGURATION;
+
+ // Clear the value of sparkCloneConfiguration
+ conf.unset(sparkCloneConfiguration);
+ assertNull( "Could not clear " + sparkCloneConfiguration + " in HiveConf",
+ conf.get(sparkCloneConfiguration));
+
+ // By default we should set sparkCloneConfiguration to true in the Spark config
+ checkSparkConf(conf, sparkCloneConfiguration, "true");
+
+ // User can override value for sparkCloneConfiguration in Hive config to false
+ conf.set(sparkCloneConfiguration, "false");
+ checkSparkConf(conf, sparkCloneConfiguration, "false");
+
+ // User can override value of sparkCloneConfiguration in Hive config to true
+ conf.set(sparkCloneConfiguration, "true");
+ checkSparkConf(conf, sparkCloneConfiguration, "true");
+ }
+
+ /**
+ * Force a Spark config to be generated and check that a config value has the expected value
+ * @param conf the Hive config to use as a base
+ * @param paramName the Spark config name to check
+ * @param expectedValue the expected value in the Spark config
+ */
+ private void checkSparkConf(HiveConf conf, String paramName, String expectedValue) throws HiveException {
+ SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance();
+ SparkSessionImpl sparkSessionImpl = (SparkSessionImpl)
+ sessionManager.getSession(null, conf, true);
+ assertTrue(sparkSessionImpl.isOpen());
+ HiveSparkClient hiveSparkClient = sparkSessionImpl.getHiveSparkClient();
+ SparkConf sparkConf = hiveSparkClient.getSparkConf();
+ String cloneConfig = sparkConf.get(paramName);
+ sessionManager.closeSession(sparkSessionImpl);
+ assertEquals(expectedValue, cloneConfig);
+ sessionManager.shutdown();
+ }
+
/* Thread simulating a user session in HiveServer2. */
public class SessionThread implements Runnable {