Posted to commits@hive.apache.org by br...@apache.org on 2014/08/18 02:17:02 UTC
svn commit: r1618530 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
Author: brock
Date: Mon Aug 18 00:17:02 2014
New Revision: 1618530
URL: http://svn.apache.org/r1618530
Log:
HIVE-7746 - Cleanup SparkClient and make refreshLocalResources method synchronized (Venki Korukanti via Brock) [Spark Branch]
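
For context, the synchronization half of this change guards the single shared SparkContext: every session thread funnels its jar/file registration through one context, so an unsynchronized refreshLocalResources can race on the localJars/localFiles bookkeeping. Below is a minimal, standalone sketch of that pattern, not the actual Hive code; the class and member names are hypothetical, and a StringBuilder stands in for sc (the SparkContext):

import java.util.HashSet;
import java.util.Set;

public class SharedContextClient {
  // Stand-in for the single SparkContext shared by all session threads.
  private final StringBuilder registeredJars = new StringBuilder();
  // Bookkeeping of what was already registered, to avoid duplicate addJar calls.
  private final Set<String> localJars = new HashSet<String>();

  // Without 'synchronized', two threads can both pass the contains() check
  // for the same jar before either finishes, registering it twice.
  public synchronized void addJar(String jar) {
    if (jar == null || jar.isEmpty() || localJars.contains(jar)) {
      return;
    }
    localJars.add(jar);
    registeredJars.append(jar).append('\n'); // stand-in for sc.addJar(jar)
  }

  public static void main(String[] args) throws InterruptedException {
    final SharedContextClient client = new SharedContextClient();
    Runnable task = new Runnable() {
      public void run() {
        client.addJar("/tmp/hive-exec.jar"); // hypothetical path
      }
    };
    Thread t1 = new Thread(task);
    Thread t2 = new Thread(task);
    t1.start(); t2.start();
    t1.join(); t2.join();
    // Prints the jar exactly once despite two concurrent callers.
    System.out.print(client.registeredJars);
  }
}

The synchronized keyword is the whole fix here: the check-then-add sequence on the shared set, plus the registration call, become one atomic step per caller.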
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1618530&r1=1618529&r2=1618530&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Mon Aug 18 00:17:02 2014
@@ -18,7 +18,8 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import org.apache.commons.lang.StringUtils;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,9 +43,13 @@ import java.util.*;
public class SparkClient implements Serializable {
private static final long serialVersionUID = 1L;
+
+ private static final String MR_JAR_PROPERTY = "tmpjars";
protected static transient final Log LOG = LogFactory
.getLog(SparkClient.class);
+ private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+
private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
private static final String SPARK_DEFAULT_MASTER = "local";
private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark";
@@ -164,72 +169,48 @@ public class SparkClient implements Seri
return 0;
}
- private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+ /**
+ * At this point single SparkContext is used by more than one thread, so make this
+ * method synchronized.
+ *
+ * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+ * issue we have to live with until multiple SparkContexts are supported in a single JVM.
+ */
+ private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
// add hive-exec jar
- String hiveJar = conf.getJar();
- if (!localJars.contains(hiveJar)) {
- localJars.add(hiveJar);
- sc.addJar(hiveJar);
- }
+ addJars(conf.getJar());
+
// add aux jars
- String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
- if (StringUtils.isNotEmpty(auxJars) && StringUtils.isNotBlank(auxJars)) {
- addJars(auxJars);
- }
+ addJars(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS));
// add added jars
- String addedJars = Utilities.getResourceFiles(conf,
- SessionState.ResourceType.JAR);
- if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
- addJars(addedJars);
- }
+ String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
+ addJars(addedJars);
// add plugin module jars on demand
- final String MR_JAR_PROPERTY = "tmpjars";
// jobConf will hold all the configuration for hadoop, tez, and hive
JobConf jobConf = new JobConf(conf);
- jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
-
+ jobConf.set(MR_JAR_PROPERTY, "");
for (BaseWork work : sparkWork.getAllWork()) {
work.configureJobConf(jobConf);
}
-
- String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
- if (newTmpJars != null && newTmpJars.length > 0) {
- for (String tmpJar : newTmpJars) {
- if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
- && !localJars.contains(tmpJar)) {
- localJars.add(tmpJar);
- sc.addJar(tmpJar);
- }
- }
- }
+ addJars(conf.get(MR_JAR_PROPERTY));
// add added files
- String addedFiles = Utilities.getResourceFiles(conf,
- SessionState.ResourceType.FILE);
- if (StringUtils.isNotEmpty(addedFiles)
- && StringUtils.isNotBlank(addedFiles)) {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
- addResources(addedFiles);
- }
+ String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
+ addResources(addedFiles);
// add added archives
- String addedArchives = Utilities.getResourceFiles(conf,
- SessionState.ResourceType.ARCHIVE);
- if (StringUtils.isNotEmpty(addedArchives)
- && StringUtils.isNotBlank(addedArchives)) {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
- addResources(addedArchives);
- }
+ String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
+ addResources(addedArchives);
}
private void addResources(String addedFiles) {
- for (String addedFile : addedFiles.split(",")) {
- if (StringUtils.isNotEmpty(addedFile)
- && StringUtils.isNotBlank(addedFile)
- && !localFiles.contains(addedFile)) {
+ for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
+ if (!localFiles.contains(addedFile)) {
localFiles.add(addedFile);
sc.addFile(addedFile);
}
@@ -237,9 +218,8 @@ public class SparkClient implements Seri
}
private void addJars(String addedJars) {
- for (String addedJar : addedJars.split(",")) {
- if (StringUtils.isNotEmpty(addedJar) && StringUtils.isNotBlank(addedJar)
- && !localJars.contains(addedJar)) {
+ for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
+ if (!localJars.contains(addedJar)) {
localJars.add(addedJar);
sc.addJar(addedJar);
}