You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2018/01/26 15:22:18 UTC

[ambari] 13/21: AMBARI-22805. Improve Blueprints error handling in case of timeout (#185)

This is an automated email from the ASF dual-hosted git repository.

ncole pushed a commit to branch branch-feature-AMBARI-21674
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 71d363d81d580e5154fca91d5f5b8ca5387d29f1
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Thu Jan 25 08:25:29 2018 +0100

    AMBARI-22805. Improve Blueprints error handling in case of timeout (#185)
---
 .../server/topology/AsyncCallableService.java      | 16 +++++++++---
 .../topology/tasks/ConfigureClusterTask.java       |  5 ++--
 .../server/topology/AsyncCallableServiceTest.java  | 30 ++++++++++++++++++++++
 3 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
index ecd2133..7142e49 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
@@ -81,8 +81,8 @@ public class AsyncCallableService<T> implements Callable<T> {
     Future<T> future = executorService.submit(task);
     LOG.info("Task {} execution started at {}", taskName, startTime);
 
+    Throwable lastError = null;
     while (true) {
-      Throwable lastError;
       try {
         LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft);
         T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS);
@@ -90,7 +90,9 @@ public class AsyncCallableService<T> implements Callable<T> {
         return taskResult;
       } catch (TimeoutException e) {
         LOG.debug("Task {} timeout", taskName);
-        lastError = e;
+        if (lastError == null) {
+          lastError = e;
+        }
         timeLeft = 0;
       } catch (ExecutionException e) {
         Throwable cause = Throwables.getRootCause(e);
@@ -98,10 +100,10 @@ public class AsyncCallableService<T> implements Callable<T> {
           LOG.info(String.format("Task %s exception during execution", taskName), cause);
         }
         lastError = cause;
-        timeLeft = timeout - (System.currentTimeMillis() - startTime);
+        timeLeft = timeout - (System.currentTimeMillis() - startTime) - retryDelay;
       }
 
-      if (timeLeft < retryDelay) {
+      if (timeLeft <= 0) {
         attemptToCancel(future);
         LOG.warn("Task {} timeout exceeded, no more retries", taskName);
         onError.accept(lastError);
@@ -124,6 +126,12 @@ public class AsyncCallableService<T> implements Callable<T> {
 
   public static class RetryTaskSilently extends RuntimeException {
     // marker, throw if the task needs to be retried
+    public RetryTaskSilently() {
+      super();
+    }
+    public RetryTaskSilently(String message) {
+      super(message);
+    }
   }
 
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
index 0f13ec2..ed1c451 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
@@ -71,8 +71,9 @@ public class ConfigureClusterTask implements Callable<Boolean> {
     Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
 
     if (!areHostGroupsResolved(requiredHostGroups)) {
-      LOG.info("Some host groups require more hosts, cluster configuration cannot begin");
-      throw new AsyncCallableService.RetryTaskSilently();
+      String msg = "Some host groups require more hosts, cluster configuration cannot begin";
+      LOG.info(msg);
+      throw new AsyncCallableService.RetryTaskSilently(msg);
     }
 
     LOG.info("All required host groups are complete, cluster configuration can now begin");
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
index 6fdb798..348b827 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
@@ -19,16 +19,20 @@
 package org.apache.ambari.server.topology;
 
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.captureLong;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
+import org.easymock.Capture;
 import org.easymock.EasyMockRule;
 import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
@@ -82,6 +86,32 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
   }
 
   @Test
+  public void lastErrorIsReturnedIfSubsequentAttemptTimesOut() throws Exception {
+    // GIVEN
+    Exception computationException = new ExecutionException(new ArithmeticException("Computation error during first attempt"));
+    Exception timeoutException = new TimeoutException("Timeout during second attempt");
+    expect(futureMock.get(TIMEOUT, TimeUnit.MILLISECONDS)).andThrow(computationException);
+    expect(executorServiceMock.schedule(taskMock, RETRY_DELAY, TimeUnit.MILLISECONDS)).andReturn(futureMock);
+    Capture<Long> timeoutCapture = Capture.newInstance();
+    expect(futureMock.get(captureLong(timeoutCapture), eq(TimeUnit.MILLISECONDS))).andThrow(timeoutException);
+    expect(futureMock.isDone()).andReturn(Boolean.FALSE);
+    expect(futureMock.cancel(true)).andReturn(Boolean.TRUE);
+    expect(executorServiceMock.submit(taskMock)).andReturn(futureMock);
+    onErrorMock.accept(computationException.getCause());
+    replayAll();
+
+    asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", executorServiceMock, onErrorMock);
+
+    // WHEN
+    Boolean serviceResult = asyncCallableService.call();
+
+    // THEN
+    verifyAll();
+    Assert.assertTrue(timeoutCapture.getValue() <= TIMEOUT - RETRY_DELAY);
+    Assert.assertNull("No result expected in case of timeout", serviceResult);
+  }
+
+  @Test
   public void testCallableServiceShouldCancelTaskWhenTaskHangsAndTimeoutExceeded() throws Exception {
     // GIVEN
     //the task call hangs, it doesn't return within a reasonable period of time

-- 
To stop receiving notification emails like this one, please contact
ncole@apache.org.