You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/21 04:57:59 UTC

svn commit: r1653432 - in /hive/branches/spark: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ spark-client/src/main/java/org/apache/hive/spark/client/ spark-client/src/main/java/org/apache/hive/spark/cli...

Author: szehon
Date: Wed Jan 21 03:57:59 2015
New Revision: 1653432

URL: http://svn.apache.org/r1653432
Log:
HIVE-9337 : Move more hive.spark.* configurations to HiveConf [Spark Branch] (Szehon, reviewed by Brock, Chengxiang, and Lefty)

Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 21 03:57:59 2015
@@ -1982,17 +1982,27 @@ public class HiveConf extends Configurat
         "hive.tez.exec.inplace.progress",
         true,
         "Updates tez job execution progress in-place in the terminal."),
-    SPARK_CLIENT_FUTURE_TIMEOUT(
-        "hive.spark.client.future.timeout",
-        "60s",
-        new TimeValidator(TimeUnit.SECONDS),
-        "Remote Spark client JobHandle future timeout value in seconds."),
-    SPARK_JOB_MONITOR_TIMEOUT(
-        "hive.spark.job.monitor.timeout",
-        "60s",
-        new TimeValidator(TimeUnit.SECONDS),
-        "Spark job monitor timeout if could not get job state in specified time interval.")
-    ;
+    SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
+      "60s", new TimeValidator(TimeUnit.SECONDS),
+      "Timeout for requests from Hive client to remote Spark driver."),
+    SPARK_JOB_MONITOR_TIMEOUT("hive.spark.job.monitor.timeout",
+      "60s", new TimeValidator(TimeUnit.SECONDS),
+      "Timeout for job monitor to get Spark job state."),
+    SPARK_RPC_CLIENT_CONNECT_TIMEOUT("hive.spark.client.connect.timeout",
+      "1000ms", new TimeValidator(TimeUnit.MILLISECONDS),
+      "Timeout for remote Spark driver in connecting back to Hive client."),
+    SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT("hive.spark.client.server.connect.timeout",
+      "20000ms", new TimeValidator(TimeUnit.MILLISECONDS),
+      "Timeout for handshake between Hive client and remote Spark driver.  Checked by both processes."),
+    SPARK_RPC_SECRET_RANDOM_BITS("hive.spark.client.secret.bits", "256",
+      "Number of bits of randomness in the generated secret for communication between Hive client and remote Spark driver. " +
+      "Rounded down to the nearest multiple of 8."),
+    SPARK_RPC_MAX_THREADS("hive.spark.client.rpc.threads", 8,
+      "Maximum number of threads for remote Spark driver's RPC event loop."),
+    SPARK_RPC_MAX_MESSAGE_SIZE("hive.spark.client.rpc.max.size", 50 * 1024 * 1024,
+      "Maximum message size in bytes for communication between Hive client and remote Spark driver. Default is 50MB."),
+    SPARK_RPC_CHANNEL_LOG_LEVEL("hive.spark.client.channel.log.level", null,
+      "Channel logging level for remote Spark driver.  One of {DEBUG, ERROR, INFO, TRACE, WARN}.");
 
     public final String varname;
     private final String defaultExpr;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Wed Jan 21 03:57:59 2015
@@ -21,14 +21,18 @@ package org.apache.hadoop.hive.ql.exec.s
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 
@@ -95,7 +99,8 @@ public class HiveSparkClientFactory {
       }
     }
 
-    // load properties from hive configurations.
+    // load properties from hive configurations, including both spark.* properties
+    // and properties for remote driver RPC.
     for (Map.Entry<String, String> entry : hiveConf) {
       String propertyName = entry.getKey();
       if (propertyName.startsWith("spark")) {
@@ -105,6 +110,13 @@ public class HiveSparkClientFactory {
           "load spark configuration from hive configuration (%s -> %s).",
           propertyName, value));
       }
+      if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
+        String value = RpcConfiguration.getValue(hiveConf, propertyName);
+        sparkConf.put(propertyName, value);
+        LOG.info(String.format(
+          "load RPC configuration from hive configuration (%s -> %s).",
+          propertyName, value));
+      }
     }
 
     return sparkConf;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Wed Jan 21 03:57:59 2015
@@ -80,7 +80,7 @@ public class RemoteHiveSparkClient imple
   RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
     this.hiveConf = hiveConf;
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
-    remoteClient = SparkClientFactory.createClient(conf);
+    remoteClient = SparkClientFactory.createClient(conf, hiveConf);
   }
 
   @Override

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Wed Jan 21 03:57:59 2015
@@ -124,6 +124,7 @@ public class RemoteDriver {
     Map<String, String> mapConf = Maps.newHashMap();
     for (Tuple2<String, String> e : conf.getAll()) {
       mapConf.put(e._1(), e._2());
+      LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
     }
 
     String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java Wed Jan 21 03:57:59 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkException;
 
@@ -67,12 +68,13 @@ public final class SparkClientFactory {
   /**
    * Instantiates a new Spark client.
    *
-   * @param conf Configuration for the remote Spark application.
+   * @param sparkConf Configuration for the remote Spark application, contains spark.* properties.
+   * @param hiveConf Configuration for Hive, contains hive.* properties.
    */
-  public static synchronized SparkClient createClient(Map<String, String> conf)
+  public static synchronized SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf)
       throws IOException, SparkException {
     Preconditions.checkState(server != null, "initialize() not called.");
-    return new SparkClientImpl(server, conf);
+    return new SparkClientImpl(server, sparkConf, hiveConf);
   }
 
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Wed Jan 21 03:57:59 2015
@@ -40,7 +40,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkException;
@@ -67,6 +69,7 @@ class SparkClientImpl implements SparkCl
   private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
 
   private final Map<String, String> conf;
+  private final HiveConf hiveConf;
   private final AtomicInteger childIdGenerator;
   private final Thread driverThread;
   private final Map<String, JobHandleImpl<?>> jobs;
@@ -74,8 +77,9 @@ class SparkClientImpl implements SparkCl
   private final ClientProtocol protocol;
   private volatile boolean isAlive;
 
-  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf) throws IOException, SparkException {
+  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf) throws IOException, SparkException {
     this.conf = conf;
+    this.hiveConf = hiveConf;
     this.childIdGenerator = new AtomicInteger();
     this.jobs = Maps.newConcurrentMap();
 
@@ -335,6 +339,14 @@ class SparkClientImpl implements SparkCl
       argv.add("--remote-port");
       argv.add(serverPort);
 
+      // The hive.spark.* keys are passed down to the RemoteDriver via --conf,
+      // because --properties-file contains the spark.* keys that are meant for the SparkConf object.
+      for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+        String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+        argv.add("--conf");
+        argv.add(String.format("%s=%s", hiveSparkConfKey, value));
+      }
+
       LOG.debug("Running client driver with argv: {}", Joiner.on(" ").join(argv));
 
       ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()]));

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java Wed Jan 21 03:57:59 2015
@@ -84,7 +84,7 @@ public class Rpc implements Closeable {
       final String secret,
       final RpcDispatcher dispatcher) throws Exception {
     final RpcConfiguration rpcConf = new RpcConfiguration(config);
-    int connectTimeoutMs = rpcConf.getConnectTimeoutMs();
+    int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();
 
     final ChannelFuture cf = new Bootstrap()
         .group(eloop)

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java Wed Jan 21 03:57:59 2015
@@ -21,9 +21,14 @@ import java.io.IOException;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,63 +42,49 @@ public final class RpcConfiguration {
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);
 
-  /** Connection timeout for RPC clients. */
-  public static final String CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.connect.timeout.ms";
-  private static final int CONNECT_TIMEOUT_MS_DEFAULT = 1000;
+  public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
+    HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
+    HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
+    HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
+    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+  );
+  public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
+  );
 
-  /**
-   * How long the server should wait for clients to connect back after they're
-   * registered. Also used to time out the client waiting for the server to
-   * reply to its "hello" message.
-   */
-  public static final String SERVER_CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.server.connect.timeout.ms";
-  private static final long SERVER_CONNECT_TIMEOUT_MS_DEFAULT = 10000L;
-
-  /**
-   * Number of bits of randomness in the generated client secrets. Rounded down
-   * to the nearest multiple of 8.
-   */
-  public static final String SECRET_RANDOM_BITS_KEY = "hive.spark.client.secret.bits";
-  private static final int SECRET_RANDOM_BITS_DEFAULT = 256;
-
-  /** Hostname or IP address to advertise for the server. */
   public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
 
-  /** Maximum number of threads to use for the RPC event loop. */
-  public static final String RPC_MAX_THREADS_KEY = "hive.spark.client.rpc.threads";
-  public static final int RPC_MAX_THREADS_DEFAULT = 8;
-
-  /** Maximum message size. Default = 10MB. */
-  public static final String RPC_MAX_MESSAGE_SIZE_KEY = "hive.spark.client.rpc.max.size";
-  public static final int RPC_MAX_MESSAGE_SIZE_DEFAULT = 50 * 1024 * 1024;
-
-  /** Channel logging level. */
-  public static final String RPC_CHANNEL_LOG_LEVEL_KEY = "hive.spark.client.channel.log.level";
-
   private final Map<String, String> config;
 
+  private static final HiveConf DEFAULT_CONF = new HiveConf();
+
   public RpcConfiguration(Map<String, String> config) {
     this.config = config;
   }
 
-  int getConnectTimeoutMs() {
-    String value = config.get(CONNECT_TIMEOUT_MS_KEY);
-    return value != null ? Integer.parseInt(value) : CONNECT_TIMEOUT_MS_DEFAULT;
+  long getConnectTimeoutMs() {
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
+    return value != null ? Integer.parseInt(value) : DEFAULT_CONF.getTimeVar(
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   int getMaxMessageSize() {
-    String value = config.get(RPC_MAX_MESSAGE_SIZE_KEY);
-    return value != null ? Integer.parseInt(value) : RPC_MAX_MESSAGE_SIZE_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.defaultIntVal;
   }
 
   long getServerConnectTimeoutMs() {
-    String value = config.get(SERVER_CONNECT_TIMEOUT_MS_KEY);
-    return value != null ? Long.parseLong(value) : SERVER_CONNECT_TIMEOUT_MS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname);
+    return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   int getSecretBits() {
-    String value = config.get(SECRET_RANDOM_BITS_KEY);
-    return value != null ? Integer.parseInt(value) : SECRET_RANDOM_BITS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
   }
 
   String getServerAddress() throws IOException {
@@ -133,12 +124,28 @@ public final class RpcConfiguration {
   }
 
   String getRpcChannelLogLevel() {
-    return config.get(RPC_CHANNEL_LOG_LEVEL_KEY);
+    return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);
   }
 
   public int getRpcThreadCount() {
-    String value = config.get(RPC_MAX_THREADS_KEY);
-    return value != null ? Integer.parseInt(value) : RPC_MAX_THREADS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.defaultIntVal;
   }
 
+
+  /**
+   * Looks up the value of an RPC configuration key. Keys listed in HIVE_SPARK_TIME_CONFIGS are
+   * converted to milliseconds; any other key is returned exactly as conf.get(key) provides it.
+   * @param conf Hive configuration to read from
+   * @param key RPC configuration key to look up (hive.spark.*)
+   * @return string form of the value
+   */
+  public static String getValue(HiveConf conf, String key) {
+    if (HIVE_SPARK_TIME_CONFIGS.contains(key)) {
+      HiveConf.ConfVars confVar = HiveConf.getConfVars(key);
+      return String.valueOf(conf.getTimeVar(confVar, TimeUnit.MILLISECONDS));
+    } else {
+      return conf.get(key);
+    }
+  }
 }

Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Wed Jan 21 03:57:59 2015
@@ -35,6 +35,7 @@ import java.util.zip.ZipEntry;
 import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkException;
 import org.apache.spark.SparkFiles;
@@ -50,6 +51,7 @@ public class TestSparkClient {
 
   // Timeouts are bad... mmmkay.
   private static final long TIMEOUT = 10;
+  private static final HiveConf HIVECONF = new HiveConf();
 
   private Map<String, String> createConf(boolean local) {
     Map<String, String> conf = new HashMap<String, String>();
@@ -269,7 +271,7 @@ public class TestSparkClient {
     SparkClient client = null;
     try {
       test.config(conf);
-      client = SparkClientFactory.createClient(conf);
+      client = SparkClientFactory.createClient(conf, HIVECONF);
       test.call(client);
     } finally {
       if (client != null) {

Modified: hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java?rev=1653432&r1=1653431&r2=1653432&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java (original)
+++ hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java Wed Jan 21 03:57:59 2015
@@ -32,6 +32,7 @@ import io.netty.channel.embedded.Embedde
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Future;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class TestRpc {
 
   private Collection<Closeable> closeables;
   private Map<String, String> emptyConfig =
-      ImmutableMap.of(RpcConfiguration.RPC_CHANNEL_LOG_LEVEL_KEY, "DEBUG");
+      ImmutableMap.of(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, "DEBUG");
 
   @Before
   public void setUp() {