You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/05/14 23:23:03 UTC
hive git commit: HIVE-10671: yarn-cluster mode offers degraded
performance compared to yarn-client [Spark Branch] (Rui via Xuefu,
reviewed by Chengxiang)
Repository: hive
Updated Branches:
refs/heads/spark a9489c34b -> 889c41f12
HIVE-10671: yarn-cluster mode offers degraded performance compared to yarn-client [Spark Branch] (Rui via Xuefu, reviewed by Chengxiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/889c41f1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/889c41f1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/889c41f1
Branch: refs/heads/spark
Commit: 889c41f1290ddaf19e5d8139c183cb235ed0e65a
Parents: a9489c3
Author: xzhang <xz...@xzdt>
Authored: Thu May 14 14:20:11 2015 -0700
Committer: xzhang <xz...@xzdt>
Committed: Thu May 14 14:20:11 2015 -0700
----------------------------------------------------------------------
.../hive/ql/exec/spark/RemoteHiveSparkClient.java | 10 +++++-----
.../hadoop/hive/ql/exec/spark/SparkUtilities.java | 11 +++++++----
.../java/org/apache/hive/spark/client/JobContext.java | 2 +-
.../org/apache/hive/spark/client/JobContextImpl.java | 7 ++++---
.../apache/hive/spark/client/SparkClientUtilities.java | 13 ++++++++-----
5 files changed, 25 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 059016d..bae30f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -28,6 +28,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -167,10 +168,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
try {
URI fileUri = SparkUtilities.getURI(addedFile);
if (fileUri != null && !localFiles.contains(fileUri)) {
+ localFiles.add(fileUri);
if (SparkUtilities.needUploadToHDFS(fileUri, sparkConf)) {
fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf);
}
- localFiles.add(fileUri);
remoteClient.addFile(fileUri);
}
} catch (URISyntaxException e) {
@@ -184,10 +185,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
try {
URI jarUri = SparkUtilities.getURI(addedJar);
if (jarUri != null && !localJars.contains(jarUri)) {
+ localJars.add(jarUri);
if (SparkUtilities.needUploadToHDFS(jarUri, sparkConf)) {
jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf);
}
- localJars.add(jarUri);
remoteClient.addJar(jarUri);
}
} catch (URISyntaxException e) {
@@ -224,10 +225,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
// Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
// may need to load classes from this jar in other threads.
- List<String> addedJars = jc.getAddedJars();
+ Set<String> addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
- SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]),
- localJobConf, jc.getLocalTmpDir());
+ SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 8499933..e6c845c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.UUID;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
@@ -77,11 +78,13 @@ public class SparkUtilities {
* @throws IOException
*/
public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
- Path tmpDir = SessionState.getHDFSSessionPath(conf);
+ Path localFile = new Path(source.getPath());
+ // give the uploaded file a UUID
+ Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
+ UUID.randomUUID() + "-" + getFileName(source));
FileSystem fileSystem = FileSystem.get(conf);
- fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir);
- String filePath = tmpDir + File.separator + getFileName(source);
- Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath();
+ fileSystem.copyFromLocalFile(localFile, remoteFile);
+ Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
return fullPath.toUri();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index 36e252c..af6332e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -57,7 +57,7 @@ public interface JobContext {
/**
* Return all added jar path which added through AddJarJob.
*/
- List<String> getAddedJars();
+ Set<String> getAddedJars();
/**
* Returns a local tmp dir specific to the context
http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index 164d90a..beed8a3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -18,6 +18,7 @@
package org.apache.hive.spark.client;
import java.io.File;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,14 +35,14 @@ class JobContextImpl implements JobContext {
private final JavaSparkContext sc;
private final ThreadLocal<MonitorCallback> monitorCb;
private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
- private final List<String> addedJars;
+ private final Set<String> addedJars;
private final File localTmpDir;
public JobContextImpl(JavaSparkContext sc, File localTmpDir) {
this.sc = sc;
this.monitorCb = new ThreadLocal<MonitorCallback>();
monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
- addedJars = new CopyOnWriteArrayList<String>();
+ addedJars = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
this.localTmpDir = localTmpDir;
}
@@ -64,7 +65,7 @@ class JobContextImpl implements JobContext {
}
@Override
- public List<String> getAddedJars() {
+ public Set<String> getAddedJars() {
return addedJars;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/889c41f1/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 879f8a4..b079ee2 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -38,9 +39,9 @@ public class SparkClientUtilities {
/**
* Add new elements to the classpath.
*
- * @param newPaths Array of classpath elements
+ * @param newPaths Set of classpath elements
*/
- public static void addToClassPath(String[] newPaths, Configuration conf, File localTmpDir)
+ public static void addToClassPath(Set<String> newPaths, Configuration conf, File localTmpDir)
throws Exception {
ClassLoader cloader = Thread.currentThread().getContextClassLoader();
URLClassLoader loader = (URLClassLoader) cloader;
@@ -74,9 +75,11 @@ public class SparkClientUtilities {
Path remoteFile = new Path(path);
Path localFile =
new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName());
- LOG.info("Copying " + remoteFile + " to " + localFile);
- FileSystem fs = remoteFile.getFileSystem(conf);
- fs.copyToLocalFile(remoteFile, localFile);
+ if (!new File(localFile.toString()).exists()) {
+ LOG.info("Copying " + remoteFile + " to " + localFile);
+ FileSystem remoteFS = remoteFile.getFileSystem(conf);
+ remoteFS.copyToLocalFile(remoteFile, localFile);
+ }
return urlFromPathString(localFile.toString(), conf, localTmpDir);
} else {
url = new File(path).toURL();