You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2018/01/26 15:22:18 UTC
[ambari] 13/21: AMBARI-22805. Improve Blueprints error handling in case of timeout (#185)
This is an automated email from the ASF dual-hosted git repository.
ncole pushed a commit to branch branch-feature-AMBARI-21674
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 71d363d81d580e5154fca91d5f5b8ca5387d29f1
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Thu Jan 25 08:25:29 2018 +0100
AMBARI-22805. Improve Blueprints error handling in case of timeout (#185)
---
.../server/topology/AsyncCallableService.java | 16 +++++++++---
.../topology/tasks/ConfigureClusterTask.java | 5 ++--
.../server/topology/AsyncCallableServiceTest.java | 30 ++++++++++++++++++++++
3 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
index ecd2133..7142e49 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
@@ -81,8 +81,8 @@ public class AsyncCallableService<T> implements Callable<T> {
Future<T> future = executorService.submit(task);
LOG.info("Task {} execution started at {}", taskName, startTime);
+ Throwable lastError = null;
while (true) {
- Throwable lastError;
try {
LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft);
T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS);
@@ -90,7 +90,9 @@ public class AsyncCallableService<T> implements Callable<T> {
return taskResult;
} catch (TimeoutException e) {
LOG.debug("Task {} timeout", taskName);
- lastError = e;
+ if (lastError == null) {
+ lastError = e;
+ }
timeLeft = 0;
} catch (ExecutionException e) {
Throwable cause = Throwables.getRootCause(e);
@@ -98,10 +100,10 @@ public class AsyncCallableService<T> implements Callable<T> {
LOG.info(String.format("Task %s exception during execution", taskName), cause);
}
lastError = cause;
- timeLeft = timeout - (System.currentTimeMillis() - startTime);
+ timeLeft = timeout - (System.currentTimeMillis() - startTime) - retryDelay;
}
- if (timeLeft < retryDelay) {
+ if (timeLeft <= 0) {
attemptToCancel(future);
LOG.warn("Task {} timeout exceeded, no more retries", taskName);
onError.accept(lastError);
@@ -124,6 +126,12 @@ public class AsyncCallableService<T> implements Callable<T> {
public static class RetryTaskSilently extends RuntimeException {
// marker, throw if the task needs to be retried
+ public RetryTaskSilently() {
+ super();
+ }
+ public RetryTaskSilently(String message) {
+ super(message);
+ }
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
index 0f13ec2..ed1c451 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
@@ -71,8 +71,9 @@ public class ConfigureClusterTask implements Callable<Boolean> {
Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
if (!areHostGroupsResolved(requiredHostGroups)) {
- LOG.info("Some host groups require more hosts, cluster configuration cannot begin");
- throw new AsyncCallableService.RetryTaskSilently();
+ String msg = "Some host groups require more hosts, cluster configuration cannot begin";
+ LOG.info(msg);
+ throw new AsyncCallableService.RetryTaskSilently(msg);
}
LOG.info("All required host groups are complete, cluster configuration can now begin");
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
index 6fdb798..348b827 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
@@ -19,16 +19,20 @@
package org.apache.ambari.server.topology;
import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.captureLong;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
+import org.easymock.Capture;
import org.easymock.EasyMockRule;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
@@ -82,6 +86,32 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
}
@Test
+ public void lastErrorIsReturnedIfSubsequentAttemptTimesOut() throws Exception {
+ // GIVEN
+ Exception computationException = new ExecutionException(new ArithmeticException("Computation error during first attempt"));
+ Exception timeoutException = new TimeoutException("Timeout during second attempt");
+ expect(futureMock.get(TIMEOUT, TimeUnit.MILLISECONDS)).andThrow(computationException);
+ expect(executorServiceMock.schedule(taskMock, RETRY_DELAY, TimeUnit.MILLISECONDS)).andReturn(futureMock);
+ Capture<Long> timeoutCapture = Capture.newInstance();
+ expect(futureMock.get(captureLong(timeoutCapture), eq(TimeUnit.MILLISECONDS))).andThrow(timeoutException);
+ expect(futureMock.isDone()).andReturn(Boolean.FALSE);
+ expect(futureMock.cancel(true)).andReturn(Boolean.TRUE);
+ expect(executorServiceMock.submit(taskMock)).andReturn(futureMock);
+ onErrorMock.accept(computationException.getCause());
+ replayAll();
+
+ asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", executorServiceMock, onErrorMock);
+
+ // WHEN
+ Boolean serviceResult = asyncCallableService.call();
+
+ // THEN
+ verifyAll();
+ Assert.assertTrue(timeoutCapture.getValue() <= TIMEOUT - RETRY_DELAY);
+ Assert.assertNull("No result expected in case of timeout", serviceResult);
+ }
+
+ @Test
public void testCallableServiceShouldCancelTaskWhenTaskHangsAndTimeoutExceeded() throws Exception {
// GIVEN
//the task call hangs, it doesn't return within a reasonable period of time
--
To stop receiving notification emails like this one, please contact
ncole@apache.org.