Posted to commits@hive.apache.org by li...@apache.org on 2015/11/05 09:52:03 UTC

hive git commit: HIVE-12229: Custom script in query cannot be executed in yarn-cluster mode [Spark Branch] (Rui reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/spark fd1192914 -> b02cd4abc


HIVE-12229: Custom script in query cannot be executed in yarn-cluster mode [Spark Branch] (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b02cd4ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b02cd4ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b02cd4ab

Branch: refs/heads/spark
Commit: b02cd4abce10003dc90646b710875fba00b9b5b0
Parents: fd11929
Author: Rui Li <ru...@intel.com>
Authored: Thu Nov 5 16:48:25 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Nov 5 16:51:22 2015 +0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/ScriptOperator.java     | 15 ++++++++++++
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  4 ++--
 .../hive/ql/exec/spark/SparkUtilities.java      | 10 ++++----
 .../apache/hive/spark/client/JobContext.java    |  4 ++--
 .../hive/spark/client/JobContextImpl.java       |  8 +++----
 .../hive/spark/client/SparkClientImpl.java      |  2 +-
 .../hive/spark/client/SparkClientUtilities.java | 24 +++++++++++++-------
 7 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index 5df9ea2..63837ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.SparkFiles;
 
 import java.io.BufferedInputStream;
@@ -329,6 +331,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
     // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
+      SparkConf sparkConf = null;
       try {
         String[] cmdArgs = splitArgs(conf.getScriptCmd());
 
@@ -341,6 +344,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
 
           // In Spark local mode, we need to search for added files in the root directory.
           if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+            sparkConf = SparkEnv.get().conf();
             finder.prependPathComponent(SparkFiles.getRootDirectory());
           }
           File f = finder.getAbsolutePath(prog);
@@ -371,6 +375,17 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
         String idEnvVarVal = getOperatorId();
         env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
 
+        // For Spark in non-local mode, any added dependencies are stored under
+        // SparkFiles::getRootDirectory, which is the executor's working directory.
+        // In local mode, we have to point the child process's working directory
+        // there manually to make the dependencies accessible.
+        if (sparkConf != null) {
+          String master = sparkConf.get("spark.master");
+          if (master.equals("local") || master.startsWith("local[")) {
+            pb.directory(new File(SparkFiles.getRootDirectory()));
+          }
+        }
+
         scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
 
         DataOutputStream scriptOut = new DataOutputStream(

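For reference, the local-mode test in the hunk above keys entirely off the spark.master setting: Spark reports "local" or "local[N]" masters for local execution. A minimal standalone sketch of the same check (the class and method names below are illustrative, not part of the patch):

    public class LocalModeCheck {
      // Mirrors the patch's test: treat "local" and "local[...]" masters as local mode.
      static boolean isLocalMaster(String master) {
        return master.equals("local") || master.startsWith("local[");
      }

      public static void main(String[] args) {
        System.out.println(isLocalMaster("local[2]"));     // true
        System.out.println(isLocalMaster("yarn-cluster")); // false
        // In the patch, a true result triggers
        // pb.directory(new File(SparkFiles.getRootDirectory())).
      }
    }
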
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 2e8d1d3..cf81424 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -295,11 +295,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
       // Add jars to the current thread's class loader dynamically, and add jar paths to the JobConf
       // as Spark may need to load classes from these jars in other threads.
-      Set<String> addedJars = jc.getAddedJars();
+      Map<String, Long> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
         SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
         KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
-        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
       }
 
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);

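The comment in this hunk describes the dynamic classloader update that SparkClientUtilities.addToClassPath performs. As a simplified sketch of that technique (this assumes the thread context classloader is a URLClassLoader, as the patch's own cast does; it is not the exact Hive implementation):

    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    final class ClassPathHelper {
      // Rebuild the thread context classloader with extra jar URLs so that
      // classes from newly added jars resolve for code running on this thread.
      static void addToContextClassPath(List<URL> newUrls) {
        URLClassLoader current =
            (URLClassLoader) Thread.currentThread().getContextClassLoader();
        List<URL> urls = new ArrayList<>(Arrays.asList(current.getURLs()));
        for (URL u : newUrls) {
          if (!urls.contains(u)) {
            urls.add(u);
          }
        }
        Thread.currentThread().setContextClassLoader(
            new URLClassLoader(urls.toArray(new URL[0]), current.getParent()));
      }
    }
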
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cf2c3bc..0268469 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.UUID;
-import java.util.Collection;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
@@ -91,11 +89,11 @@ public class SparkUtilities {
    */
   public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
     Path localFile = new Path(source.getPath());
-    // give the uploaded file a UUID
-    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
-        UUID.randomUUID() + "-" + getFileName(source));
+    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
     FileSystem fileSystem = FileSystem.get(conf);
-    fileSystem.copyFromLocalFile(localFile, remoteFile);
+    // Overwrite if the remote file already exists. Whether the file can be added
+    // on the executor is up to Spark (controlled by spark.files.overwrite).
+    fileSystem.copyFromLocalFile(false, true, localFile, remoteFile);
     Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
     return fullPath.toUri();
   }

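The change above switches to the four-argument FileSystem.copyFromLocalFile(delSrc, overwrite, src, dst) overload. A short usage sketch under placeholder paths (the file names below are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class UploadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path src = new Path("/tmp/my_script.py");               // hypothetical local file
        Path dst = new Path("/user/hive/session/my_script.py"); // hypothetical HDFS target
        // delSrc=false keeps the local copy; overwrite=true replaces a stale
        // remote copy, so re-adding a modified script no longer fails or piles
        // up UUID-named duplicates in the session directory.
        fs.copyFromLocalFile(false, true, src, dst);
      }
    }
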
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index af6332e..c9c975b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -55,9 +55,9 @@ public interface JobContext {
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 
   /**
-   * Return all added jar path which added through AddJarJob.
+   * Returns all added jar paths and their timestamps, as added through AddJarJob.
    */
-  Set<String> getAddedJars();
+  Map<String, Long> getAddedJars();
 
   /**
    * Returns a local tmp dir specific to the context

http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index beed8a3..b73bcd7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -18,12 +18,10 @@
 package org.apache.hive.spark.client;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -35,14 +33,14 @@ class JobContextImpl implements JobContext {
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
-  private final Set<String> addedJars;
+  private final Map<String, Long> addedJars;
   private final File localTmpDir;
 
   public JobContextImpl(JavaSparkContext sc, File localTmpDir) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
-    addedJars = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+    addedJars = new ConcurrentHashMap<>();
     this.localTmpDir = localTmpDir;
   }
 
@@ -65,7 +63,7 @@ class JobContextImpl implements JobContext {
   }
 
   @Override
-  public Set<String> getAddedJars() {
+  public Map<String, Long> getAddedJars() {
     return addedJars;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index ceebbb3..3d682a0 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -617,7 +617,7 @@ class SparkClientImpl implements SparkClient {
       jc.sc().addJar(path);
       // A following remote job may refer to classes in this jar, and that job may be executed
       // in a different thread, so we add this jar path to the JobContext for later use.
-      jc.getAddedJars().add(path);
+      jc.getAddedJars().put(path, System.currentTimeMillis());
       return null;
     }
 

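On the client side, each add is now stamped with the wall-clock time, which is what the executor side later compares against. A compact sketch of this producer half (standalone, with a plain map standing in for the JobContext field):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class AddedJars {
      // path -> time the jar was (last) added; a re-add refreshes the stamp.
      private final Map<String, Long> addedJars = new ConcurrentHashMap<>();

      void record(String path) {
        addedJars.put(path, System.currentTimeMillis());
      }

      Map<String, Long> snapshot() {
        return addedJars;
      }
    }
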
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 589436d..bbbd97b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -24,7 +24,8 @@ import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -35,20 +36,21 @@ import org.apache.hadoop.fs.Path;
 
 public class SparkClientUtilities {
   protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
+  private static final Map<String, Long> downloadedFiles = new ConcurrentHashMap<>();
 
   /**
    * Add new elements to the classpath.
    *
-   * @param newPaths Set of classpath elements
+   * @param newPaths Map of classpath elements to their corresponding timestamps
    */
-  public static void addToClassPath(Set<String> newPaths, Configuration conf, File localTmpDir)
+  public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir)
       throws Exception {
     URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
     List<URL> curPath = Lists.newArrayList(loader.getURLs());
 
     boolean newPathAdded = false;
-    for (String newPath : newPaths) {
-      URL newUrl = urlFromPathString(newPath, conf, localTmpDir);
+    for (Map.Entry<String, Long> entry : newPaths.entrySet()) {
+      URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
       if (newUrl != null && !curPath.contains(newUrl)) {
         curPath.add(newUrl);
         LOG.info("Added jar[" + newUrl + "] to classpath.");
@@ -69,7 +71,8 @@ public class SparkClientUtilities {
    * @param path  path string
    * @return
    */
-  private static URL urlFromPathString(String path, Configuration conf, File localTmpDir) {
+  private static URL urlFromPathString(String path, Long timeStamp,
+      Configuration conf, File localTmpDir) {
     URL url = null;
     try {
       if (StringUtils.indexOf(path, "file:/") == 0) {
@@ -78,12 +81,17 @@ public class SparkClientUtilities {
         Path remoteFile = new Path(path);
         Path localFile =
             new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName());
-        if (!new File(localFile.toString()).exists()) {
+        Long currentTS = downloadedFiles.get(path);
+        if (currentTS == null) {
+          currentTS = -1L;
+        }
+        if (!new File(localFile.toString()).exists() || currentTS < timeStamp) {
           LOG.info("Copying " + remoteFile + " to " + localFile);
           FileSystem remoteFS = remoteFile.getFileSystem(conf);
           remoteFS.copyToLocalFile(remoteFile, localFile);
+          downloadedFiles.put(path, timeStamp);
         }
-        return urlFromPathString(localFile.toString(), conf, localTmpDir);
+        return urlFromPathString(localFile.toString(), timeStamp, conf, localTmpDir);
       } else {
         url = new File(path).toURL();
       }
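
Taken together, the timestamp bookkeeping above amounts to a timestamp-guarded download cache: a jar is copied to the local tmp dir only when it has never been fetched, the local copy is missing, or its recorded stamp is older than the one shipped with the job. A self-contained sketch of that consumer half (simplified; not the exact Hive code):

    import java.io.File;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class DownloadCache {
      private final Map<String, Long> downloaded = new ConcurrentHashMap<>();

      // True when the remote file must be (re)fetched: it was never copied
      // locally, the local copy is gone, or the incoming stamp is newer.
      boolean needsDownload(String remotePath, File localFile, long timeStamp) {
        long recorded = downloaded.getOrDefault(remotePath, -1L);
        return !localFile.exists() || recorded < timeStamp;
      }

      void markDownloaded(String remotePath, long timeStamp) {
        downloaded.put(remotePath, timeStamp);
      }
    }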