You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:07 UTC

[39/50] [abbrv] hadoop git commit: YARN-5999. AMRMClientAsync will stop if any exceptions are thrown on allocate call. Contributed by Jian He

YARN-5999. AMRMClientAsync will stop if any exceptions are thrown on allocate call. Contributed by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/64a2d5be
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/64a2d5be
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/64a2d5be

Branch: refs/heads/HDFS-10285
Commit: 64a2d5be91a7f344aadf820481c542c967bc46a8
Parents: f5e0bd3
Author: Xuan <xg...@apache.org>
Authored: Wed Dec 14 14:33:23 2016 -0800
Committer: Xuan <xg...@apache.org>
Committed: Wed Dec 14 14:33:23 2016 -0800

----------------------------------------------------------------------
 .../api/async/impl/AMRMClientAsyncImpl.java     | 30 ++++++++------------
 .../api/async/impl/TestAMRMClientAsync.java     |  2 +-
 2 files changed, 13 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a2d5be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index bc6cadd..3dd53d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -61,7 +61,7 @@ extends AMRMClientAsync<T> {
   private final HeartbeatThread heartbeatThread;
   private final CallbackHandlerThread handlerThread;
 
-  private final BlockingQueue<AllocateResponse> responseQueue;
+  private final BlockingQueue<Object> responseQueue;
   
   private final Object unregisterHeartbeatLock = new Object();
   
@@ -70,8 +70,6 @@ extends AMRMClientAsync<T> {
   
   private volatile String collectorAddr;
 
-  private volatile Throwable savedException;
-
   /**
    *
    * @param intervalMs heartbeat interval in milliseconds between AM and RM
@@ -90,7 +88,6 @@ extends AMRMClientAsync<T> {
     handlerThread = new CallbackHandlerThread();
     responseQueue = new LinkedBlockingQueue<>();
     keepRunning = true;
-    savedException = null;
   }
 
   /**
@@ -111,9 +108,8 @@ extends AMRMClientAsync<T> {
     super(client, intervalMs, callbackHandler);
     heartbeatThread = new HeartbeatThread();
     handlerThread = new CallbackHandlerThread();
-    responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+    responseQueue = new LinkedBlockingQueue<Object>();
     keepRunning = true;
-    savedException = null;
   }
 
   @Override
@@ -265,7 +261,7 @@ extends AMRMClientAsync<T> {
     
     public void run() {
       while (true) {
-        AllocateResponse response = null;
+        Object response = null;
         // synchronization ensures we don't send heartbeats after unregistering
         synchronized (unregisterHeartbeatLock) {
           if (!keepRunning) {
@@ -280,10 +276,7 @@ extends AMRMClientAsync<T> {
             return;
           } catch (Throwable ex) {
             LOG.error("Exception on heartbeat", ex);
-            savedException = ex;
-            // interrupt handler thread in case it waiting on the queue
-            handlerThread.interrupt();
-            return;
+            response = ex;
           }
           if (response != null) {
             while (true) {
@@ -316,19 +309,20 @@ extends AMRMClientAsync<T> {
           return;
         }
         try {
-          AllocateResponse response;
-          if(savedException != null) {
-            LOG.error("Stopping callback due to: ", savedException);
-            handler.onError(savedException);
-            return;
-          }
+          Object object;
           try {
-            response = responseQueue.take();
+            object = responseQueue.take();
           } catch (InterruptedException ex) {
             LOG.info("Interrupted while waiting for queue", ex);
             continue;
           }
+          if (object instanceof Throwable) {
+            progress = handler.getProgress();
+            handler.onError((Throwable) object);
+            continue;
+          }
 
+          AllocateResponse response = (AllocateResponse) object;
           String collectorAddress = response.getCollectorAddr();
           TimelineClient timelineClient = client.getRegisteredTimelineClient();
           if (timelineClient != null && collectorAddress != null

http://git-wip-us.apache.org/repos/asf/hadoop/blob/64a2d5be/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index dac82e4..ba38340 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -213,7 +213,7 @@ public class TestAMRMClientAsync {
     
     asyncClient.stop();
     // stopping should have joined all threads and completed all callbacks
-    Assert.assertTrue(callbackHandler.callbackCount == 0);
+    Assert.assertTrue(callbackHandler.callbackCount > 0);
   }
 
   @Test (timeout = 10000)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org