You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2018/05/04 21:42:42 UTC

hive git commit: HIVE-17838: Make org.apache.hive.spark.client.rpc logging HoS specific and other logging cleanup (Sahil Takiar, reviewed by Aihua Xu)

Repository: hive
Updated Branches:
  refs/heads/master 8158f8848 -> 52f1b2471


HIVE-17838: Make org.apache.hive.spark.client.rpc logging HoS specific and other logging cleanup (Sahil Takiar, reviewed by Aihua Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/52f1b247
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/52f1b247
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/52f1b247

Branch: refs/heads/master
Commit: 52f1b2471545a797856e4b9b1ae0a36cb4233c18
Parents: 8158f88
Author: Sahil Takiar <ta...@gmail.com>
Authored: Fri May 4 14:32:50 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri May 4 14:32:50 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |  8 +--
 .../ql/exec/spark/HiveSparkClientFactory.java   |  4 +-
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  6 +--
 .../ql/exec/spark/session/SparkSessionImpl.java | 28 ++++++-----
 .../spark/SetSparkReducerParallelism.java       |  4 +-
 .../apache/hive/spark/client/BaseProtocol.java  | 53 ++++++++++++++++++++
 .../apache/hive/spark/client/RemoteDriver.java  | 39 ++++++++------
 .../hive/spark/client/SparkClientFactory.java   |  4 +-
 .../hive/spark/client/SparkClientImpl.java      | 44 ++++++++++------
 .../hive/spark/client/metrics/InputMetrics.java |  6 +++
 .../hive/spark/client/metrics/Metrics.java      | 18 +++++++
 .../client/metrics/ShuffleReadMetrics.java      |  9 ++++
 .../client/metrics/ShuffleWriteMetrics.java     |  7 +++
 .../hive/spark/client/rpc/KryoMessageCodec.java |  4 +-
 .../org/apache/hive/spark/client/rpc/Rpc.java   | 23 ++++++---
 .../hive/spark/client/rpc/RpcConfiguration.java |  9 ++--
 .../hive/spark/client/rpc/RpcDispatcher.java    | 20 ++++----
 .../apache/hive/spark/client/rpc/RpcServer.java | 18 ++++---
 18 files changed, 216 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 99df967..8baf309 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -495,8 +495,8 @@ public enum ErrorMsg {
   FILE_NOT_FOUND(20012, "File not found: {0}", "64000", true),
   WRONG_FILE_FORMAT(20013, "Wrong file format. Please check the file's format.", "64000", true),
 
-  SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark job is submitted to an invalid queue: {0}."
-      + " Please fix and try again.", true),
+  SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark app for session {0} was submitted to an invalid" +
+          " queue: {1}. Please fix and try again.", true),
   SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),
 
   // An exception from runtime that will show the full stack to client
@@ -574,13 +574,13 @@ public enum ErrorMsg {
   SPARK_CREATE_CLIENT_TIMEOUT(30038,
       "Timed out while creating Spark client for session {0}.", true),
   SPARK_CREATE_CLIENT_QUEUE_FULL(30039,
-      "Failed to create Spark client because job queue is full: {0}.", true),
+      "Failed to create Spark client for session {0} because job queue is full: {1}.", true),
   SPARK_CREATE_CLIENT_INTERRUPTED(30040,
       "Interrupted while creating Spark client for session {0}", true),
   SPARK_CREATE_CLIENT_ERROR(30041,
       "Failed to create Spark client for Spark session {0}: {1}", true),
   SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042,
-      "Failed to create Spark client due to invalid resource request: {0}", true),
+      "Failed to create Spark client for session {0} due to invalid resource request: {1}", true),
   SPARK_CREATE_CLIENT_CLOSED_SESSION(30043,
       "Cannot create Spark client on a closed session {0}", true),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 565c43b..5ed5d42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -104,7 +104,7 @@ public class HiveSparkClientFactory {
       inputStream = HiveSparkClientFactory.class.getClassLoader()
         .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
       if (inputStream != null) {
-        LOG.info("loading spark properties from: " + SPARK_DEFAULT_CONF_FILE);
+        LOG.info("Loading Spark properties from: " + SPARK_DEFAULT_CONF_FILE);
         Properties properties = new Properties();
         properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8));
         for (String propertyName : properties.stringPropertyNames()) {
@@ -118,7 +118,7 @@ public class HiveSparkClientFactory {
         }
       }
     } catch (IOException e) {
-      LOG.info("Failed to open spark configuration file: "
+      LOG.info("Failed to open Spark configuration file: "
         + SPARK_DEFAULT_CONF_FILE, e);
     } finally {
       if (inputStream != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index bfa2da6..8038771 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -130,7 +130,7 @@ public class SparkTask extends Task<SparkWork> {
       if (driverContext.isShutdown()) {
         LOG.warn("Killing Spark job");
         killJob();
-        throw new HiveException("Operation is cancelled.");
+        throw new HiveException(String.format("Spark task %s cancelled for query %s", getId(), sparkWork.getQueryId()));
       }
 
       // Get the Job Handle id associated with the Spark job
@@ -176,7 +176,7 @@ public class SparkTask extends Task<SparkWork> {
                 ? "UNKNOWN" : jobID));
         killJob();
       } else if (rc == 4) {
-        LOG.info("The spark job or one stage of it has too many tasks" +
+        LOG.info("The Spark job or one stage of it has too many tasks" +
             ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
         killJob();
       }
@@ -186,7 +186,7 @@ public class SparkTask extends Task<SparkWork> {
       }
       sparkJobStatus.cleanup();
     } catch (Exception e) {
-      String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
+      String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'";
 
       // Has to use full name to make sure it does not conflict with
       // org.apache.commons.lang.StringUtils

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 189de19..c8cb1ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -79,7 +79,7 @@ public class SparkSessionImpl implements SparkSession {
 
   @Override
   public void open(HiveConf conf) throws HiveException {
-    LOG.info("Trying to open Spark session {}", sessionId);
+    LOG.info("Trying to open Hive on Spark session {}", sessionId);
     this.conf = conf;
     isOpen = true;
     try {
@@ -94,12 +94,12 @@ public class SparkSessionImpl implements SparkSession {
       }
       throw he;
     }
-    LOG.info("Spark session {} is successfully opened", sessionId);
+    LOG.info("Hive on Spark session {} successfully opened", sessionId);
   }
 
   @Override
   public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
-    Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
+    Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
     return hiveSparkClient.execute(driverContext, sparkWork);
   }
 
@@ -129,9 +129,9 @@ public class SparkSessionImpl implements SparkSession {
     totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
 
     long memoryPerTaskInBytes = totalMemory / totalCores;
-    LOG.info("Spark cluster current has executors: " + numExecutors
+    LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
         + ", total cores: " + totalCores + ", memory per executor: "
-        + executorMemoryInMB + "M, memoryFraction: " + memoryFraction);
+        + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
     return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
         Integer.valueOf(totalCores));
   }
@@ -153,15 +153,15 @@ public class SparkSessionImpl implements SparkSession {
 
   @Override
   public void close() {
-    LOG.info("Trying to close Spark session {}", sessionId);
+    LOG.info("Trying to close Hive on Spark session {}", sessionId);
     isOpen = false;
     if (hiveSparkClient != null) {
       try {
         hiveSparkClient.close();
-        LOG.info("Spark session {} is successfully closed", sessionId);
+        LOG.info("Hive on Spark session {} successfully closed", sessionId);
         cleanScratchDir();
       } catch (IOException e) {
-        LOG.error("Failed to close spark session (" + sessionId + ").", e);
+        LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
       }
     }
     hiveSparkClient = null;
@@ -197,20 +197,22 @@ public class SparkSessionImpl implements SparkSession {
     StringBuilder matchedString = new StringBuilder();
     while (e != null) {
       if (e instanceof TimeoutException) {
-        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+        return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
       } else if (e instanceof InterruptedException) {
         return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId);
       } else if (e instanceof RuntimeException) {
         String sts = Throwables.getStackTraceAsString(e);
         if (matches(sts, AM_TIMEOUT_ERR, matchedString)) {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
         } else if (matches(sts, UNKNOWN_QUEUE_ERR, matchedString) || matches(sts, STOPPED_QUEUE_ERR, matchedString)) {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString.toString());
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, sessionId,
+                  matchedString.toString());
         } else if (matches(sts, FULL_QUEUE_ERR, matchedString)) {
-          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString.toString());
+          return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, sessionId,
+                  matchedString.toString());
         } else if (matches(sts, INVALILD_MEM_ERR, matchedString) || matches(sts, INVALID_CORE_ERR, matchedString)) {
           return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
-              matchedString.toString());
+              sessionId, matchedString.toString());
         } else {
           return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(e).getMessage());
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index eecb103..ab87c79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -264,9 +264,9 @@ public class SetSparkReducerParallelism implements NodeProcessor {
           context.getConf(), sparkSessionManager);
       sparkMemoryAndCores = sparkSession.getMemoryAndCores();
     } catch (HiveException e) {
-      throw new SemanticException("Failed to get a spark session: " + e);
+      throw new SemanticException("Failed to get a Hive on Spark session", e);
     } catch (Exception e) {
-      LOG.warn("Failed to get spark memory/core info", e);
+      LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e);
     } finally {
       if (sparkSession != null && sparkSessionManager != null) {
         try {

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 6a988a4..558ed80 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -38,10 +38,20 @@ public abstract class BaseProtocol extends RpcDispatcher {
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "CancelJob{" +
+              "id='" + id + '\'' +
+              '}';
+    }
   }
 
   protected static class EndSession implements Serializable {
 
+    @Override
+    public String toString() {
+      return "EndSession";
+    }
   }
 
   protected static class Error implements Serializable {
@@ -56,6 +66,12 @@ public abstract class BaseProtocol extends RpcDispatcher {
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "Error{" +
+              "cause='" + cause + '\'' +
+              '}';
+    }
   }
 
   protected static class JobMetrics implements Serializable {
@@ -78,6 +94,16 @@ public abstract class BaseProtocol extends RpcDispatcher {
       this(null, -1, -1, -1, null);
     }
 
+    @Override
+    public String toString() {
+      return "JobMetrics{" +
+              "jobId='" + jobId + '\'' +
+              ", sparkJobId=" + sparkJobId +
+              ", stageId=" + stageId +
+              ", taskId=" + taskId +
+              ", metrics=" + metrics +
+              '}';
+    }
   }
 
   protected static class JobRequest<T extends Serializable> implements Serializable {
@@ -94,6 +120,13 @@ public abstract class BaseProtocol extends RpcDispatcher {
       this(null, null);
     }
 
+    @Override
+    public String toString() {
+      return "JobRequest{" +
+              "id='" + id + '\'' +
+              ", job=" + job +
+              '}';
+    }
   }
 
   public static class JobResult<T extends Serializable> implements Serializable {
@@ -137,6 +170,12 @@ public abstract class BaseProtocol extends RpcDispatcher {
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "JobStarted{" +
+              "id='" + id + '\'' +
+              '}';
+    }
   }
 
   /**
@@ -154,6 +193,14 @@ public abstract class BaseProtocol extends RpcDispatcher {
     JobSubmitted() {
       this(null, -1);
     }
+
+    @Override
+    public String toString() {
+      return "JobSubmitted{" +
+              "clientJobId='" + clientJobId + '\'' +
+              ", sparkJobId=" + sparkJobId +
+              '}';
+    }
   }
 
   protected static class SyncJobRequest<T extends Serializable> implements Serializable {
@@ -168,5 +215,11 @@ public abstract class BaseProtocol extends RpcDispatcher {
       this(null);
     }
 
+    @Override
+    public String toString() {
+      return "SyncJobRequest{" +
+              "job=" + job +
+              '}';
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 6e546d4..caa850c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -25,8 +25,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.URI;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -118,17 +116,18 @@ public class RemoteDriver {
         // as these are non-spark specific configs used by the remote driver
         mapConf.put(val[0], val[1]);
       } else {
-        throw new IllegalArgumentException("Invalid command line: " + Joiner.on(" ").join(args));
+        throw new IllegalArgumentException("Invalid command line arguments: "
+          + Joiner.on(" ").join(args));
       }
     }
 
     executor = Executors.newCachedThreadPool();
 
-    LOG.info("Connecting to: {}:{}", serverAddress, serverPort);
+    LOG.info("Connecting to HiveServer2 address: {}:{}", serverAddress, serverPort);
 
     for (Tuple2<String, String> e : conf.getAll()) {
       mapConf.put(e._1(), e._2());
-      LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
+      LOG.debug("Remote Spark Driver configured with: " + e._1() + "=" + e._2());
     }
 
     String clientId = mapConf.get(SparkClientFactory.CONF_CLIENT_ID);
@@ -140,7 +139,7 @@ public class RemoteDriver {
     this.egroup = new NioEventLoopGroup(
         threadCount,
         new ThreadFactoryBuilder()
-            .setNameFormat("Driver-RPC-Handler-%d")
+            .setNameFormat("Spark-Driver-RPC-Handler-%d")
             .setDaemon(true)
             .build());
     this.protocol = new DriverProtocol();
@@ -153,9 +152,14 @@ public class RemoteDriver {
     this.clientRpc.addListener(new Rpc.Listener() {
       @Override
       public void rpcClosed(Rpc rpc) {
-        LOG.warn("Shutting down driver because RPC channel was closed.");
+        LOG.warn("Shutting down driver because Remote Spark Driver to HiveServer2 connection was closed.");
         shutdown(null);
       }
+
+      @Override
+      public String toString() {
+        return "Shutting Down Remote Spark Driver to HiveServer2 Connection";
+      }
     });
 
     try {
@@ -211,7 +215,7 @@ public class RemoteDriver {
       if (jc != null) {
         job.submit();
       } else {
-        LOG.info("SparkContext not yet up, queueing job request.");
+        LOG.info("SparkContext not yet up; adding Hive on Spark job request to the queue.");
         jobQueue.add(job);
       }
     }
@@ -220,9 +224,9 @@ public class RemoteDriver {
   private synchronized void shutdown(Throwable error) {
     if (running) {
       if (error == null) {
-        LOG.info("Shutting down remote driver.");
+        LOG.info("Shutting down Spark Remote Driver.");
       } else {
-        LOG.error("Shutting down remote driver due to error: " + error, error);
+        LOG.error("Shutting down Spark Remote Driver due to error: " + error, error);
       }
       running = false;
       for (JobWrapper<?> job : activeJobs.values()) {
@@ -253,7 +257,7 @@ public class RemoteDriver {
   private String getArg(String[] args, int keyIdx) {
     int valIdx = keyIdx + 1;
     if (args.length <= valIdx) {
-      throw new IllegalArgumentException("Invalid command line: "
+      throw new IllegalArgumentException("Invalid command line arguments: "
         + Joiner.on(" ").join(args));
     }
     return args[valIdx];
@@ -294,7 +298,7 @@ public class RemoteDriver {
     private void handle(ChannelHandlerContext ctx, CancelJob msg) {
       JobWrapper<?> job = activeJobs.get(msg.id);
       if (job == null || !cancelJob(job)) {
-        LOG.info("Requested to cancel an already finished job.");
+        LOG.info("Requested to cancel an already finished client job.");
       }
     }
 
@@ -304,7 +308,7 @@ public class RemoteDriver {
     }
 
     private void handle(ChannelHandlerContext ctx, JobRequest msg) {
-      LOG.info("Received job request {}", msg.id);
+      LOG.debug("Received client job request {}", msg.id);
       JobWrapper<?> wrapper = new JobWrapper<Serializable>(msg);
       activeJobs.put(msg.id, wrapper);
       submit(wrapper);
@@ -318,7 +322,7 @@ public class RemoteDriver {
           while (jc == null) {
             jcLock.wait();
             if (!running) {
-              throw new IllegalStateException("Remote context is shutting down.");
+              throw new IllegalStateException("Remote Spark context is shutting down.");
             }
           }
         }
@@ -339,6 +343,10 @@ public class RemoteDriver {
       }
     }
 
+    @Override
+    public String name() {
+      return "Remote Spark Driver to HiveServer2 Connection";
+    }
   }
 
   private class JobWrapper<T extends Serializable> implements Callable<Void> {
@@ -404,12 +412,13 @@ public class RemoteDriver {
         if (sparkCounters != null) {
           counters = sparkCounters.snapshot();
         }
+
         protocol.jobFinished(req.id, result, null, counters);
       } catch (Throwable t) {
         // Catch throwables in a best-effort to report job status back to the client. It's
         // re-thrown so that the executor can destroy the affected thread (or the JVM can
         // die or whatever would happen if the throwable bubbled up).
-        LOG.error("Failed to run job " + req.id, t);
+        LOG.error("Failed to run client job " + req.id, t);
         protocol.jobFinished(req.id, null, t,
             sparkCounters != null ? sparkCounters.snapshot() : null);
         throw new ExecutionException(t);

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index fd9b725..88b5c95 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -84,8 +84,8 @@ public final class SparkClientFactory {
   public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf,
                                          String sessionId)
           throws IOException, SparkException {
-    Preconditions.checkState(server != null, "initialize() not called.");
+    Preconditions.checkState(server != null,
+            "Invalid state: Hive on Spark RPC Server has not been initialized");
     return new SparkClientImpl(server, sparkConf, hiveConf, sessionId);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index d450515..f8b5d19 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -43,6 +43,7 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -103,17 +104,16 @@ class SparkClientImpl implements SparkClient {
       // The RPC server will take care of timeouts here.
       this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get();
     } catch (Throwable e) {
-      String errorMsg = null;
+      String errorMsg;
       if (e.getCause() instanceof TimeoutException) {
-        errorMsg = "Timed out waiting for client to connect.\nPossible reasons include network " +
-            "issues, errors in remote driver or the cluster has no available resources, etc." +
+        errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
+            "include network issues, errors in remote driver, cluster has no available resources, etc." +
             "\nPlease check YARN or Spark driver's logs for further information.";
       } else if (e.getCause() instanceof InterruptedException) {
-        errorMsg = "Interruption occurred while waiting for client to connect.\nPossibly the Spark session is closed " +
-            "such as in case of query cancellation." +
-            "\nPlease refer to HiveServer2 logs for further information.";
+        errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
+            "that the query was cancelled which would cause the Spark Session to close.";
       } else {
-        errorMsg = "Error while waiting for client to connect.";
+        errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
       }
       LOG.error(errorMsg, e);
       driverThread.interrupt();
@@ -126,14 +126,21 @@ class SparkClientImpl implements SparkClient {
       throw Throwables.propagate(e);
     }
 
+    LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
+
     driverRpc.addListener(new Rpc.Listener() {
         @Override
         public void rpcClosed(Rpc rpc) {
           if (isAlive) {
-            LOG.warn("Client RPC channel closed unexpectedly.");
+            LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress());
             isAlive = false;
           }
         }
+
+        @Override
+        public String toString() {
+          return "Connection to Remote Spark Driver Closed Unexpectedly";
+        }
     });
     isAlive = true;
   }
@@ -256,7 +263,7 @@ class SparkClientImpl implements SparkClient {
     try {
       URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
       if (sparkDefaultsUrl != null) {
-        LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
+        LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
         allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
       }
     } catch (Exception e) {
@@ -574,7 +581,7 @@ class SparkClientImpl implements SparkClient {
     }
 
     private void handle(ChannelHandlerContext ctx, Error msg) {
-      LOG.warn("Error reported from remote driver: {}", msg.cause);
+      LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause);
     }
 
     private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
@@ -582,14 +589,14 @@ class SparkClientImpl implements SparkClient {
       if (handle != null) {
         handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
       } else {
-        LOG.warn("Received metrics for unknown job {}", msg.jobId);
+        LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId);
       }
     }
 
     private void handle(ChannelHandlerContext ctx, JobResult msg) {
       JobHandleImpl<?> handle = jobs.remove(msg.id);
       if (handle != null) {
-        LOG.info("Received result for {}", msg.id);
+        LOG.debug("Received result for client job {}", msg.id);
         handle.setSparkCounters(msg.sparkCounters);
         Throwable error = msg.error;
         if (error == null) {
@@ -598,7 +605,7 @@ class SparkClientImpl implements SparkClient {
           handle.setFailure(error);
         }
       } else {
-        LOG.warn("Received result for unknown job {}", msg.id);
+        LOG.warn("Received result for unknown client job {}", msg.id);
       }
     }
 
@@ -607,19 +614,24 @@ class SparkClientImpl implements SparkClient {
       if (handle != null) {
         handle.changeState(JobHandle.State.STARTED);
       } else {
-        LOG.warn("Received event for unknown job {}", msg.id);
+        LOG.warn("Received event for unknown client job {}", msg.id);
       }
     }
 
     private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
       JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
       if (handle != null) {
-        LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
+        LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
         handle.addSparkJobId(msg.sparkJobId);
       } else {
-        LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
+        LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
       }
     }
+
+    @Override
+    protected String name() {
+      return "HiveServer2 to Remote Spark Driver Connection";
+    }
   }
 
   private static class AddJarJob implements Job<Serializable> {

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index f137007..6a13071 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -44,4 +44,10 @@ public class InputMetrics implements Serializable {
     this(metrics.inputMetrics().bytesRead());
   }
 
+  @Override
+  public String toString() {
+    return "InputMetrics{" +
+            "bytesRead=" + bytesRead +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index b718b3b..cf7a1f6 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -127,4 +127,22 @@ public class Metrics implements Serializable {
     return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
   }
 
+  @Override
+  public String toString() {
+    return "Metrics{" +
+            "executorDeserializeTime=" + executorDeserializeTime +
+            ", executorDeserializeCpuTime=" + executorDeserializeCpuTime +
+            ", executorRunTime=" + executorRunTime +
+            ", executorCpuTime=" + executorCpuTime +
+            ", resultSize=" + resultSize +
+            ", jvmGCTime=" + jvmGCTime +
+            ", resultSerializationTime=" + resultSerializationTime +
+            ", memoryBytesSpilled=" + memoryBytesSpilled +
+            ", diskBytesSpilled=" + diskBytesSpilled +
+            ", taskDurationTime=" + taskDurationTime +
+            ", inputMetrics=" + inputMetrics +
+            ", shuffleReadMetrics=" + shuffleReadMetrics +
+            ", shuffleWriteMetrics=" + shuffleWriteMetrics +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index 9ff4d0f..e3d564f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -73,4 +73,13 @@ public class ShuffleReadMetrics implements Serializable {
     return remoteBlocksFetched + localBlocksFetched;
   }
 
+  @Override
+  public String toString() {
+    return "ShuffleReadMetrics{" +
+            "remoteBlocksFetched=" + remoteBlocksFetched +
+            ", localBlocksFetched=" + localBlocksFetched +
+            ", fetchWaitTime=" + fetchWaitTime +
+            ", remoteBytesRead=" + remoteBytesRead +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 64a4b86..e9cf6a1 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -51,4 +51,11 @@ public class ShuffleWriteMetrics implements Serializable {
       metrics.shuffleWriteMetrics().shuffleWriteTime());
   }
 
+  @Override
+  public String toString() {
+    return "ShuffleWriteMetrics{" +
+            "shuffleBytesWritten=" + shuffleBytesWritten +
+            ", shuffleWriteTime=" + shuffleWriteTime +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
index 5454ec2..d3a6812 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
@@ -101,7 +101,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> {
       Input kryoIn = new Input(new ByteBufferInputStream(nioBuffer));
 
       Object msg = kryos.get().readClassAndObject(kryoIn);
-      LOG.debug("Decoded message of type {} ({} bytes)",
+      LOG.trace("Decoded message of type {} ({} bytes)",
           msg != null ? msg.getClass().getName() : msg, msgSize);
       out.add(msg);
     } finally {
@@ -118,7 +118,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> {
     kryoOut.flush();
 
     byte[] msgData = maybeEncrypt(bytes.toByteArray());
-    LOG.debug("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length);
+    LOG.trace("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length);
     checkSize(msgData.length);
 
     buf.ensureWritable(msgData.length + 4);

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index cbbfb1c..298a210 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -19,6 +19,7 @@ package org.apache.hive.spark.client.rpc;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -116,7 +117,7 @@ public class Rpc implements Closeable {
     final Runnable timeoutTask = new Runnable() {
       @Override
       public void run() {
-        promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection."));
+        promise.setFailure(new TimeoutException("Timed out waiting to connect to HiveServer2."));
       }
     };
     final ScheduledFuture<?> timeoutFuture = eloop.schedule(timeoutTask,
@@ -272,7 +273,8 @@ public class Rpc implements Closeable {
    */
   public <T> Future<T> call(final Object msg, Class<T> retType) {
     Preconditions.checkArgument(msg != null);
-    Preconditions.checkState(channel.isActive(), "RPC channel is closed.");
+    Preconditions.checkState(channel.isActive(), "Unable to send message " + msg +
+            " because the Remote Spark Driver - HiveServer2 connection has been closed.");
     try {
       final long id = rpcId.getAndIncrement();
       final Promise<T> promise = createPromise();
@@ -280,7 +282,8 @@ public class Rpc implements Closeable {
           @Override
           public void operationComplete(ChannelFuture cf) {
             if (!cf.isSuccess() && !promise.isDone()) {
-              LOG.warn("Failed to send RPC, closing connection.", cf.cause());
+              LOG.warn("Failed to send message '" + msg + "', closing Remote Spark Driver - " +
+                      "HiveServer2 connection.", cf.cause());
               promise.setFailure(cf.cause());
               dispatcher.discardRpc(id);
               close();
@@ -314,6 +317,14 @@ public class Rpc implements Closeable {
     return channel;
   }
 
+  /**
+   * Returns the "hostname:port" that the RPC is connected to
+   */
+  public String getRemoteAddress() {
+    InetSocketAddress remoteAddress = ((InetSocketAddress) this.channel.remoteAddress());
+    return remoteAddress.getHostName() + ":" + remoteAddress.getPort();
+  }
+
   void setDispatcher(RpcDispatcher dispatcher) {
     Preconditions.checkNotNull(dispatcher);
     Preconditions.checkState(this.dispatcher == null);
@@ -336,7 +347,7 @@ public class Rpc implements Closeable {
           try {
             l.rpcClosed(this);
           } catch (Exception e) {
-            LOG.warn("Error caught in Rpc.Listener invocation.", e);
+            LOG.warn("Error caught while running '" + l + "' listener", e);
           }
         }
       }
@@ -493,12 +504,10 @@ public class Rpc implements Closeable {
         client.evaluateChallenge(new byte[0]) : new byte[0];
       c.writeAndFlush(new SaslMessage(clientId, hello)).addListener(future -> {
         if (!future.isSuccess()) {
-          LOG.error("Failed to send hello to server", future.cause());
+          LOG.error("Failed to send test message to HiveServer2", future.cause());
           onError(future.cause());
         }
       });
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index a535b8d..090c628 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -115,10 +115,9 @@ public final class RpcConfiguration {
    * Parses the port string like 49152-49222,49228 into the port list. A default 0
    * is added for the empty port string.
    * @return a list of configured ports.
-   * @exception IOException is thrown if the property is not configured properly
    */
-  List<Integer> getServerPorts() throws IOException {
-    String errMsg = "Incorrect RPC server port configuration for HiveServer2";
+  List<Integer> getServerPorts() {
+    String errMsg = "Malformed configuration value for " + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname;
     String portString = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname);
     ArrayList<Integer> ports = new ArrayList<Integer>();
     try {
@@ -127,7 +126,7 @@ public final class RpcConfiguration {
           String[] range = portRange.split("-");
           if (range.length == 0 || range.length > 2
               || (range.length == 2 && Integer.valueOf(range[0]) > Integer.valueOf(range[1]))) {
-            throw new IOException(errMsg);
+            throw new IllegalArgumentException(errMsg);
           }
           if (range.length == 1) {
             ports.add(Integer.valueOf(range[0]));
@@ -143,7 +142,7 @@ public final class RpcConfiguration {
 
       return ports;
     } catch(NumberFormatException e) {
-      throw new IOException(errMsg);
+      throw new IllegalArgumentException(errMsg, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
index 00f5a17..b588547 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
@@ -66,13 +66,12 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
   protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
     if (lastHeader == null) {
       if (!(msg instanceof Rpc.MessageHeader)) {
-        LOG.warn("[{}] Expected RPC header, got {} instead.", name(),
-            msg != null ? msg.getClass().getName() : null);
-        throw new IllegalArgumentException();
+        throw new IllegalArgumentException(String.format("[%s] Expected RPC header, got %s instead.", name(),
+                msg != null ? msg.getClass().getName() : null));
       }
       lastHeader = (Rpc.MessageHeader) msg;
     } else {
-      LOG.debug("[{}] Received RPC message: type={} id={} payload={}", name(),
+      LOG.trace("[{}] Received RPC message: type={} id={} payload={}", name(),
         lastHeader.type, lastHeader.id, msg != null ? msg.getClass().getName() : null);
       try {
         switch (lastHeader.type) {
@@ -86,7 +85,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
           handleError(ctx, msg, findRpc(lastHeader.id));
           break;
         default:
-          throw new IllegalArgumentException("Unknown RPC message type: " + lastHeader.type);
+          throw new IllegalArgumentException("[" + name() + "] Unknown RPC message type: " + lastHeader.type);
         }
       } finally {
         lastHeader = null;
@@ -103,7 +102,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
       }
     }
     throw new IllegalArgumentException(String.format(
-        "Received RPC reply for unknown RPC (%d).", id));
+        "[%s] Received RPC reply for unknown RPC (%d).", name(), id));
   }
 
   private void handleCall(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -124,7 +123,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
       }
       replyType = Rpc.MessageType.REPLY;
     } catch (InvocationTargetException ite) {
-      LOG.debug(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
+      LOG.error(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
       replyPayload = Throwables.getStackTraceAsString(ite.getCause());
       replyType = Rpc.MessageType.ERROR;
     }
@@ -140,12 +139,12 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
   private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc)
       throws Exception {
     if (msg instanceof String) {
-      LOG.warn("Received error message:{}.", msg);
+      LOG.error("[{}] Received error message: {}.", name(), msg);
       rpc.future.setFailure(new RpcException((String) msg));
     } else {
       String error = String.format("Received error with unexpected payload (%s).",
           msg != null ? msg.getClass().getName() : null);
-      LOG.warn(String.format("[%s] %s", name(), error));
+      LOG.error(String.format("[%s] %s", name(), error));
       rpc.future.setFailure(new IllegalArgumentException(error));
       ctx.close();
     }
@@ -178,7 +177,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
   }
 
   void registerRpc(long id, Promise promise, String type) {
-    LOG.debug("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
+    LOG.trace("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
     rpcs.add(new OutstandingRpc(id, promise));
   }
 
@@ -196,5 +195,4 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
       this.future = future;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 6c6ab74..f1383d6 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.SecureRandom;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -84,7 +85,7 @@ public class RpcServer implements Closeable {
     this.group = new NioEventLoopGroup(
         this.config.getRpcThreadCount(),
         new ThreadFactoryBuilder()
-            .setNameFormat("RPC-Handler-%d")
+            .setNameFormat("Spark-Driver-RPC-Handler-%d")
             .setDaemon(true)
             .build());
      ServerBootstrap serverBootstrap = new ServerBootstrap()
@@ -100,7 +101,7 @@ public class RpcServer implements Closeable {
             Runnable cancelTask = new Runnable() {
                 @Override
                 public void run() {
-                  LOG.warn("Timed out waiting for hello from client.");
+                  LOG.warn("Timed out waiting for test message from Remote Spark driver.");
                   newRpc.close();
                 }
             };
@@ -117,6 +118,8 @@ public class RpcServer implements Closeable {
     this.port = ((InetSocketAddress) channel.localAddress()).getPort();
     this.pendingClients = Maps.newConcurrentMap();
     this.address = this.config.getServerAddress();
+
+    LOG.info("Successfully created Remote Spark Driver RPC Server with address {}:{}", this.address, this.port);
   }
 
   /**
@@ -143,7 +146,8 @@ public class RpcServer implements Closeable {
           // Retry the next port
         }
       }
-      throw new IOException("No available ports from configured RPC Server ports for HiveServer2");
+      throw new IOException("Remote Spark Driver RPC Server cannot bind to any of the configured ports: "
+              + Arrays.toString(config.getServerPorts().toArray()));
     }
   }
 
@@ -169,7 +173,9 @@ public class RpcServer implements Closeable {
     Runnable timeout = new Runnable() {
       @Override
       public void run() {
-        promise.setFailure(new TimeoutException("Timed out waiting for client connection."));
+        promise.setFailure(new TimeoutException(
+                String.format("Client '%s' timed out waiting for connection from the Remote Spark" +
+                        " Driver", clientId)));
       }
     };
     ScheduledFuture<?> timeoutFuture = group.schedule(timeout,
@@ -179,7 +185,7 @@ public class RpcServer implements Closeable {
         timeoutFuture);
     if (pendingClients.putIfAbsent(clientId, client) != null) {
       throw new IllegalStateException(
-          String.format("Client '%s' already registered.", clientId));
+          String.format("Remote Spark Driver with client ID '%s' already registered", clientId));
     }
 
     promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
@@ -208,7 +214,7 @@ public class RpcServer implements Closeable {
     cinfo.timeoutFuture.cancel(true);
     if (!cinfo.promise.isDone()) {
       cinfo.promise.setFailure(new RuntimeException(
-          String.format("Cancel client '%s'. Error: " + msg, clientId)));
+          String.format("Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
     }
   }