You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "Brock Noland (JIRA)" <ji...@apache.org> on 2018/09/06 01:42:00 UTC
[jira] [Comment Edited] (HIVE-20506) HOS times out when cluster is
full while Hive-on-MR waits
[ https://issues.apache.org/jira/browse/HIVE-20506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605147#comment-16605147 ]
Brock Noland edited comment on HIVE-20506 at 9/6/18 1:41 AM:
-------------------------------------------------------------
This is more pseudocode than anything else, but here is what I was thinking.
{noformat}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 7a6e77bdc6..528cdf5d3e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -51,6 +51,7 @@
private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class);
+ private static final Pattern YARN_APPLICATION_ID_REGEX = Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
private static final String SPARK_HOME_ENV = "SPARK_HOME";
private static final String SPARK_HOME_KEY = "spark.home";
@@ -191,10 +192,11 @@ private String getSparkJobCredentialProviderPassword() {
final Process child = pb.start();
String threadName = Thread.currentThread().getName();
final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> childOutLog = Collections.synchronizedList(new ArrayList<String>());
final LogRedirector.LogSourceCallback callback = () -> isAlive;
LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
- new LogRedirector(child.getInputStream(), LOG, callback));
+ new LogRedirector(child.getInputStream(), LOG, childOutLog, callback));
LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
@@ -216,6 +218,15 @@ private String getSparkJobCredentialProviderPassword() {
rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " +
"with exit code " + exitCode + " and error " + errStr));
}
+ synchronized (childOutLog) {
+ for (String line : childOutLog) {
+ Matcher m = YARN_APPLICATION_ID_REGEX.matcher(line);
+ if (m.find()) {
+ LOG.info("Found application id " + m.group(1));
+ rpcServer.setApplicationId(m.group(1));
+ }
+ }
+ }
} catch (InterruptedException ie) {
LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 0c67ffd813..9e717a9d2a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -79,6 +79,7 @@
private final int port;
private final ConcurrentMap<String, ClientInfo> pendingClients;
private final RpcConfiguration config;
+ private String applicationId;
public RpcServer(Map<String, String> mapConf) throws IOException, InterruptedException {
this.config = new RpcConfiguration(mapConf);
@@ -166,6 +167,10 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
return registerClient(clientId, secret, serverDispatcher, config.getServerConnectTimeoutMs());
}
+ public void setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
@VisibleForTesting
Future<Rpc> registerClient(final String clientId, String secret,
RpcDispatcher serverDispatcher, long clientTimeoutMs) {
@@ -174,6 +179,13 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
Runnable timeout = new Runnable() {
@Override
public void run() {
+ // check to see if application is in ACCEPTED state, if so, don't set failure
+ // if applicationId is not null
+ // do yarn application -status $applicationId
+ // if state == ACCEPTED
+ // reschedule timeout runnable
+ // else
+ // set failure as below
promise.setFailure(new TimeoutException(
String.format("Client '%s' timed out waiting for connection from the Remote Spark" +
" Driver", clientId)));
{noformat}
was (Author: brocknoland):
This is more pseudocode than anything else, but here is what I was thinking.
{noformat}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 7a6e77bdc6..528cdf5d3e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -51,6 +51,7 @@
private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class);
+ private static final Pattern YARN_APPLICATION_ID_REGEX = Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
private static final String SPARK_HOME_ENV = "SPARK_HOME";
private static final String SPARK_HOME_KEY = "spark.home";
@@ -191,10 +192,11 @@ private String getSparkJobCredentialProviderPassword() {
final Process child = pb.start();
String threadName = Thread.currentThread().getName();
final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+ final List<String> childOutLog = Collections.synchronizedList(new ArrayList<String>());
final LogRedirector.LogSourceCallback callback = () -> isAlive;
LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
- new LogRedirector(child.getInputStream(), LOG, callback));
+ new LogRedirector(child.getInputStream(), LOG, childOutLog, callback));
LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
@@ -216,6 +218,15 @@ private String getSparkJobCredentialProviderPassword() {
rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " +
"with exit code " + exitCode + " and error " + errStr));
}
+ synchronized (childOutLog) {
+ for (String line : childOutLog) {
+ Matcher m = YARN_APPLICATION_ID_REGEX.matcher(line);
+ if (m.find()) {
+ LOG.info("Found application id " + m.group(1));
+ rpcServer.setApplicationId(m.group(1));
+ }
+ }
+ }
} catch (InterruptedException ie) {
LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 0c67ffd813..9e717a9d2a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -79,6 +79,7 @@
private final int port;
private final ConcurrentMap<String, ClientInfo> pendingClients;
private final RpcConfiguration config;
+ private String applicationId;
public RpcServer(Map<String, String> mapConf) throws IOException, InterruptedException {
this.config = new RpcConfiguration(mapConf);
@@ -166,6 +167,10 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
return registerClient(clientId, secret, serverDispatcher, config.getServerConnectTimeoutMs());
}
+ public void setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
@VisibleForTesting
Future<Rpc> registerClient(final String clientId, String secret,
RpcDispatcher serverDispatcher, long clientTimeoutMs) {
@@ -174,6 +179,13 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
Runnable timeout = new Runnable() {
@Override
public void run() {
+ // check to see if application is in ACCEPTED state, if so, don't set failure
+ // if applicationId is null
+ // do yarn application -status $applicationId
+ // if state == ACCEPTED
+ // reschedule timeout runnable
+ // else
+ // set failure as below
promise.setFailure(new TimeoutException(
String.format("Client '%s' timed out waiting for connection from the Remote Spark" +
" Driver", clientId)));
{noformat}
> HOS times out when cluster is full while Hive-on-MR waits
> ---------------------------------------------------------
>
> Key: HIVE-20506
> URL: https://issues.apache.org/jira/browse/HIVE-20506
> Project: Hive
> Issue Type: Improvement
> Reporter: Brock Noland
> Priority: Major
>
> My understanding is as follows:
> When the cluster is full, Hive-on-MR waits for resources to become available before submitting a job. This is because the {{hadoop jar}} command is the primary mechanism Hive uses to determine whether a job has completed or failed.
>
> Hive-on-Spark, by contrast, will time out after {{SPARK_RPC_CLIENT_CONNECT_TIMEOUT}} because the RPC client in the AppMaster does not connect back to the RPC server in HS2 before that timeout expires.
> It would be great to close this behavioral difference.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)