Posted to commits@hive.apache.org by ga...@apache.org on 2018/01/18 17:55:55 UTC
[35/70] [abbrv] hive git commit: HIVE-18214: Flaky test: TestSparkClient (Sahil Takiar, reviewed by Peter Vary)
HIVE-18214: Flaky test: TestSparkClient (Sahil Takiar, reviewed by Peter Vary)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87860fbc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87860fbc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87860fbc
Branch: refs/heads/standalone-metastore
Commit: 87860fbca42b2477f504c8b1cefd43c865a2629c
Parents: 2402f3c
Author: Sahil Takiar <st...@cloudera.com>
Authored: Tue Jan 16 09:18:32 2018 -0800
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Tue Jan 16 09:18:32 2018 -0800
----------------------------------------------------------------------
.../hive/spark/client/SparkClientFactory.java | 3 -
.../hive/spark/client/SparkClientImpl.java | 528 +++++++++----------
.../hive/spark/client/TestSparkClient.java | 48 +-
3 files changed, 265 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/87860fbc/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 50c7bb2..8abeed8 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -34,9 +34,6 @@ import com.google.common.base.Throwables;
@InterfaceAudience.Private
public final class SparkClientFactory {
- /** Used to run the driver in-process, mostly for testing. */
- static final String CONF_KEY_IN_PROCESS = "spark.client.do_not_use.run_driver_in_process";
-
/** Used by client and driver to share a client ID for establishing an RPC session. */
static final String CONF_CLIENT_ID = "spark.client.authentication.client_id";
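For context: with the in-process flag gone, the client id above (plus a generated secret) is the only channel tying a launched driver back to the RpcServer that spawned it. A minimal sketch of that pairing follows; only CONF_CLIENT_ID's key string and the --client-id/--secret argv shape come from this patch, while the generation and wiring are illustrative assumptions, not the actual SparkClientFactory API.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;

    public class ClientIdPairingSketch {
      // Key kept in SparkClientFactory above.
      static final String CONF_CLIENT_ID = "spark.client.authentication.client_id";

      public static void main(String[] args) {
        // Hypothetical: the real id/secret generation lives elsewhere in spark-client.
        String clientId = UUID.randomUUID().toString();
        String secret = UUID.randomUUID().toString();

        // The client publishes the id in the conf it hands to the driver ...
        Map<String, String> conf = new HashMap<>();
        conf.put(CONF_CLIENT_ID, clientId);

        // ... and the driver echoes the pair back on its command line
        // (matching the --client-id/--secret argv built in SparkClientImpl),
        // so the RpcServer can match the incoming connection to this client.
        List<String> driverArgs = Arrays.asList(
            "--client-id", clientId, "--secret", secret);
        System.out.println(conf + " -> " + driverArgs);
      }
    }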
http://git-wip-us.apache.org/repos/asf/hive/blob/87860fbc/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 49b7deb..eed8e53 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -214,324 +214,294 @@ class SparkClientImpl implements SparkClient {
final String serverAddress = rpcServer.getAddress();
final String serverPort = String.valueOf(rpcServer.getPort());
- if (conf.containsKey(SparkClientFactory.CONF_KEY_IN_PROCESS)) {
- // Mostly for testing things quickly. Do not do this in production.
- // when invoked in-process it inherits the environment variables of the parent
- LOG.warn("!!!! Running remote driver in-process. !!!!");
- runnable = new Runnable() {
- @Override
- public void run() {
- List<String> args = Lists.newArrayList();
- args.add("--remote-host");
- args.add(serverAddress);
- args.add("--remote-port");
- args.add(serverPort);
- args.add("--client-id");
- args.add(clientId);
- args.add("--secret");
- args.add(secret);
-
- for (Map.Entry<String, String> e : conf.entrySet()) {
- args.add("--conf");
- args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey())));
- }
- try {
- RemoteDriver.main(args.toArray(new String[args.size()]));
- } catch (Exception e) {
- LOG.error("Error running driver.", e);
- }
- }
- };
- } else {
- // If a Spark installation is provided, use the spark-submit script. Otherwise, call the
- // SparkSubmit class directly, which has some caveats (like having to provide a proper
- // version of Guava on the classpath depending on the deploy mode).
- String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
- if (sparkHome == null) {
- sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
- }
+ // If a Spark installation is provided, use the spark-submit script. Otherwise, call the
+ // SparkSubmit class directly, which has some caveats (like having to provide a proper
+ // version of Guava on the classpath depending on the deploy mode).
+ String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
+ if (sparkHome == null) {
+ sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
+ }
+ if (sparkHome == null) {
+ sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
+ }
+ String sparkLogDir = conf.get("hive.spark.log.dir");
+ if (sparkLogDir == null) {
if (sparkHome == null) {
- sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
- }
- String sparkLogDir = conf.get("hive.spark.log.dir");
- if (sparkLogDir == null) {
- if (sparkHome == null) {
- sparkLogDir = "./target/";
- } else {
- sparkLogDir = sparkHome + "/logs/";
- }
+ sparkLogDir = "./target/";
+ } else {
+ sparkLogDir = sparkHome + "/logs/";
}
+ }
- String osxTestOpts = "";
- if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
- osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
- }
+ String osxTestOpts = "";
+ if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
+ osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
+ }
- String driverJavaOpts = Joiner.on(" ").skipNulls().join(
- "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
- String executorJavaOpts = Joiner.on(" ").skipNulls().join(
- "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
-
- // Create a file with all the job properties to be read by spark-submit. Change the
- // file's permissions so that only the owner can read it. This avoid having the
- // connection secret show up in the child process's command line.
- File properties = File.createTempFile("spark-submit.", ".properties");
- if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
- throw new IOException("Cannot change permissions of job properties file.");
- }
- properties.deleteOnExit();
+ String driverJavaOpts = Joiner.on(" ").skipNulls().join(
+ "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
+ String executorJavaOpts = Joiner.on(" ").skipNulls().join(
+ "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
+
+ // Create a file with all the job properties to be read by spark-submit. Change the
+ // file's permissions so that only the owner can read it. This avoids having the
+ // connection secret show up in the child process's command line.
+ File properties = File.createTempFile("spark-submit.", ".properties");
+ if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
+ throw new IOException("Cannot change permissions of job properties file.");
+ }
+ properties.deleteOnExit();
- Properties allProps = new Properties();
- // first load the defaults from spark-defaults.conf if available
- try {
- URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
- if (sparkDefaultsUrl != null) {
- LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
- allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
- }
- } catch (Exception e) {
- String msg = "Exception trying to load spark-defaults.conf: " + e;
- throw new IOException(msg, e);
- }
- // then load the SparkClientImpl config
- for (Map.Entry<String, String> e : conf.entrySet()) {
- allProps.put(e.getKey(), conf.get(e.getKey()));
+ Properties allProps = new Properties();
+ // first load the defaults from spark-defaults.conf if available
+ try {
+ URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
+ if (sparkDefaultsUrl != null) {
+ LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
+ allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
}
- allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
- allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
- allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
- allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
-
- String isTesting = conf.get("spark.testing");
- if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
- String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
- if (!hiveHadoopTestClasspath.isEmpty()) {
- String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
- if (extraDriverClasspath.isEmpty()) {
- allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
- } else {
- extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
- allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
- }
+ } catch (Exception e) {
+ String msg = "Exception trying to load spark-defaults.conf: " + e;
+ throw new IOException(msg, e);
+ }
+ // then load the SparkClientImpl config
+ for (Map.Entry<String, String> e : conf.entrySet()) {
+ allProps.put(e.getKey(), conf.get(e.getKey()));
+ }
+ allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
+ allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
+ allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
+ allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
+
+ String isTesting = conf.get("spark.testing");
+ if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
+ String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
+ if (!hiveHadoopTestClasspath.isEmpty()) {
+ String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
+ if (extraDriverClasspath.isEmpty()) {
+ allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+ } else {
+ extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath : extraDriverClasspath + File.pathSeparator;
+ allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
+ }
- String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
- if (extraExecutorClasspath.isEmpty()) {
- allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
- } else {
- extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
- allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
- }
+ String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
+ if (extraExecutorClasspath.isEmpty()) {
+ allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+ } else {
+ extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
+ allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
}
}
+ }
- Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
- try {
- allProps.store(writer, "Spark Context configuration");
- } finally {
- writer.close();
- }
+ Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
+ try {
+ allProps.store(writer, "Spark Context configuration");
+ } finally {
+ writer.close();
+ }
- // Define how to pass options to the child process. If launching in client (or local)
- // mode, the driver options need to be passed directly on the command line. Otherwise,
- // SparkSubmit will take care of that for us.
- String master = conf.get("spark.master");
- Preconditions.checkArgument(master != null, "spark.master is not defined.");
- String deployMode = conf.get("spark.submit.deployMode");
+ // Define how to pass options to the child process. If launching in client (or local)
+ // mode, the driver options need to be passed directly on the command line. Otherwise,
+ // SparkSubmit will take care of that for us.
+ String master = conf.get("spark.master");
+ Preconditions.checkArgument(master != null, "spark.master is not defined.");
+ String deployMode = conf.get("spark.submit.deployMode");
- List<String> argv = Lists.newLinkedList();
+ List<String> argv = Lists.newLinkedList();
- if (sparkHome != null) {
- argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
- } else {
- LOG.info("No spark.home provided, calling SparkSubmit directly.");
- argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
-
- if (master.startsWith("local") || master.startsWith("mesos") ||
- SparkClientUtilities.isYarnClientMode(master, deployMode) ||
- master.startsWith("spark")) {
- String mem = conf.get("spark.driver.memory");
- if (mem != null) {
- argv.add("-Xms" + mem);
- argv.add("-Xmx" + mem);
- }
+ if (sparkHome != null) {
+ argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
+ } else {
+ LOG.info("No spark.home provided, calling SparkSubmit directly.");
+ argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
+
+ if (master.startsWith("local") || master.startsWith("mesos") ||
+ SparkClientUtilities.isYarnClientMode(master, deployMode) ||
+ master.startsWith("spark")) {
+ String mem = conf.get("spark.driver.memory");
+ if (mem != null) {
+ argv.add("-Xms" + mem);
+ argv.add("-Xmx" + mem);
+ }
- String cp = conf.get("spark.driver.extraClassPath");
- if (cp != null) {
- argv.add("-classpath");
- argv.add(cp);
- }
+ String cp = conf.get("spark.driver.extraClassPath");
+ if (cp != null) {
+ argv.add("-classpath");
+ argv.add(cp);
+ }
- String libPath = conf.get("spark.driver.extraLibPath");
- if (libPath != null) {
- argv.add("-Djava.library.path=" + libPath);
- }
+ String libPath = conf.get("spark.driver.extraLibPath");
+ if (libPath != null) {
+ argv.add("-Djava.library.path=" + libPath);
+ }
- String extra = conf.get(DRIVER_OPTS_KEY);
- if (extra != null) {
- for (String opt : extra.split("[ ]")) {
- if (!opt.trim().isEmpty()) {
- argv.add(opt.trim());
- }
+ String extra = conf.get(DRIVER_OPTS_KEY);
+ if (extra != null) {
+ for (String opt : extra.split("[ ]")) {
+ if (!opt.trim().isEmpty()) {
+ argv.add(opt.trim());
}
}
}
-
- argv.add("org.apache.spark.deploy.SparkSubmit");
}
- if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
- String executorCores = conf.get("spark.executor.cores");
- if (executorCores != null) {
- argv.add("--executor-cores");
- argv.add(executorCores);
- }
+ argv.add("org.apache.spark.deploy.SparkSubmit");
+ }
- String executorMemory = conf.get("spark.executor.memory");
- if (executorMemory != null) {
- argv.add("--executor-memory");
- argv.add(executorMemory);
- }
+ if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
+ String executorCores = conf.get("spark.executor.cores");
+ if (executorCores != null) {
+ argv.add("--executor-cores");
+ argv.add(executorCores);
+ }
- String numOfExecutors = conf.get("spark.executor.instances");
- if (numOfExecutors != null) {
- argv.add("--num-executors");
- argv.add(numOfExecutors);
- }
+ String executorMemory = conf.get("spark.executor.memory");
+ if (executorMemory != null) {
+ argv.add("--executor-memory");
+ argv.add(executorMemory);
}
- // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh
- // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or
- // delegation token renewal, but not both. Since doAs is a more common case, if both
- // are needed, we choose to favor doAs. So when doAs is enabled, we use kinit command,
- // otherwise, we pass the principal/keypad to spark to support the token renewal for
- // long-running application.
- if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
- String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
- "0.0.0.0");
- String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
- if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) {
- if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
- List<String> kinitArgv = Lists.newLinkedList();
- kinitArgv.add("kinit");
- kinitArgv.add(principal);
- kinitArgv.add("-k");
- kinitArgv.add("-t");
- kinitArgv.add(keyTabFile + ";");
- kinitArgv.addAll(argv);
- argv = kinitArgv;
- } else {
- // if doAs is not enabled, we pass the principal/keypad to spark-submit in order to
- // support the possible delegation token renewal in Spark
- argv.add("--principal");
- argv.add(principal);
- argv.add("--keytab");
- argv.add(keyTabFile);
- }
+
+ String numOfExecutors = conf.get("spark.executor.instances");
+ if (numOfExecutors != null) {
+ argv.add("--num-executors");
+ argv.add(numOfExecutors);
+ }
+ }
+ // The options --principal/--keytab do not work with --proxy-user in spark-submit.sh
+ // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive can only support doAs or
+ // delegation token renewal, but not both. Since doAs is the more common case, if both
+ // are needed, we choose to favor doAs. So when doAs is enabled, we use the kinit command;
+ // otherwise, we pass the principal/keytab to Spark to support token renewal for
+ // long-running applications.
+ if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
+ String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
+ "0.0.0.0");
+ String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+ if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile)) {
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+ List<String> kinitArgv = Lists.newLinkedList();
+ kinitArgv.add("kinit");
+ kinitArgv.add(principal);
+ kinitArgv.add("-k");
+ kinitArgv.add("-t");
+ kinitArgv.add(keyTabFile + ";");
+ kinitArgv.addAll(argv);
+ argv = kinitArgv;
+ } else {
+ // if doAs is not enabled, we pass the principal/keytab to spark-submit in order to
+ // support the possible delegation token renewal in Spark
+ argv.add("--principal");
+ argv.add(principal);
+ argv.add("--keytab");
+ argv.add(keyTabFile);
}
}
- if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
- try {
- String currentUser = Utils.getUGI().getShortUserName();
- // do not do impersonation in CLI mode
- if (!currentUser.equals(System.getProperty("user.name"))) {
- LOG.info("Attempting impersonation of " + currentUser);
- argv.add("--proxy-user");
- argv.add(currentUser);
- }
- } catch (Exception e) {
- String msg = "Cannot obtain username: " + e;
- throw new IllegalStateException(msg, e);
+ }
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+ try {
+ String currentUser = Utils.getUGI().getShortUserName();
+ // do not do impersonation in CLI mode
+ if (!currentUser.equals(System.getProperty("user.name"))) {
+ LOG.info("Attempting impersonation of " + currentUser);
+ argv.add("--proxy-user");
+ argv.add(currentUser);
}
+ } catch (Exception e) {
+ String msg = "Cannot obtain username: " + e;
+ throw new IllegalStateException(msg, e);
}
+ }
- String regStr = conf.get("spark.kryo.registrator");
- if (HIVE_KRYO_REG_NAME.equals(regStr)) {
- argv.add("--jars");
- argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
- }
+ String regStr = conf.get("spark.kryo.registrator");
+ if (HIVE_KRYO_REG_NAME.equals(regStr)) {
+ argv.add("--jars");
+ argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
+ }
- argv.add("--properties-file");
- argv.add(properties.getAbsolutePath());
- argv.add("--class");
- argv.add(RemoteDriver.class.getName());
+ argv.add("--properties-file");
+ argv.add(properties.getAbsolutePath());
+ argv.add("--class");
+ argv.add(RemoteDriver.class.getName());
- String jar = "spark-internal";
- if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
- jar = SparkContext.jarOfClass(this.getClass()).get();
- }
- argv.add(jar);
-
- argv.add("--remote-host");
- argv.add(serverAddress);
- argv.add("--remote-port");
- argv.add(serverPort);
-
- //hive.spark.* keys are passed down to the RemoteDriver via --conf,
- //as --properties-file contains the spark.* keys that are meant for SparkConf object.
- for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
- String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
- argv.add("--conf");
- argv.add(String.format("%s=%s", hiveSparkConfKey, value));
- }
+ String jar = "spark-internal";
+ if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
+ jar = SparkContext.jarOfClass(this.getClass()).get();
+ }
+ argv.add(jar);
+
+ argv.add("--remote-host");
+ argv.add(serverAddress);
+ argv.add("--remote-port");
+ argv.add(serverPort);
+
+ //hive.spark.* keys are passed down to the RemoteDriver via --conf,
+ //as --properties-file contains the spark.* keys that are meant for the SparkConf object.
+ for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+ String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+ argv.add("--conf");
+ argv.add(String.format("%s=%s", hiveSparkConfKey, value));
+ }
- String cmd = Joiner.on(" ").join(argv);
- LOG.info("Running client driver with argv: {}", cmd);
- ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
-
- // Prevent hive configurations from being visible in Spark.
- pb.environment().remove("HIVE_HOME");
- pb.environment().remove("HIVE_CONF_DIR");
- // Add credential provider password to the child process's environment
- // In case of Spark the credential provider location is provided in the jobConf when the job is submitted
- String password = getSparkJobCredentialProviderPassword();
- if(password != null) {
- pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
- }
- if (isTesting != null) {
- pb.environment().put("SPARK_TESTING", isTesting);
- }
+ String cmd = Joiner.on(" ").join(argv);
+ LOG.info("Running client driver with argv: {}", cmd);
+ ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
+
+ // Prevent hive configurations from being visible in Spark.
+ pb.environment().remove("HIVE_HOME");
+ pb.environment().remove("HIVE_CONF_DIR");
+ // Add credential provider password to the child process's environment
+ // In case of Spark, the credential provider location is provided in the jobConf when the job is submitted
+ String password = getSparkJobCredentialProviderPassword();
+ if(password != null) {
+ pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
+ }
+ if (isTesting != null) {
+ pb.environment().put("SPARK_TESTING", isTesting);
+ }
- final Process child = pb.start();
- String threadName = Thread.currentThread().getName();
- final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
- final LogRedirector.LogSourceCallback callback = () -> {return isAlive;};
+ final Process child = pb.start();
+ String threadName = Thread.currentThread().getName();
+ final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+ final LogRedirector.LogSourceCallback callback = () -> {return isAlive;};
- LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName,
- new LogRedirector(child.getInputStream(), LOG, callback));
- LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName,
- new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
+ LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName,
+ new LogRedirector(child.getInputStream(), LOG, callback));
+ LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName,
+ new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
- runnable = new Runnable() {
- @Override
- public void run() {
- try {
- int exitCode = child.waitFor();
- if (exitCode != 0) {
- StringBuilder errStr = new StringBuilder();
- synchronized(childErrorLog) {
- Iterator iter = childErrorLog.iterator();
- while(iter.hasNext()){
- errStr.append(iter.next());
- errStr.append('\n');
- }
+ runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int exitCode = child.waitFor();
+ if (exitCode != 0) {
+ StringBuilder errStr = new StringBuilder();
+ synchronized(childErrorLog) {
+ Iterator iter = childErrorLog.iterator();
+ while(iter.hasNext()){
+ errStr.append(iter.next());
+ errStr.append('\n');
}
-
- LOG.warn("Child process exited with code {}", exitCode);
- rpcServer.cancelClient(clientId,
- "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
}
- } catch (InterruptedException ie) {
- LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
- rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit) is interrupted");
- Thread.interrupted();
- child.destroy();
- } catch (Exception e) {
- String errMsg = "Exception while waiting for child process (spark-submit)";
- LOG.warn(errMsg, e);
- rpcServer.cancelClient(clientId, errMsg);
+
+ LOG.warn("Child process exited with code {}", exitCode);
+ rpcServer.cancelClient(clientId,
+ "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
}
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
+ rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
+ Thread.interrupted();
+ child.destroy();
+ } catch (Exception e) {
+ String errMsg = "Exception while waiting for child process (spark-submit)";
+ LOG.warn(errMsg, e);
+ rpcServer.cancelClient(clientId, errMsg);
}
- };
- }
+ }
+ };
Thread thread = new Thread(runnable);
thread.setDaemon(true);
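Worth calling out from the hunk above: the temp properties file is flipped to owner-only readable before the secret is written, which is what keeps CONF_KEY_SECRET off the spark-submit command line. A minimal sketch of just that technique, using only java.io/java.util; the property key below is a placeholder, not the real one.

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class OwnerOnlyPropertiesSketch {
      public static void main(String[] args) throws IOException {
        // Create the file first, then restrict it to the owner before any
        // sensitive value is written into it.
        File props = File.createTempFile("spark-submit.", ".properties");
        if (!props.setReadable(false) || !props.setReadable(true, true)) {
          throw new IOException("Cannot change permissions of job properties file.");
        }
        props.deleteOnExit();

        Properties allProps = new Properties();
        allProps.put("placeholder.secret.key", "not-the-real-secret"); // hypothetical key

        // try-with-resources where the patch uses an explicit try/finally.
        try (Writer w = new OutputStreamWriter(
            new FileOutputStream(props), StandardCharsets.UTF_8)) {
          allProps.store(w, "Spark Context configuration");
        }

        // spark-submit reads the file via --properties-file, so the secret
        // never appears in the child process's argv.
        System.out.println("--properties-file " + props.getAbsolutePath());
      }
    }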
http://git-wip-us.apache.org/repos/asf/hive/blob/87860fbc/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index 697d8d1..23df792 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -68,19 +68,14 @@ public class TestSparkClient {
private static final long TIMEOUT = 20;
private static final HiveConf HIVECONF = new HiveConf();
- private Map<String, String> createConf(boolean local) {
+ private Map<String, String> createConf() {
Map<String, String> conf = new HashMap<String, String>();
- if (local) {
- conf.put(SparkClientFactory.CONF_KEY_IN_PROCESS, "true");
- conf.put("spark.master", "local");
- conf.put("spark.app.name", "SparkClientSuite Local App");
- } else {
- String classpath = System.getProperty("java.class.path");
- conf.put("spark.master", "local");
- conf.put("spark.app.name", "SparkClientSuite Remote App");
- conf.put("spark.driver.extraClassPath", classpath);
- conf.put("spark.executor.extraClassPath", classpath);
- }
+
+ String classpath = System.getProperty("java.class.path");
+ conf.put("spark.master", "local");
+ conf.put("spark.app.name", "SparkClientSuite Remote App");
+ conf.put("spark.driver.extraClassPath", classpath);
+ conf.put("spark.executor.extraClassPath", classpath);
if (!Strings.isNullOrEmpty(System.getProperty("spark.home"))) {
conf.put("spark.home", System.getProperty("spark.home"));
@@ -91,7 +86,7 @@ public class TestSparkClient {
@Test
public void testJobSubmission() throws Exception {
- runTest(true, new TestFunction() {
+ runTest(new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
JobHandle.Listener<String> listener = newListener();
@@ -112,7 +107,7 @@ public class TestSparkClient {
@Test
public void testSimpleSparkJob() throws Exception {
- runTest(true, new TestFunction() {
+ runTest(new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
JobHandle<Long> handle = client.submit(new SparkJob());
@@ -123,7 +118,7 @@ public class TestSparkClient {
@Test
public void testErrorJob() throws Exception {
- runTest(true, new TestFunction() {
+ runTest(new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
JobHandle.Listener<String> listener = newListener();
@@ -151,7 +146,7 @@ public class TestSparkClient {
@Test
public void testSyncRpc() throws Exception {
- runTest(true, new TestFunction() {
+ runTest(new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
Future<String> result = client.run(new SyncRpc());
@@ -161,19 +156,8 @@ public class TestSparkClient {
}
@Test
- public void testRemoteClient() throws Exception {
- runTest(false, new TestFunction() {
- @Override
- public void call(SparkClient client) throws Exception {
- JobHandle<Long> handle = client.submit(new SparkJob());
- assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
- }
- });
- }
-
- @Test
public void testMetricsCollection() throws Exception {
- runTest(true, new TestFunction() {
+ runTest(new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
JobHandle.Listener<Integer> listener = newListener();
@@ -202,7 +186,7 @@ public class TestSparkClient {
@Test
public void testAddJarsAndFiles() throws Exception {
- runTest(true, new TestFunction() {
+ runTest(new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
File jar = null;
@@ -256,7 +240,7 @@ public class TestSparkClient {
@Test
public void testCounters() throws Exception {
- runTest(true, new TestFunction() {
+ runTest(new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
JobHandle<?> job = client.submit(new CounterIncrementJob());
@@ -308,8 +292,8 @@ public class TestSparkClient {
}).when(listener);
}
- private void runTest(boolean local, TestFunction test) throws Exception {
- Map<String, String> conf = createConf(local);
+ private void runTest(TestFunction test) throws Exception {
+ Map<String, String> conf = createConf();
SparkClientFactory.initialize(conf);
SparkClient client = null;
try {
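With the local/in-process branch removed, every test case now funnels through the single spark-submit-backed configuration. A sketch of the resulting runTest lifecycle, where the createClient/stop calls are assumptions about the truncated remainder of the method rather than quotes from this patch:

    // Slots into TestSparkClient alongside the hunk above; createConf(),
    // TestFunction and HIVECONF are the real names from the diff.
    private void runTestSketch(TestFunction test) throws Exception {
      Map<String, String> conf = createConf();        // always the remote conf now
      SparkClientFactory.initialize(conf);
      SparkClient client = null;
      try {
        // Assumed factory call; the hunk is cut off before this point.
        client = SparkClientFactory.createClient(conf, HIVECONF);
        test.call(client);
      } finally {
        if (client != null) {
          client.stop();                              // assumed cleanup
        }
        SparkClientFactory.stop();
      }
    }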