You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/18 02:17:02 UTC

svn commit: r1618530 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java

Author: brock
Date: Mon Aug 18 00:17:02 2014
New Revision: 1618530

URL: http://svn.apache.org/r1618530
Log:
HIVE-7746 - Cleanup SparkClient and make refreshLocalResources method synchronized (Venki Korukanti via Brock) [Spark Branch]

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1618530&r1=1618529&r2=1618530&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Mon Aug 18 00:17:02 2014
@@ -18,7 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.commons.lang.StringUtils;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,9 +43,13 @@ import java.util.*;
 
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
+
+  private static final String MR_JAR_PROPERTY = "tmpjars";
   protected static transient final Log LOG = LogFactory
       .getLog(SparkClient.class);
 
+  private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark";
@@ -164,72 +169,48 @@ public class SparkClient implements Seri
     return 0;
   }
 
-  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+  /**
+   * At this point a single SparkContext is used by more than one thread, so this
+   * method is synchronized.
+   *
+   * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+   * issue we have to live with until multiple SparkContexts are supported in a single JVM.
+   */
+  private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
     // add hive-exec jar
-    String hiveJar = conf.getJar();
-    if (!localJars.contains(hiveJar)) {
-      localJars.add(hiveJar);
-      sc.addJar(hiveJar);
-    }
+    addJars(conf.getJar());
+
     // add aux jars
-    String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
-    if (StringUtils.isNotEmpty(auxJars) && StringUtils.isNotBlank(auxJars)) {
-      addJars(auxJars);
-    }
+    addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
 
     // add added jars
-    String addedJars = Utilities.getResourceFiles(conf,
-        SessionState.ResourceType.JAR);
-    if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
-      addJars(addedJars);
-    }
+    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+    addJars(addedJars);
 
     // add plugin module jars on demand
-    final String MR_JAR_PROPERTY = "tmpjars";
     // jobConf will hold all the configuration for hadoop, tez, and hive
     JobConf jobConf = new JobConf(conf);
-    jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
-
+    jobConf.set(MR_JAR_PROPERTY, "");
     for (BaseWork work : sparkWork.getAllWork()) {
       work.configureJobConf(jobConf);
     }
-
-    String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
-    if (newTmpJars != null && newTmpJars.length > 0) {
-      for (String tmpJar : newTmpJars) {
-        if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
-            && !localJars.contains(tmpJar)) {
-          localJars.add(tmpJar);
-          sc.addJar(tmpJar);
-        }
-      }
-    }
+    addJars(conf.get(MR_JAR_PROPERTY));
 
     // add added files
-    String addedFiles = Utilities.getResourceFiles(conf,
-        SessionState.ResourceType.FILE);
-    if (StringUtils.isNotEmpty(addedFiles)
-        && StringUtils.isNotBlank(addedFiles)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
-      addResources(addedFiles);
-    }
+    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+    addResources(addedFiles);
 
     // add added archives
-    String addedArchives = Utilities.getResourceFiles(conf,
-        SessionState.ResourceType.ARCHIVE);
-    if (StringUtils.isNotEmpty(addedArchives)
-        && StringUtils.isNotBlank(addedArchives)) {
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
-      addResources(addedArchives);
-    }
+    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+    addResources(addedArchives);
   }
 
   private void addResources(String addedFiles) {
-    for (String addedFile : addedFiles.split(",")) {
-      if (StringUtils.isNotEmpty(addedFile)
-          && StringUtils.isNotBlank(addedFile)
-          && !localFiles.contains(addedFile)) {
+    for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+      if (!localFiles.contains(addedFile)) {
         localFiles.add(addedFile);
         sc.addFile(addedFile);
       }
@@ -237,9 +218,8 @@ public class SparkClient implements Seri
   }
 
   private void addJars(String addedJars) {
-    for (String addedJar : addedJars.split(",")) {
-      if (StringUtils.isNotEmpty(addedJar) && StringUtils.isNotBlank(addedJar)
-          && !localJars.contains(addedJar)) {
+    for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+      if (!localJars.contains(addedJar)) {
         localJars.add(addedJar);
         sc.addJar(addedJar);
       }