You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "Brock Noland (JIRA)" <ji...@apache.org> on 2018/09/06 01:42:00 UTC

[jira] [Comment Edited] (HIVE-20506) HOS times out when cluster is full while Hive-on-MR waits

    [ https://issues.apache.org/jira/browse/HIVE-20506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605147#comment-16605147 ] 

Brock Noland edited comment on HIVE-20506 at 9/6/18 1:41 AM:
-------------------------------------------------------------

This is more pseudocode than anything else, but here is what I was thinking.

 
{noformat}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 7a6e77bdc6..528cdf5d3e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -51,6 +51,7 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class);
 
+  private static final Pattern YARN_APPLICATION_ID_REGEX = Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
   private static final String SPARK_HOME_KEY = "spark.home";
 
@@ -191,10 +192,11 @@ private String getSparkJobCredentialProviderPassword() {
     final Process child = pb.start();
     String threadName = Thread.currentThread().getName();
     final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+    final List<String> childOutLog = Collections.synchronizedList(new ArrayList<String>());
     final LogRedirector.LogSourceCallback callback = () -> isAlive;
 
     LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
-        new LogRedirector(child.getInputStream(), LOG, callback));
+        new LogRedirector(child.getInputStream(), LOG, childOutLog, callback));
     LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
         new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
 
@@ -216,6 +218,15 @@ private String getSparkJobCredentialProviderPassword() {
           rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " +
                   "with exit code " + exitCode + " and error " + errStr));
         }
+        synchronized (childOutLog) {
+          for (String line : childOutLog) {
+            Matcher m = YARN_APPLICATION_ID_REGEX.matcher(line);
+            if (m.find()) {
+              LOG.info("Found application id " + m.group(1));
+              rpcServer.setApplicationId(m.group(1));
+            }
+          }
+        }
       } catch (InterruptedException ie) {
         LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
         rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 0c67ffd813..9e717a9d2a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -79,6 +79,7 @@
   private final int port;
   private final ConcurrentMap<String, ClientInfo> pendingClients;
   private final RpcConfiguration config;
+  private String applicationId;
 
   public RpcServer(Map<String, String> mapConf) throws IOException, InterruptedException {
     this.config = new RpcConfiguration(mapConf);
@@ -166,6 +167,10 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
     return registerClient(clientId, secret, serverDispatcher, config.getServerConnectTimeoutMs());
   }
 
+  public void setApplicationId(String applicationId) {
+    this.applicationId = applicationId;
+  }
+
   @VisibleForTesting
   Future<Rpc> registerClient(final String clientId, String secret,
       RpcDispatcher serverDispatcher, long clientTimeoutMs) {
@@ -174,6 +179,13 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
     Runnable timeout = new Runnable() {
       @Override
       public void run() {
+        // check to see if application is in ACCEPTED state, if so, don't set failure
+        // if applicationId is not null
+        //   do yarn application -status $applicationId
+        //   if state == ACCEPTED
+        //     reschedule timeout runnable
+        //   else
+        //    set failure as below
         promise.setFailure(new TimeoutException(
                 String.format("Client '%s' timed out waiting for connection from the Remote Spark" +
                         " Driver", clientId)));
{noformat}


was (Author: brocknoland):
More pseudocode than anything else, but here is what I was thinking.

 
{noformat}
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 7a6e77bdc6..528cdf5d3e 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -51,6 +51,7 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkSubmitSparkClient.class);
 
+  private static final Pattern YARN_APPLICATION_ID_REGEX = Pattern.compile("\\s(application_[0-9]+_[0-9]+)(\\s|$)");
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
   private static final String SPARK_HOME_KEY = "spark.home";
 
@@ -191,10 +192,11 @@ private String getSparkJobCredentialProviderPassword() {
     final Process child = pb.start();
     String threadName = Thread.currentThread().getName();
     final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+    final List<String> childOutLog = Collections.synchronizedList(new ArrayList<String>());
     final LogRedirector.LogSourceCallback callback = () -> isAlive;
 
     LogRedirector.redirect("spark-submit-stdout-redir-" + threadName,
-        new LogRedirector(child.getInputStream(), LOG, callback));
+        new LogRedirector(child.getInputStream(), LOG, childOutLog, callback));
     LogRedirector.redirect("spark-submit-stderr-redir-" + threadName,
         new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
 
@@ -216,6 +218,15 @@ private String getSparkJobCredentialProviderPassword() {
           rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " +
                   "with exit code " + exitCode + " and error " + errStr));
         }
+        synchronized (childOutLog) {
+          for (String line : childOutLog) {
+            Matcher m = YARN_APPLICATION_ID_REGEX.matcher(line);
+            if (m.find()) {
+              LOG.info("Found application id " + m.group(1));
+              rpcServer.setApplicationId(m.group(1));
+            }
+          }
+        }
       } catch (InterruptedException ie) {
         LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
         rpcServer.cancelClient(clientId, "Thread waiting on the child process (spark-submit) is interrupted");
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 0c67ffd813..9e717a9d2a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -79,6 +79,7 @@
   private final int port;
   private final ConcurrentMap<String, ClientInfo> pendingClients;
   private final RpcConfiguration config;
+  private String applicationId;
 
   public RpcServer(Map<String, String> mapConf) throws IOException, InterruptedException {
     this.config = new RpcConfiguration(mapConf);
@@ -166,6 +167,10 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
     return registerClient(clientId, secret, serverDispatcher, config.getServerConnectTimeoutMs());
   }
 
+  public void setApplicationId(String applicationId) {
+    this.applicationId = applicationId;
+  }
+
   @VisibleForTesting
   Future<Rpc> registerClient(final String clientId, String secret,
       RpcDispatcher serverDispatcher, long clientTimeoutMs) {
@@ -174,6 +179,13 @@ private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
     Runnable timeout = new Runnable() {
       @Override
       public void run() {
+        // check to see if application is in ACCEPTED state, if so, don't set failure
+        // if applicationId is null
+        //   do yarn application -status $applicationId
+        //   if state == ACCEPTED
+        //     reschedule timeout runnable
+        //   else
+        //    set failure as below
         promise.setFailure(new TimeoutException(
                 String.format("Client '%s' timed out waiting for connection from the Remote Spark" +
                         " Driver", clientId)));
{noformat}

> HOS times out when cluster is full while Hive-on-MR waits
> ---------------------------------------------------------
>
>                 Key: HIVE-20506
>                 URL: https://issues.apache.org/jira/browse/HIVE-20506
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Brock Noland
>            Priority: Major
>
> My understanding is as follows:
> When the cluster is full, Hive-on-MR waits for resources to become available before submitting a job. This is because the hadoop jar command is the primary mechanism Hive uses to know whether a job has completed or failed.
>  
> Hive-on-Spark, by contrast, times out after {{SPARK_RPC_CLIENT_CONNECT_TIMEOUT}}: while the application is waiting for cluster resources, the RPC client in the AppMaster never connects back to the RPC Server in HS2. 
> It would be great to close this behavioral difference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)