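You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the word "here" in the original page; that link is not preserved in this plain-text version.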
Posted to commits@hive.apache.org by xu...@apache.org on 2015/05/14 23:23:03 UTC

hive git commit: HIVE-10671: yarn-cluster mode offers a degraded performance from yarn-client [Spark Branch] (Rui via Xuefu, reviewed by Chengxiang)

Repository: hive
Updated Branches:
  refs/heads/spark a9489c34b -> 889c41f12


HIVE-10671: yarn-cluster mode offers a degraded performance from yarn-client [Spark Branch] (Rui via Xuefu, reviewed by Chengxiang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/889c41f1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/889c41f1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/889c41f1

Branch: refs/heads/spark
Commit: 889c41f1290ddaf19e5d8139c183cb235ed0e65a
Parents: a9489c3
Author: xzhang <xz...@xzdt>
Authored: Thu May 14 14:20:11 2015 -0700
Committer: xzhang <xz...@xzdt>
Committed: Thu May 14 14:20:11 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/spark/RemoteHiveSparkClient.java      | 10 +++++-----
 .../hadoop/hive/ql/exec/spark/SparkUtilities.java      | 11 +++++++----
 .../java/org/apache/hive/spark/client/JobContext.java  |  2 +-
 .../org/apache/hive/spark/client/JobContextImpl.java   |  7 ++++---
 .../apache/hive/spark/client/SparkClientUtilities.java | 13 ++++++++-----
 5 files changed, 25 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 059016d..bae30f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -28,6 +28,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -167,10 +168,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
       try {
         URI fileUri = SparkUtilities.getURI(addedFile);
         if (fileUri != null && !localFiles.contains(fileUri)) {
+          localFiles.add(fileUri);
           if (SparkUtilities.needUploadToHDFS(fileUri, sparkConf)) {
             fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf);
           }
-          localFiles.add(fileUri);
           remoteClient.addFile(fileUri);
         }
       } catch (URISyntaxException e) {
@@ -184,10 +185,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
       try {
         URI jarUri = SparkUtilities.getURI(addedJar);
         if (jarUri != null && !localJars.contains(jarUri)) {
+          localJars.add(jarUri);
           if (SparkUtilities.needUploadToHDFS(jarUri, sparkConf)) {
             jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf);
           }
-          localJars.add(jarUri);
           remoteClient.addJar(jarUri);
         }
       } catch (URISyntaxException e) {
@@ -224,10 +225,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
       // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
       // may need to load classes from this jar in other threads.
-      List<String> addedJars = jc.getAddedJars();
+      Set<String> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
-        SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]),
-            localJobConf, jc.getLocalTmpDir());
+        SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
         localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 8499933..e6c845c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.UUID;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -77,11 +78,13 @@ public class SparkUtilities {
    * @throws IOException
    */
   public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
-    Path tmpDir = SessionState.getHDFSSessionPath(conf);
+    Path localFile = new Path(source.getPath());
+    // give the uploaded file a UUID
+    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
+        UUID.randomUUID() + "-" + getFileName(source));
     FileSystem fileSystem = FileSystem.get(conf);
-    fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir);
-    String filePath = tmpDir + File.separator + getFileName(source);
-    Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath();
+    fileSystem.copyFromLocalFile(localFile, remoteFile);
+    Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
     return fullPath.toUri();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index 36e252c..af6332e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -57,7 +57,7 @@ public interface JobContext {
   /**
    * Return all added jar path which added through AddJarJob.
    */
-  List<String> getAddedJars();
+  Set<String> getAddedJars();
 
   /**
    * Returns a local tmp dir specific to the context

http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index 164d90a..beed8a3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,14 +35,14 @@ class JobContextImpl implements JobContext {
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
   private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
-  private final List<String> addedJars;
+  private final Set<String> addedJars;
   private final File localTmpDir;
 
   public JobContextImpl(JavaSparkContext sc, File localTmpDir) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
     monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
-    addedJars = new CopyOnWriteArrayList<String>();
+    addedJars = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
     this.localTmpDir = localTmpDir;
   }
 
@@ -64,7 +65,7 @@ class JobContextImpl implements JobContext {
   }
 
   @Override
-  public List<String> getAddedJars() {
+  public Set<String> getAddedJars() {
     return addedJars;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 879f8a4..b079ee2 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -38,9 +39,9 @@ public class SparkClientUtilities {
   /**
    * Add new elements to the classpath.
    *
-   * @param newPaths Array of classpath elements
+   * @param newPaths Set of classpath elements
    */
-  public static void addToClassPath(String[] newPaths, Configuration conf, File localTmpDir)
+  public static void addToClassPath(Set<String> newPaths, Configuration conf, File localTmpDir)
       throws Exception {
     ClassLoader cloader = Thread.currentThread().getContextClassLoader();
     URLClassLoader loader = (URLClassLoader) cloader;
@@ -74,9 +75,11 @@ public class SparkClientUtilities {
         Path remoteFile = new Path(path);
         Path localFile =
             new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName());
-        LOG.info("Copying " + remoteFile + " to " + localFile);
-        FileSystem fs = remoteFile.getFileSystem(conf);
-        fs.copyToLocalFile(remoteFile, localFile);
+        if (!new File(localFile.toString()).exists()) {
+          LOG.info("Copying " + remoteFile + " to " + localFile);
+          FileSystem remoteFS = remoteFile.getFileSystem(conf);
+          remoteFS.copyToLocalFile(remoteFile, localFile);
+        }
         return urlFromPathString(localFile.toString(), conf, localTmpDir);
       } else {
         url = new File(path).toURL();