Posted to commits@hive.apache.org by st...@apache.org on 2017/10/16 00:17:32 UTC

hive git commit: HIVE-16395: ConcurrentModificationException on config object in HoS (Andrew Sherman via Sahil Takiar)

Repository: hive
Updated Branches:
  refs/heads/master 133d3c473 -> e33126281


HIVE-16395: ConcurrentModificationException on config object in HoS (Andrew Sherman via Sahil Takiar)

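Background on the bug: Hive on Spark can end up sharing a single Hadoop Configuration object between the thread that prepares a job and the Spark machinery that reads it. Configuration is backed by a java.util.Properties (a Hashtable), and Configuration.iterator() walks that table with a fail-fast iterator, so iterating the object while another thread adds keys can throw ConcurrentModificationException. Spark's HadoopRDD already has an opt-in guard, spark.hadoop.cloneConf (SPARK-2546), which makes each task clone the JobConf instead of sharing one copy; this patch turns that guard on by default for HoS. A minimal sketch of the failure mode, assuming hadoop-common on the classpath (the class name and keys are illustrative, not from the patch, and the race is timing-dependent):

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class CloneConfRepro {
      public static void main(String[] args) throws Exception {
        final Configuration shared = new Configuration(false);

        // Writer: keeps adding keys, as job setup can do while a job is
        // being submitted.
        Thread writer = new Thread(() -> {
          for (int i = 0; i < 1_000_000; i++) {
            shared.set("illustrative.key." + i, "v");  // structural change
          }
        });
        writer.start();

        // Reader: iterates the same object, as Spark-side code does when
        // the Configuration is shared (cloneConf left false).
        try {
          while (writer.isAlive()) {
            for (Map.Entry<String, String> e : shared) {
              e.getKey();  // touch the entry
            }
          }
          System.out.println("No CME this run; the race did not trigger.");
        } catch (java.util.ConcurrentModificationException cme) {
          System.out.println("Reproduced: " + cme);
        }
        writer.join();

        // The fix's effect: with spark.hadoop.cloneConf=true each task
        // iterates its own copy, morally: new Configuration(shared).
      }
    }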

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e3312628
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e3312628
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e3312628

Branch: refs/heads/master
Commit: e331262813027ca2a1aae7fedcd1c8863ed6b751
Parents: 133d3c4
Author: Andrew Sherman <as...@cloudera.com>
Authored: Sun Oct 15 17:16:35 2017 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Sun Oct 15 17:16:35 2017 -0700

----------------------------------------------------------------------
 .../ql/exec/spark/HiveSparkClientFactory.java   |  8 ++-
 .../ql/exec/spark/session/SparkSessionImpl.java |  6 +++
 .../session/TestSparkSessionManagerImpl.java    | 51 ++++++++++++++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 194585e..597fcab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -26,10 +26,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.spark.client.SparkClientUtilities;
 import org.slf4j.Logger;
@@ -60,6 +60,8 @@ public class HiveSparkClientFactory {
   private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
   private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
   private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
+  @VisibleForTesting
+  public static final String SPARK_CLONE_CONFIGURATION = "spark.hadoop.cloneConf";
 
   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
     Map<String, String> sparkConf = initiateSparkConf(hiveconf);
@@ -222,6 +224,10 @@ public class HiveSparkClientFactory {
       sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
     }
 
+    // Force Spark configs to be cloned by default
+    sparkConf.putIfAbsent(SPARK_CLONE_CONFIGURATION, "true");
+
+
     // Set the credential provider passwords if found, if there is job specific password
     // the credential provider location is set directly in the execute method of LocalSparkClient
     // and submit method of RemoteHiveSparkClient when the job config is created

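A note on the semantics of the putIfAbsent above: initiateSparkConf() has already copied any user-supplied spark.* settings from HiveConf into the sparkConf map by this point, so this line supplies the clone-by-default value only when the user set nothing; an explicit user value, true or false, survives, which is exactly what the new test below exercises. A minimal sketch of that behavior, with a plain HashMap standing in for the sparkConf map:

    import java.util.HashMap;
    import java.util.Map;

    public class PutIfAbsentDemo {
      public static void main(String[] args) {
        Map<String, String> sparkConf = new HashMap<>();

        // Case 1: user set nothing -> the default is applied.
        sparkConf.putIfAbsent("spark.hadoop.cloneConf", "true");
        System.out.println(sparkConf.get("spark.hadoop.cloneConf")); // true

        // Case 2: a user value copied from HiveConf earlier wins.
        sparkConf.clear();
        sparkConf.put("spark.hadoop.cloneConf", "false"); // user override
        sparkConf.putIfAbsent("spark.hadoop.cloneConf", "true");
        System.out.println(sparkConf.get("spark.hadoop.cloneConf")); // false
      }
    }

Using put() here instead would clobber a user override; putIfAbsent() keeps the escape hatch that the test's "false" case relies on.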
http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 54d2cec..8d79dd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.spark.session;
 import java.io.IOException;
 import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -174,4 +175,9 @@ public class SparkSessionImpl implements SparkSession {
   public static String makeSessionId() {
     return UUID.randomUUID().toString();
   }
+
+  @VisibleForTesting
+  HiveSparkClient getHiveSparkClient() {
+    return hiveSparkClient;
+  }
 }

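The new accessor is deliberately package-private rather than public: only code in org.apache.hadoop.hive.ql.exec.spark.session can call it, which in practice means TestSparkSessionManagerImpl (same package under src/test), and @VisibleForTesting documents that the visibility exists only for tests. A generic sketch of the pattern (class names are illustrative, not from the patch):

    import com.google.common.annotations.VisibleForTesting;

    public class Service {
      private final Client client = new Client();

      // Package-private on purpose: callers outside this package cannot
      // see it; same-package tests can. The annotation lets static
      // analysis flag any production code that calls it anyway.
      @VisibleForTesting
      Client getClient() {
        return client;
      }

      static class Client { }
    }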
http://git-wip-us.apache.org/repos/asf/hive/blob/e3312628/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
index 489383b..47d2437 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/session/TestSparkSessionManagerImpl.java
@@ -27,7 +27,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
+import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.spark.SparkConf;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -97,6 +103,51 @@ public class TestSparkSessionManagerImpl {
     sessionManagerHS2.shutdown();
   }
 
+  /**
+   *  Test HIVE-16395 - by default we force cloning of Configurations for Spark jobs
+   */
+  @Test
+  public void testForceConfCloning() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set("spark.master", "local");
+    String sparkCloneConfiguration = HiveSparkClientFactory.SPARK_CLONE_CONFIGURATION;
+
+    // Clear the value of sparkCloneConfiguration
+    conf.unset(sparkCloneConfiguration);
+    assertNull( "Could not clear " + sparkCloneConfiguration + " in HiveConf",
+        conf.get(sparkCloneConfiguration));
+
+    // By default we should set sparkCloneConfiguration to true in the Spark config
+    checkSparkConf(conf, sparkCloneConfiguration, "true");
+
+    // User can override value for sparkCloneConfiguration in Hive config to false
+    conf.set(sparkCloneConfiguration, "false");
+    checkSparkConf(conf, sparkCloneConfiguration, "false");
+
+    // User can override value of sparkCloneConfiguration in Hive config to true
+    conf.set(sparkCloneConfiguration, "true");
+    checkSparkConf(conf, sparkCloneConfiguration, "true");
+  }
+
+  /**
+   * Force a Spark config to be generated and check that a config value has the expected value
+   * @param conf the Hive config to use as a base
+   * @param paramName the Spark config name to check
+   * @param expectedValue the expected value in the Spark config
+   */
+  private void checkSparkConf(HiveConf conf, String paramName, String expectedValue) throws HiveException {
+    SparkSessionManager sessionManager = SparkSessionManagerImpl.getInstance();
+    SparkSessionImpl sparkSessionImpl = (SparkSessionImpl)
+        sessionManager.getSession(null, conf, true);
+    assertTrue(sparkSessionImpl.isOpen());
+    HiveSparkClient hiveSparkClient = sparkSessionImpl.getHiveSparkClient();
+    SparkConf sparkConf = hiveSparkClient.getSparkConf();
+    String cloneConfig = sparkConf.get(paramName);
+    sessionManager.closeSession(sparkSessionImpl);
+    assertEquals(expectedValue, cloneConfig);
+    sessionManager.shutdown();
+  }
+
   /* Thread simulating a user session in HiveServer2. */
   public class SessionThread implements Runnable {