You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@hive.apache.org by li...@apache.org on 2015/11/05 09:52:03 UTC
hive git commit: HIVE-12229: Custom script in query cannot be executed in yarn-cluster mode [Spark Branch] (Rui reviewed by Xuefu)
Repository: hive
Updated Branches:
refs/heads/spark fd1192914 -> b02cd4abc
HIVE-12229: Custom script in query cannot be executed in yarn-cluster mode [Spark Branch] (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b02cd4ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b02cd4ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b02cd4ab
Branch: refs/heads/spark
Commit: b02cd4abce10003dc90646b710875fba00b9b5b0
Parents: fd11929
Author: Rui Li <ru...@intel.com>
Authored: Thu Nov 5 16:48:25 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Nov 5 16:51:22 2015 +0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/ScriptOperator.java | 15 ++++++++++++
.../ql/exec/spark/RemoteHiveSparkClient.java | 4 ++--
.../hive/ql/exec/spark/SparkUtilities.java | 10 ++++----
.../apache/hive/spark/client/JobContext.java | 4 ++--
.../hive/spark/client/JobContextImpl.java | 8 +++----
.../hive/spark/client/SparkClientImpl.java | 2 +-
.../hive/spark/client/SparkClientUtilities.java | 24 +++++++++++++-------
7 files changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index 5df9ea2..63837ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
import org.apache.spark.SparkFiles;
import java.io.BufferedInputStream;
@@ -329,6 +331,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
// initialize the user's process only when you receive the first row
if (firstRow) {
firstRow = false;
+ SparkConf sparkConf = null;
try {
String[] cmdArgs = splitArgs(conf.getScriptCmd());
@@ -341,6 +344,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
// In spark local mode, we need to search added files in root directory.
if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ sparkConf = SparkEnv.get().conf();
finder.prependPathComponent(SparkFiles.getRootDirectory());
}
File f = finder.getAbsolutePath(prog);
@@ -371,6 +375,17 @@ public class ScriptOperator extends Operator<ScriptDesc> implements
String idEnvVarVal = getOperatorId();
env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
+ // For spark, in non-local mode, any added dependencies are stored at
+ // SparkFiles::getRootDirectory, which is the executor's working directory.
+ // In local mode, we need to manually point the process's working directory to it,
+ // in order to make the dependencies accessible.
+ if (sparkConf != null) {
+ String master = sparkConf.get("spark.master");
+ if (master.equals("local") || master.startsWith("local[")) {
+ pb.directory(new File(SparkFiles.getRootDirectory()));
+ }
+ }
+
scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
DataOutputStream scriptOut = new DataOutputStream(
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 2e8d1d3..cf81424 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -295,11 +295,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
// Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark
// may need to load classes from this jar in other threads.
- Set<String> addedJars = jc.getAddedJars();
+ Map<String, Long> addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader());
- localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";"));
+ localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
}
Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cf2c3bc..0268469 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
-import java.util.UUID;
-import java.util.Collection;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FilenameUtils;
@@ -91,11 +89,11 @@ public class SparkUtilities {
*/
public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
Path localFile = new Path(source.getPath());
- // give the uploaded file a UUID
- Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
- UUID.randomUUID() + "-" + getFileName(source));
+ Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
FileSystem fileSystem = FileSystem.get(conf);
- fileSystem.copyFromLocalFile(localFile, remoteFile);
+ // Overwrite if the remote file already exists. Whether the updated file is
+ // picked up on the executors is up to Spark, i.e. spark.files.overwrite.
+ fileSystem.copyFromLocalFile(false, true, localFile, remoteFile);
Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
return fullPath.toUri();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
index af6332e..c9c975b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
@@ -55,9 +55,9 @@ public interface JobContext {
Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
/**
- * Return all added jar path which added through AddJarJob.
+ * Return all jar paths added through AddJarJob, each mapped to the timestamp at which it was added.
*/
- Set<String> getAddedJars();
+ Map<String, Long> getAddedJars();
/**
* Returns a local tmp dir specific to the context
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
index beed8a3..b73bcd7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
@@ -18,12 +18,10 @@
package org.apache.hive.spark.client;
import java.io.File;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hive.spark.counter.SparkCounters;
@@ -35,14 +33,14 @@ class JobContextImpl implements JobContext {
private final JavaSparkContext sc;
private final ThreadLocal<MonitorCallback> monitorCb;
private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
- private final Set<String> addedJars;
+ private final Map<String, Long> addedJars;
private final File localTmpDir;
public JobContextImpl(JavaSparkContext sc, File localTmpDir) {
this.sc = sc;
this.monitorCb = new ThreadLocal<MonitorCallback>();
monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
- addedJars = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ addedJars = new ConcurrentHashMap<>();
this.localTmpDir = localTmpDir;
}
@@ -65,7 +63,7 @@ class JobContextImpl implements JobContext {
}
@Override
- public Set<String> getAddedJars() {
+ public Map<String, Long> getAddedJars() {
return addedJars;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index ceebbb3..3d682a0 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -617,7 +617,7 @@ class SparkClientImpl implements SparkClient {
jc.sc().addJar(path);
// Following remote job may refer to classes in this jar, and the remote job would be executed
// in a different thread, so we add this jar path to JobContext for further usage.
- jc.getAddedJars().add(path);
+ jc.getAddedJars().put(path, System.currentTimeMillis());
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 589436d..bbbd97b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -24,7 +24,8 @@ import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -35,20 +36,21 @@ import org.apache.hadoop.fs.Path;
public class SparkClientUtilities {
protected static final transient Log LOG = LogFactory.getLog(SparkClientUtilities.class);
+ private static final Map<String, Long> downloadedFiles = new ConcurrentHashMap<>();
/**
* Add new elements to the classpath.
*
- * @param newPaths Set of classpath elements
+ * @param newPaths Map from classpath elements to their corresponding timestamps
*/
- public static void addToClassPath(Set<String> newPaths, Configuration conf, File localTmpDir)
+ public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir)
throws Exception {
URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
List<URL> curPath = Lists.newArrayList(loader.getURLs());
boolean newPathAdded = false;
- for (String newPath : newPaths) {
- URL newUrl = urlFromPathString(newPath, conf, localTmpDir);
+ for (Map.Entry<String, Long> entry : newPaths.entrySet()) {
+ URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
if (newUrl != null && !curPath.contains(newUrl)) {
curPath.add(newUrl);
LOG.info("Added jar[" + newUrl + "] to classpath.");
@@ -69,7 +71,8 @@ public class SparkClientUtilities {
* @param path path string
* @return
*/
- private static URL urlFromPathString(String path, Configuration conf, File localTmpDir) {
+ private static URL urlFromPathString(String path, Long timeStamp,
+ Configuration conf, File localTmpDir) {
URL url = null;
try {
if (StringUtils.indexOf(path, "file:/") == 0) {
@@ -78,12 +81,17 @@ public class SparkClientUtilities {
Path remoteFile = new Path(path);
Path localFile =
new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName());
- if (!new File(localFile.toString()).exists()) {
+ Long currentTS = downloadedFiles.get(path);
+ if (currentTS == null) {
+ currentTS = -1L;
+ }
+ if (!new File(localFile.toString()).exists() || currentTS < timeStamp) {
LOG.info("Copying " + remoteFile + " to " + localFile);
FileSystem remoteFS = remoteFile.getFileSystem(conf);
remoteFS.copyToLocalFile(remoteFile, localFile);
+ downloadedFiles.put(path, timeStamp);
}
- return urlFromPathString(localFile.toString(), conf, localTmpDir);
+ return urlFromPathString(localFile.toString(), timeStamp, conf, localTmpDir);
} else {
url = new File(path).toURL();
}