Posted to commits@hive.apache.org by st...@apache.org on 2018/05/04 21:42:42 UTC
hive git commit: HIVE-17838: Make org.apache.hive.spark.client.rpc logging HoS specific and other logging cleanup (Sahil Takiar, reviewed by Aihua Xu)
Repository: hive
Updated Branches:
refs/heads/master 8158f8848 -> 52f1b2471
HIVE-17838: Make org.apache.hive.spark.client.rpc logging HoS specific and other logging cleanup (Sahil Takiar, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/52f1b247
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/52f1b247
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/52f1b247
Branch: refs/heads/master
Commit: 52f1b2471545a797856e4b9b1ae0a36cb4233c18
Parents: 8158f88
Author: Sahil Takiar <ta...@gmail.com>
Authored: Fri May 4 14:32:50 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Fri May 4 14:32:50 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 8 +--
.../ql/exec/spark/HiveSparkClientFactory.java | 4 +-
.../hadoop/hive/ql/exec/spark/SparkTask.java | 6 +--
.../ql/exec/spark/session/SparkSessionImpl.java | 28 ++++++-----
.../spark/SetSparkReducerParallelism.java | 4 +-
.../apache/hive/spark/client/BaseProtocol.java | 53 ++++++++++++++++++++
.../apache/hive/spark/client/RemoteDriver.java | 39 ++++++++------
.../hive/spark/client/SparkClientFactory.java | 4 +-
.../hive/spark/client/SparkClientImpl.java | 44 ++++++++++------
.../hive/spark/client/metrics/InputMetrics.java | 6 +++
.../hive/spark/client/metrics/Metrics.java | 18 +++++++
.../client/metrics/ShuffleReadMetrics.java | 9 ++++
.../client/metrics/ShuffleWriteMetrics.java | 7 +++
.../hive/spark/client/rpc/KryoMessageCodec.java | 4 +-
.../org/apache/hive/spark/client/rpc/Rpc.java | 23 ++++++---
.../hive/spark/client/rpc/RpcConfiguration.java | 9 ++--
.../hive/spark/client/rpc/RpcDispatcher.java | 20 ++++----
.../apache/hive/spark/client/rpc/RpcServer.java | 18 ++++---
18 files changed, 216 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 99df967..8baf309 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -495,8 +495,8 @@ public enum ErrorMsg {
FILE_NOT_FOUND(20012, "File not found: {0}", "64000", true),
WRONG_FILE_FORMAT(20013, "Wrong file format. Please check the file's format.", "64000", true),
- SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark job is submitted to an invalid queue: {0}."
- + " Please fix and try again.", true),
+ SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark app for session {0} was submitted to an invalid" +
+ " queue: {1}. Please fix and try again.", true),
SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."),
// An exception from runtime that will show the full stack to client
@@ -574,13 +574,13 @@ public enum ErrorMsg {
SPARK_CREATE_CLIENT_TIMEOUT(30038,
"Timed out while creating Spark client for session {0}.", true),
SPARK_CREATE_CLIENT_QUEUE_FULL(30039,
- "Failed to create Spark client because job queue is full: {0}.", true),
+ "Failed to create Spark client for session {0} because job queue is full: {1}.", true),
SPARK_CREATE_CLIENT_INTERRUPTED(30040,
"Interrupted while creating Spark client for session {0}", true),
SPARK_CREATE_CLIENT_ERROR(30041,
"Failed to create Spark client for Spark session {0}: {1}", true),
SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042,
- "Failed to create Spark client due to invalid resource request: {0}", true),
+ "Failed to create Spark client for session {0} due to invalid resource request: {1}", true),
SPARK_CREATE_CLIENT_CLOSED_SESSION(30043,
"Cannot create Spark client on a closed session {0}", true),
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 565c43b..5ed5d42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -104,7 +104,7 @@ public class HiveSparkClientFactory {
inputStream = HiveSparkClientFactory.class.getClassLoader()
.getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
if (inputStream != null) {
- LOG.info("loading spark properties from: " + SPARK_DEFAULT_CONF_FILE);
+ LOG.info("Loading Spark properties from: " + SPARK_DEFAULT_CONF_FILE);
Properties properties = new Properties();
properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8));
for (String propertyName : properties.stringPropertyNames()) {
@@ -118,7 +118,7 @@ public class HiveSparkClientFactory {
}
}
} catch (IOException e) {
- LOG.info("Failed to open spark configuration file: "
+ LOG.info("Failed to open Spark configuration file: "
+ SPARK_DEFAULT_CONF_FILE, e);
} finally {
if (inputStream != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index bfa2da6..8038771 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -130,7 +130,7 @@ public class SparkTask extends Task<SparkWork> {
if (driverContext.isShutdown()) {
LOG.warn("Killing Spark job");
killJob();
- throw new HiveException("Operation is cancelled.");
+ throw new HiveException(String.format("Spark task %s cancelled for query %s", getId(), sparkWork.getQueryId()));
}
// Get the Job Handle id associated with the Spark job
@@ -176,7 +176,7 @@ public class SparkTask extends Task<SparkWork> {
? "UNKNOWN" : jobID));
killJob();
} else if (rc == 4) {
- LOG.info("The spark job or one stage of it has too many tasks" +
+ LOG.info("The Spark job or one stage of it has too many tasks" +
". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
killJob();
}
@@ -186,7 +186,7 @@ public class SparkTask extends Task<SparkWork> {
}
sparkJobStatus.cleanup();
} catch (Exception e) {
- String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
+ String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'";
// Has to use full name to make sure it does not conflict with
// org.apache.commons.lang.StringUtils
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 189de19..c8cb1ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -79,7 +79,7 @@ public class SparkSessionImpl implements SparkSession {
@Override
public void open(HiveConf conf) throws HiveException {
- LOG.info("Trying to open Spark session {}", sessionId);
+ LOG.info("Trying to open Hive on Spark session {}", sessionId);
this.conf = conf;
isOpen = true;
try {
@@ -94,12 +94,12 @@ public class SparkSessionImpl implements SparkSession {
}
throw he;
}
- LOG.info("Spark session {} is successfully opened", sessionId);
+ LOG.info("Hive on Spark session {} successfully opened", sessionId);
}
@Override
public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception {
- Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs.");
+ Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs.");
return hiveSparkClient.execute(driverContext, sparkWork);
}
@@ -129,9 +129,9 @@ public class SparkSessionImpl implements SparkSession {
totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1);
long memoryPerTaskInBytes = totalMemory / totalCores;
- LOG.info("Spark cluster current has executors: " + numExecutors
+ LOG.info("Hive on Spark application currently has number of executors: " + numExecutors
+ ", total cores: " + totalCores + ", memory per executor: "
- + executorMemoryInMB + "M, memoryFraction: " + memoryFraction);
+ + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction);
return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes),
Integer.valueOf(totalCores));
}
@@ -153,15 +153,15 @@ public class SparkSessionImpl implements SparkSession {
@Override
public void close() {
- LOG.info("Trying to close Spark session {}", sessionId);
+ LOG.info("Trying to close Hive on Spark session {}", sessionId);
isOpen = false;
if (hiveSparkClient != null) {
try {
hiveSparkClient.close();
- LOG.info("Spark session {} is successfully closed", sessionId);
+ LOG.info("Hive on Spark session {} successfully closed", sessionId);
cleanScratchDir();
} catch (IOException e) {
- LOG.error("Failed to close spark session (" + sessionId + ").", e);
+ LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e);
}
}
hiveSparkClient = null;
@@ -197,20 +197,22 @@ public class SparkSessionImpl implements SparkSession {
StringBuilder matchedString = new StringBuilder();
while (e != null) {
if (e instanceof TimeoutException) {
- return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+ return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
} else if (e instanceof InterruptedException) {
return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId);
} else if (e instanceof RuntimeException) {
String sts = Throwables.getStackTraceAsString(e);
if (matches(sts, AM_TIMEOUT_ERR, matchedString)) {
- return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT);
+ return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId);
} else if (matches(sts, UNKNOWN_QUEUE_ERR, matchedString) || matches(sts, STOPPED_QUEUE_ERR, matchedString)) {
- return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString.toString());
+ return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, sessionId,
+ matchedString.toString());
} else if (matches(sts, FULL_QUEUE_ERR, matchedString)) {
- return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString.toString());
+ return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, sessionId,
+ matchedString.toString());
} else if (matches(sts, INVALILD_MEM_ERR, matchedString) || matches(sts, INVALID_CORE_ERR, matchedString)) {
return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
- matchedString.toString());
+ sessionId, matchedString.toString());
} else {
return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(e).getMessage());
}
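
For readers unfamiliar with this method: it classifies a client-creation failure by regex-matching the stack trace, and the change threads sessionId into every branch so each ErrorMsg gets both the session and the matched detail. A hedged sketch of the matching idiom, using an illustrative pattern (Hive's real AM_TIMEOUT_ERR/FULL_QUEUE_ERR constants differ):

    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class StackTraceClassifier {
      // Illustrative pattern only; Hive's real error regexes differ.
      private static final Pattern FULL_QUEUE_ERR =
          Pattern.compile("(Queue \\S+ is full)");

      // Mirrors the matches(sts, pattern, matchedString) helper: on a hit,
      // the captured group is appended so it can be spliced into ErrorMsg.
      static boolean matches(String stackTrace, Pattern p, StringBuilder matched) {
        Matcher m = p.matcher(stackTrace);
        if (m.find()) {
          matched.append(m.group(1));
          return true;
        }
        return false;
      }

      public static void main(String[] args) {
        StringWriter sw = new StringWriter();
        new RuntimeException("Queue etl is full").printStackTrace(new PrintWriter(sw));
        StringBuilder matched = new StringBuilder();
        if (matches(sw.toString(), FULL_QUEUE_ERR, matched)) {
          System.out.println("queue-full failure, detail: " + matched);
        }
      }
    }
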
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index eecb103..ab87c79 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -264,9 +264,9 @@ public class SetSparkReducerParallelism implements NodeProcessor {
context.getConf(), sparkSessionManager);
sparkMemoryAndCores = sparkSession.getMemoryAndCores();
} catch (HiveException e) {
- throw new SemanticException("Failed to get a spark session: " + e);
+ throw new SemanticException("Failed to get a Hive on Spark session", e);
} catch (Exception e) {
- LOG.warn("Failed to get spark memory/core info", e);
+ LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e);
} finally {
if (sparkSession != null && sparkSessionManager != null) {
try {
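
The SemanticException change above is more than wording: appending an exception to the message string keeps only its toString(), while passing it as the cause preserves the chained stack trace. A minimal illustration:

    public class ExceptionChainingDemo {
      public static void main(String[] args) {
        Exception cause = new IllegalStateException("no session");
        // Loses the stack trace of 'cause':
        RuntimeException bad = new RuntimeException("Failed to get a session: " + cause);
        // Preserves it as a chained cause:
        RuntimeException good = new RuntimeException("Failed to get a Hive on Spark session", cause);
        System.out.println(bad.getCause());   // null
        System.out.println(good.getCause());  // java.lang.IllegalStateException: no session
      }
    }
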
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
index 6a988a4..558ed80 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
@@ -38,10 +38,20 @@ public abstract class BaseProtocol extends RpcDispatcher {
this(null);
}
+ @Override
+ public String toString() {
+ return "CancelJob{" +
+ "id='" + id + '\'' +
+ '}';
+ }
}
protected static class EndSession implements Serializable {
+ @Override
+ public String toString() {
+ return "EndSession";
+ }
}
protected static class Error implements Serializable {
@@ -56,6 +66,12 @@ public abstract class BaseProtocol extends RpcDispatcher {
this(null);
}
+ @Override
+ public String toString() {
+ return "Error{" +
+ "cause='" + cause + '\'' +
+ '}';
+ }
}
protected static class JobMetrics implements Serializable {
@@ -78,6 +94,16 @@ public abstract class BaseProtocol extends RpcDispatcher {
this(null, -1, -1, -1, null);
}
+ @Override
+ public String toString() {
+ return "JobMetrics{" +
+ "jobId='" + jobId + '\'' +
+ ", sparkJobId=" + sparkJobId +
+ ", stageId=" + stageId +
+ ", taskId=" + taskId +
+ ", metrics=" + metrics +
+ '}';
+ }
}
protected static class JobRequest<T extends Serializable> implements Serializable {
@@ -94,6 +120,13 @@ public abstract class BaseProtocol extends RpcDispatcher {
this(null, null);
}
+ @Override
+ public String toString() {
+ return "JobRequest{" +
+ "id='" + id + '\'' +
+ ", job=" + job +
+ '}';
+ }
}
public static class JobResult<T extends Serializable> implements Serializable {
@@ -137,6 +170,12 @@ public abstract class BaseProtocol extends RpcDispatcher {
this(null);
}
+ @Override
+ public String toString() {
+ return "JobStarted{" +
+ "id='" + id + '\'' +
+ '}';
+ }
}
/**
@@ -154,6 +193,14 @@ public abstract class BaseProtocol extends RpcDispatcher {
JobSubmitted() {
this(null, -1);
}
+
+ @Override
+ public String toString() {
+ return "JobSubmitted{" +
+ "clientJobId='" + clientJobId + '\'' +
+ ", sparkJobId=" + sparkJobId +
+ '}';
+ }
}
protected static class SyncJobRequest<T extends Serializable> implements Serializable {
@@ -168,5 +215,11 @@ public abstract class BaseProtocol extends RpcDispatcher {
this(null);
}
+ @Override
+ public String toString() {
+ return "SyncJobRequest{" +
+ "job=" + job +
+ '}';
+ }
}
}
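
These toString() overrides exist because the RPC layer logs protocol messages as raw objects; without an override the payload shows up as a class name and hash code. An illustrative stand-alone example (not the actual BaseProtocol classes):

    public class ProtocolToStringDemo {
      static class JobStarted implements java.io.Serializable {
        final String id;
        JobStarted(String id) { this.id = id; }
        @Override
        public String toString() { return "JobStarted{id='" + id + "'}"; }
      }

      public static void main(String[] args) {
        Object msg = new JobStarted("client-job-42");
        // Without the override this would print something like
        // ProtocolToStringDemo$JobStarted@1b6d3586:
        System.out.println("[RPC] Received message: payload=" + msg);
      }
    }
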
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 6e546d4..caa850c 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -25,8 +25,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -118,17 +116,18 @@ public class RemoteDriver {
// as these are non-spark specific configs used by the remote driver
mapConf.put(val[0], val[1]);
} else {
- throw new IllegalArgumentException("Invalid command line: " + Joiner.on(" ").join(args));
+ throw new IllegalArgumentException("Invalid command line arguments: "
+ + Joiner.on(" ").join(args));
}
}
executor = Executors.newCachedThreadPool();
- LOG.info("Connecting to: {}:{}", serverAddress, serverPort);
+ LOG.info("Connecting to HiveServer2 address: {}:{}", serverAddress, serverPort);
for (Tuple2<String, String> e : conf.getAll()) {
mapConf.put(e._1(), e._2());
- LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
+ LOG.debug("Remote Spark Driver configured with: " + e._1() + "=" + e._2());
}
String clientId = mapConf.get(SparkClientFactory.CONF_CLIENT_ID);
@@ -140,7 +139,7 @@ public class RemoteDriver {
this.egroup = new NioEventLoopGroup(
threadCount,
new ThreadFactoryBuilder()
- .setNameFormat("Driver-RPC-Handler-%d")
+ .setNameFormat("Spark-Driver-RPC-Handler-%d")
.setDaemon(true)
.build());
this.protocol = new DriverProtocol();
@@ -153,9 +152,14 @@ public class RemoteDriver {
this.clientRpc.addListener(new Rpc.Listener() {
@Override
public void rpcClosed(Rpc rpc) {
- LOG.warn("Shutting down driver because RPC channel was closed.");
+ LOG.warn("Shutting down driver because Remote Spark Driver to HiveServer2 connection was closed.");
shutdown(null);
}
+
+ @Override
+ public String toString() {
+ return "Shutting Down Remote Spark Driver to HiveServer2 Connection";
+ }
});
try {
@@ -211,7 +215,7 @@ public class RemoteDriver {
if (jc != null) {
job.submit();
} else {
- LOG.info("SparkContext not yet up, queueing job request.");
+ LOG.info("SparkContext not yet up; adding Hive on Spark job request to the queue.");
jobQueue.add(job);
}
}
@@ -220,9 +224,9 @@ public class RemoteDriver {
private synchronized void shutdown(Throwable error) {
if (running) {
if (error == null) {
- LOG.info("Shutting down remote driver.");
+ LOG.info("Shutting down Spark Remote Driver.");
} else {
- LOG.error("Shutting down remote driver due to error: " + error, error);
+ LOG.error("Shutting down Spark Remote Driver due to error: " + error, error);
}
running = false;
for (JobWrapper<?> job : activeJobs.values()) {
@@ -253,7 +257,7 @@ public class RemoteDriver {
private String getArg(String[] args, int keyIdx) {
int valIdx = keyIdx + 1;
if (args.length <= valIdx) {
- throw new IllegalArgumentException("Invalid command line: "
+ throw new IllegalArgumentException("Invalid command line arguments: "
+ Joiner.on(" ").join(args));
}
return args[valIdx];
@@ -294,7 +298,7 @@ public class RemoteDriver {
private void handle(ChannelHandlerContext ctx, CancelJob msg) {
JobWrapper<?> job = activeJobs.get(msg.id);
if (job == null || !cancelJob(job)) {
- LOG.info("Requested to cancel an already finished job.");
+ LOG.info("Requested to cancel an already finished client job.");
}
}
@@ -304,7 +308,7 @@ public class RemoteDriver {
}
private void handle(ChannelHandlerContext ctx, JobRequest msg) {
- LOG.info("Received job request {}", msg.id);
+ LOG.debug("Received client job request {}", msg.id);
JobWrapper<?> wrapper = new JobWrapper<Serializable>(msg);
activeJobs.put(msg.id, wrapper);
submit(wrapper);
@@ -318,7 +322,7 @@ public class RemoteDriver {
while (jc == null) {
jcLock.wait();
if (!running) {
- throw new IllegalStateException("Remote context is shutting down.");
+ throw new IllegalStateException("Remote Spark context is shutting down.");
}
}
}
@@ -339,6 +343,10 @@ public class RemoteDriver {
}
}
+ @Override
+ public String name() {
+ return "Remote Spark Driver to HiveServer2 Connection";
+ }
}
private class JobWrapper<T extends Serializable> implements Callable<Void> {
@@ -404,12 +412,13 @@ public class RemoteDriver {
if (sparkCounters != null) {
counters = sparkCounters.snapshot();
}
+
protocol.jobFinished(req.id, result, null, counters);
} catch (Throwable t) {
// Catch throwables in a best-effort to report job status back to the client. It's
// re-thrown so that the executor can destroy the affected thread (or the JVM can
// die or whatever would happen if the throwable bubbled up).
- LOG.error("Failed to run job " + req.id, t);
+ LOG.error("Failed to run client job " + req.id, t);
protocol.jobFinished(req.id, null, t,
sparkCounters != null ? sparkCounters.snapshot() : null);
throw new ExecutionException(t);
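
One of the quieter renames above is the driver's Netty thread factory ("Driver-RPC-Handler-%d" to "Spark-Driver-RPC-Handler-%d"), which makes these threads identifiable in jstack output. A small sketch of the same Guava idiom, assuming Guava is on the classpath as it is in spark-client:

    import java.util.concurrent.ThreadFactory;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class NamedThreadsDemo {
      public static void main(String[] args) throws InterruptedException {
        ThreadFactory tf = new ThreadFactoryBuilder()
            .setNameFormat("Spark-Driver-RPC-Handler-%d")  // %d is the thread index
            .setDaemon(true)  // daemon so the JVM can exit without explicit shutdown
            .build();
        Thread t = tf.newThread(() ->
            System.out.println("running on " + Thread.currentThread().getName()));
        t.start();
        t.join();  // prints: running on Spark-Driver-RPC-Handler-0
      }
    }
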
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index fd9b725..88b5c95 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -84,8 +84,8 @@ public final class SparkClientFactory {
public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf,
String sessionId)
throws IOException, SparkException {
- Preconditions.checkState(server != null, "initialize() not called.");
+ Preconditions.checkState(server != null,
+ "Invalid state: Hive on Spark RPC Server has not been initialized");
return new SparkClientImpl(server, sparkConf, hiveConf, sessionId);
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index d450515..f8b5d19 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -43,6 +43,7 @@ import java.io.Writer;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -103,17 +104,16 @@ class SparkClientImpl implements SparkClient {
// The RPC server will take care of timeouts here.
this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get();
} catch (Throwable e) {
- String errorMsg = null;
+ String errorMsg;
if (e.getCause() instanceof TimeoutException) {
- errorMsg = "Timed out waiting for client to connect.\nPossible reasons include network " +
- "issues, errors in remote driver or the cluster has no available resources, etc." +
+ errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " +
+ "include network issues, errors in remote driver, cluster has no available resources, etc." +
"\nPlease check YARN or Spark driver's logs for further information.";
} else if (e.getCause() instanceof InterruptedException) {
- errorMsg = "Interruption occurred while waiting for client to connect.\nPossibly the Spark session is closed " +
- "such as in case of query cancellation." +
- "\nPlease refer to HiveServer2 logs for further information.";
+ errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " +
+ "that the query was cancelled which would cause the Spark Session to close.";
} else {
- errorMsg = "Error while waiting for client to connect.";
+ errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
}
LOG.error(errorMsg, e);
driverThread.interrupt();
@@ -126,14 +126,21 @@ class SparkClientImpl implements SparkClient {
throw Throwables.propagate(e);
}
+ LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
+
driverRpc.addListener(new Rpc.Listener() {
@Override
public void rpcClosed(Rpc rpc) {
if (isAlive) {
- LOG.warn("Client RPC channel closed unexpectedly.");
+ LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress());
isAlive = false;
}
}
+
+ @Override
+ public String toString() {
+ return "Connection to Remote Spark Driver Closed Unexpectedly";
+ }
});
isAlive = true;
}
@@ -256,7 +263,7 @@ class SparkClientImpl implements SparkClient {
try {
URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
if (sparkDefaultsUrl != null) {
- LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
+ LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl);
allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
}
} catch (Exception e) {
@@ -574,7 +581,7 @@ class SparkClientImpl implements SparkClient {
}
private void handle(ChannelHandlerContext ctx, Error msg) {
- LOG.warn("Error reported from remote driver: {}", msg.cause);
+ LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause);
}
private void handle(ChannelHandlerContext ctx, JobMetrics msg) {
@@ -582,14 +589,14 @@ class SparkClientImpl implements SparkClient {
if (handle != null) {
handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics);
} else {
- LOG.warn("Received metrics for unknown job {}", msg.jobId);
+ LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId);
}
}
private void handle(ChannelHandlerContext ctx, JobResult msg) {
JobHandleImpl<?> handle = jobs.remove(msg.id);
if (handle != null) {
- LOG.info("Received result for {}", msg.id);
+ LOG.debug("Received result for client job {}", msg.id);
handle.setSparkCounters(msg.sparkCounters);
Throwable error = msg.error;
if (error == null) {
@@ -598,7 +605,7 @@ class SparkClientImpl implements SparkClient {
handle.setFailure(error);
}
} else {
- LOG.warn("Received result for unknown job {}", msg.id);
+ LOG.warn("Received result for unknown client job {}", msg.id);
}
}
@@ -607,19 +614,24 @@ class SparkClientImpl implements SparkClient {
if (handle != null) {
handle.changeState(JobHandle.State.STARTED);
} else {
- LOG.warn("Received event for unknown job {}", msg.id);
+ LOG.warn("Received event for unknown client job {}", msg.id);
}
}
private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
if (handle != null) {
- LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
+ LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId);
handle.addSparkJobId(msg.sparkJobId);
} else {
- LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
+ LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId);
}
}
+
+ @Override
+ protected String name() {
+ return "HiveServer2 to Remote Spark Driver Connection";
+ }
}
private static class AddJarJob implements Job<Serializable> {
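
The anonymous Rpc.Listener above also gains a toString(), which pays off in Rpc's shutdown path: the listener itself is interpolated into the "Error caught while running ... listener" warning (see the Rpc.java hunk below). An illustrative sketch of the pattern:

    public class ListenerToStringDemo {
      interface Listener {
        void rpcClosed();
      }

      public static void main(String[] args) {
        Listener l = new Listener() {
          @Override
          public void rpcClosed() { /* no-op for the demo */ }

          @Override
          public String toString() {
            return "Connection to Remote Spark Driver Closed Unexpectedly";
          }
        };
        // Rpc interpolates the listener itself into its warning, so the
        // override replaces an opaque ClassName@hash with a readable name:
        System.out.println("Error caught while running '" + l + "' listener");
      }
    }
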
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index f137007..6a13071 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -44,4 +44,10 @@ public class InputMetrics implements Serializable {
this(metrics.inputMetrics().bytesRead());
}
+ @Override
+ public String toString() {
+ return "InputMetrics{" +
+ "bytesRead=" + bytesRead +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index b718b3b..cf7a1f6 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -127,4 +127,22 @@ public class Metrics implements Serializable {
return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
}
+ @Override
+ public String toString() {
+ return "Metrics{" +
+ "executorDeserializeTime=" + executorDeserializeTime +
+ ", executorDeserializeCpuTime=" + executorDeserializeCpuTime +
+ ", executorRunTime=" + executorRunTime +
+ ", executorCpuTime=" + executorCpuTime +
+ ", resultSize=" + resultSize +
+ ", jvmGCTime=" + jvmGCTime +
+ ", resultSerializationTime=" + resultSerializationTime +
+ ", memoryBytesSpilled=" + memoryBytesSpilled +
+ ", diskBytesSpilled=" + diskBytesSpilled +
+ ", taskDurationTime=" + taskDurationTime +
+ ", inputMetrics=" + inputMetrics +
+ ", shuffleReadMetrics=" + shuffleReadMetrics +
+ ", shuffleWriteMetrics=" + shuffleWriteMetrics +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index 9ff4d0f..e3d564f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -73,4 +73,13 @@ public class ShuffleReadMetrics implements Serializable {
return remoteBlocksFetched + localBlocksFetched;
}
+ @Override
+ public String toString() {
+ return "ShuffleReadMetrics{" +
+ "remoteBlocksFetched=" + remoteBlocksFetched +
+ ", localBlocksFetched=" + localBlocksFetched +
+ ", fetchWaitTime=" + fetchWaitTime +
+ ", remoteBytesRead=" + remoteBytesRead +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 64a4b86..e9cf6a1 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -51,4 +51,11 @@ public class ShuffleWriteMetrics implements Serializable {
metrics.shuffleWriteMetrics().shuffleWriteTime());
}
+ @Override
+ public String toString() {
+ return "ShuffleWriteMetrics{" +
+ "shuffleBytesWritten=" + shuffleBytesWritten +
+ ", shuffleWriteTime=" + shuffleWriteTime +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
index 5454ec2..d3a6812 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java
@@ -101,7 +101,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> {
Input kryoIn = new Input(new ByteBufferInputStream(nioBuffer));
Object msg = kryos.get().readClassAndObject(kryoIn);
- LOG.debug("Decoded message of type {} ({} bytes)",
+ LOG.trace("Decoded message of type {} ({} bytes)",
msg != null ? msg.getClass().getName() : msg, msgSize);
out.add(msg);
} finally {
@@ -118,7 +118,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> {
kryoOut.flush();
byte[] msgData = maybeEncrypt(bytes.toByteArray());
- LOG.debug("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length);
+ LOG.trace("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length);
checkSize(msgData.length);
buf.ensureWritable(msgData.length + 4);
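
The codec's per-message logging moves from DEBUG to TRACE because it fires for every RPC message and drowns out the HoS-specific logs at DEBUG. To get it back when debugging the wire protocol, a logger override along these lines should work (Log4j2 properties syntax; the exact placement in Hive's logging config may vary):

    # Hypothetical override: re-enable per-message codec logging.
    logger.kryoCodec.name = org.apache.hive.spark.client.rpc.KryoMessageCodec
    logger.kryoCodec.level = TRACE
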
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
index cbbfb1c..298a210 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
@@ -19,6 +19,7 @@ package org.apache.hive.spark.client.rpc;
import java.io.Closeable;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -116,7 +117,7 @@ public class Rpc implements Closeable {
final Runnable timeoutTask = new Runnable() {
@Override
public void run() {
- promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection."));
+ promise.setFailure(new TimeoutException("Timed out waiting to connect to HiveServer2."));
}
};
final ScheduledFuture<?> timeoutFuture = eloop.schedule(timeoutTask,
@@ -272,7 +273,8 @@ public class Rpc implements Closeable {
*/
public <T> Future<T> call(final Object msg, Class<T> retType) {
Preconditions.checkArgument(msg != null);
- Preconditions.checkState(channel.isActive(), "RPC channel is closed.");
+ Preconditions.checkState(channel.isActive(), "Unable to send message " + msg +
+ " because the Remote Spark Driver - HiveServer2 connection has been closed.");
try {
final long id = rpcId.getAndIncrement();
final Promise<T> promise = createPromise();
@@ -280,7 +282,8 @@ public class Rpc implements Closeable {
@Override
public void operationComplete(ChannelFuture cf) {
if (!cf.isSuccess() && !promise.isDone()) {
- LOG.warn("Failed to send RPC, closing connection.", cf.cause());
+ LOG.warn("Failed to send message '" + msg + "', closing Remote Spark Driver - " +
+ "HiveServer2 connection.", cf.cause());
promise.setFailure(cf.cause());
dispatcher.discardRpc(id);
close();
@@ -314,6 +317,14 @@ public class Rpc implements Closeable {
return channel;
}
+ /**
+ * Returns the "hostname:port" that the RPC is connected to
+ */
+ public String getRemoteAddress() {
+ InetSocketAddress remoteAddress = ((InetSocketAddress) this.channel.remoteAddress());
+ return remoteAddress.getHostName() + ":" + remoteAddress.getPort();
+ }
+
void setDispatcher(RpcDispatcher dispatcher) {
Preconditions.checkNotNull(dispatcher);
Preconditions.checkState(this.dispatcher == null);
@@ -336,7 +347,7 @@ public class Rpc implements Closeable {
try {
l.rpcClosed(this);
} catch (Exception e) {
- LOG.warn("Error caught in Rpc.Listener invocation.", e);
+ LOG.warn("Error caught while running '" + l + "' listener", e);
}
}
}
@@ -493,12 +504,10 @@ public class Rpc implements Closeable {
client.evaluateChallenge(new byte[0]) : new byte[0];
c.writeAndFlush(new SaslMessage(clientId, hello)).addListener(future -> {
if (!future.isSuccess()) {
- LOG.error("Failed to send hello to server", future.cause());
+ LOG.error("Failed to send test message to HiveServer2", future.cause());
onError(future.cause());
}
});
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index a535b8d..090c628 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -115,10 +115,9 @@ public final class RpcConfiguration {
* Parses the port string like 49152-49222,49228 into the port list. A default 0
* is added for the empty port string.
* @return a list of configured ports.
- * @exception IOException is thrown if the property is not configured properly
*/
- List<Integer> getServerPorts() throws IOException {
- String errMsg = "Incorrect RPC server port configuration for HiveServer2";
+ List<Integer> getServerPorts() {
+ String errMsg = "Malformed configuration value for " + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname;
String portString = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname);
ArrayList<Integer> ports = new ArrayList<Integer>();
try {
@@ -127,7 +126,7 @@ public final class RpcConfiguration {
String[] range = portRange.split("-");
if (range.length == 0 || range.length > 2
|| (range.length == 2 && Integer.valueOf(range[0]) > Integer.valueOf(range[1]))) {
- throw new IOException(errMsg);
+ throw new IllegalArgumentException(errMsg);
}
if (range.length == 1) {
ports.add(Integer.valueOf(range[0]));
@@ -143,7 +142,7 @@ public final class RpcConfiguration {
return ports;
} catch(NumberFormatException e) {
- throw new IOException(errMsg);
+ throw new IllegalArgumentException(errMsg, e);
}
}
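
getServerPorts() now throws IllegalArgumentException instead of IOException (a malformed value is a configuration error, not an I/O failure) and names the offending property. A hedged re-implementation sketch of the port-string grammar it parses ("49152-49222,49228"; an empty string means "any port"):

    import java.util.ArrayList;
    import java.util.List;

    public class PortRangeParser {
      static List<Integer> parse(String portString) {
        List<Integer> ports = new ArrayList<>();
        if (portString == null || portString.trim().isEmpty()) {
          ports.add(0);  // 0 lets the OS pick an ephemeral port
          return ports;
        }
        for (String part : portString.split(",")) {
          String[] range = part.split("-");
          if (range.length == 1) {
            ports.add(Integer.valueOf(range[0].trim()));
          } else if (range.length == 2) {
            int lo = Integer.parseInt(range[0].trim());
            int hi = Integer.parseInt(range[1].trim());
            if (lo > hi) {
              throw new IllegalArgumentException("Malformed port range: " + part);
            }
            for (int p = lo; p <= hi; p++) {
              ports.add(p);
            }
          } else {
            throw new IllegalArgumentException("Malformed port range: " + part);
          }
        }
        return ports;
      }

      public static void main(String[] args) {
        System.out.println(parse("49152-49155,49228"));  // [49152, 49153, 49154, 49155, 49228]
      }
    }
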
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
index 00f5a17..b588547 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
@@ -66,13 +66,12 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (lastHeader == null) {
if (!(msg instanceof Rpc.MessageHeader)) {
- LOG.warn("[{}] Expected RPC header, got {} instead.", name(),
- msg != null ? msg.getClass().getName() : null);
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException(String.format("[%s] Expected RPC header, got %s instead.", name(),
+ msg != null ? msg.getClass().getName() : null));
}
lastHeader = (Rpc.MessageHeader) msg;
} else {
- LOG.debug("[{}] Received RPC message: type={} id={} payload={}", name(),
+ LOG.trace("[{}] Received RPC message: type={} id={} payload={}", name(),
lastHeader.type, lastHeader.id, msg != null ? msg.getClass().getName() : null);
try {
switch (lastHeader.type) {
@@ -86,7 +85,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
handleError(ctx, msg, findRpc(lastHeader.id));
break;
default:
- throw new IllegalArgumentException("Unknown RPC message type: " + lastHeader.type);
+ throw new IllegalArgumentException("[" + name() + "] Unknown RPC message type: " + lastHeader.type);
}
} finally {
lastHeader = null;
@@ -103,7 +102,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
}
}
throw new IllegalArgumentException(String.format(
- "Received RPC reply for unknown RPC (%d).", id));
+ "[%s] Received RPC reply for unknown RPC (%d).", name(), id));
}
private void handleCall(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -124,7 +123,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
}
replyType = Rpc.MessageType.REPLY;
} catch (InvocationTargetException ite) {
- LOG.debug(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
+ LOG.error(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
replyPayload = Throwables.getStackTraceAsString(ite.getCause());
replyType = Rpc.MessageType.ERROR;
}
@@ -140,12 +139,12 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc)
throws Exception {
if (msg instanceof String) {
- LOG.warn("Received error message:{}.", msg);
+ LOG.error("[{}] Received error message: {}.", name(), msg);
rpc.future.setFailure(new RpcException((String) msg));
} else {
String error = String.format("Received error with unexpected payload (%s).",
msg != null ? msg.getClass().getName() : null);
- LOG.warn(String.format("[%s] %s", name(), error));
+ LOG.error(String.format("[%s] %s", name(), error));
rpc.future.setFailure(new IllegalArgumentException(error));
ctx.close();
}
@@ -178,7 +177,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
}
void registerRpc(long id, Promise promise, String type) {
- LOG.debug("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
+ LOG.trace("[{}] Registered outstanding rpc {} ({}).", name(), id, type);
rpcs.add(new OutstandingRpc(id, promise));
}
@@ -196,5 +195,4 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
this.future = future;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 6c6ab74..f1383d6 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.SecureRandom;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -84,7 +85,7 @@ public class RpcServer implements Closeable {
this.group = new NioEventLoopGroup(
this.config.getRpcThreadCount(),
new ThreadFactoryBuilder()
- .setNameFormat("RPC-Handler-%d")
+ .setNameFormat("Spark-Driver-RPC-Handler-%d")
.setDaemon(true)
.build());
ServerBootstrap serverBootstrap = new ServerBootstrap()
@@ -100,7 +101,7 @@ public class RpcServer implements Closeable {
Runnable cancelTask = new Runnable() {
@Override
public void run() {
- LOG.warn("Timed out waiting for hello from client.");
+ LOG.warn("Timed out waiting for test message from Remote Spark driver.");
newRpc.close();
}
};
@@ -117,6 +118,8 @@ public class RpcServer implements Closeable {
this.port = ((InetSocketAddress) channel.localAddress()).getPort();
this.pendingClients = Maps.newConcurrentMap();
this.address = this.config.getServerAddress();
+
+ LOG.info("Successfully created Remote Spark Driver RPC Server with address {}:{}", this.address, this.port);
}
/**
@@ -143,7 +146,8 @@ public class RpcServer implements Closeable {
// Retry the next port
}
}
- throw new IOException("No available ports from configured RPC Server ports for HiveServer2");
+ throw new IOException("Remote Spark Driver RPC Server cannot bind to any of the configured ports: "
+ + Arrays.toString(config.getServerPorts().toArray()));
}
}
@@ -169,7 +173,9 @@ public class RpcServer implements Closeable {
Runnable timeout = new Runnable() {
@Override
public void run() {
- promise.setFailure(new TimeoutException("Timed out waiting for client connection."));
+ promise.setFailure(new TimeoutException(
+ String.format("Client '%s' timed out waiting for connection from the Remote Spark" +
+ " Driver", clientId)));
}
};
ScheduledFuture<?> timeoutFuture = group.schedule(timeout,
@@ -179,7 +185,7 @@ public class RpcServer implements Closeable {
timeoutFuture);
if (pendingClients.putIfAbsent(clientId, client) != null) {
throw new IllegalStateException(
- String.format("Client '%s' already registered.", clientId));
+ String.format("Remote Spark Driver with client ID '%s' already registered", clientId));
}
promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
@@ -208,7 +214,7 @@ public class RpcServer implements Closeable {
cinfo.timeoutFuture.cancel(true);
if (!cinfo.promise.isDone()) {
cinfo.promise.setFailure(new RuntimeException(
- String.format("Cancel client '%s'. Error: " + msg, clientId)));
+ String.format("Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
}
}
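
For context on the reworded timeout above: RpcServer registers each pending client with a promise and schedules a task that fails it if the Remote Spark Driver never connects back. An illustrative reduction of that pattern using Netty primitives (not RpcServer's exact code):

    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.Promise;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class RegistrationTimeoutDemo {
      public static void main(String[] args) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        Promise<Object> promise = group.next().newPromise();
        // Fail the pending registration if no driver connects in time; a
        // real connection would call promise.trySuccess(...) first and
        // cancel this task.
        group.schedule(() -> promise.tryFailure(new TimeoutException(
            "Client 'demo' timed out waiting for connection from the Remote Spark Driver")),
            100, TimeUnit.MILLISECONDS);
        promise.await();
        System.out.println("registration failed: " + promise.cause());
        group.shutdownGracefully();
      }
    }
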