You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2017/09/30 17:43:09 UTC

[2/2] ambari git commit: AMBARI-22092. Blueprint cluster creation constantly throwing exceptions

AMBARI-22092. Blueprint cluster creation constantly throwing exceptions


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4406fd40
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4406fd40
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4406fd40

Branch: refs/heads/branch-2.6
Commit: 4406fd405133d2e379a2793003f8a2682730259f
Parents: cdbe4ba
Author: Attila Doroszlai <ad...@hortonworks.com>
Authored: Wed Sep 27 21:59:25 2017 +0200
Committer: Attila Doroszlai <ad...@hortonworks.com>
Committed: Sat Sep 30 19:42:17 2017 +0200

----------------------------------------------------------------------
 .../server/topology/AsyncCallableService.java   | 112 ++++++++---------
 .../ambari/server/topology/TopologyManager.java |  26 +---
 .../topology/tasks/ConfigureClusterTask.java    | 126 ++++++++++++-------
 .../topology/AsyncCallableServiceTest.java      |  92 +++++---------
 .../ClusterDeployWithStartOnlyTest.java         |  11 ++
 ...InstallWithoutStartOnComponentLevelTest.java |  11 ++
 .../ClusterInstallWithoutStartTest.java         |  11 ++
 .../topology/ConfigureClusterTaskTest.java      |  71 +++--------
 .../server/topology/TopologyManagerTest.java    |  19 ++-
 9 files changed, 223 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
index 95ab6b0..db57378 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
@@ -18,17 +18,19 @@
 
 package org.apache.ambari.server.topology;
 
-import java.util.Calendar;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Callable service implementation for executing tasks asynchronously.
  * The service repeatedly tries to execute the provided task till it successfully completes, or the provided timeout
@@ -45,89 +47,75 @@ public class AsyncCallableService<T> implements Callable<T> {
 
   // the task to be executed
   private final Callable<T> task;
+  private final String taskName;
 
   // the total time the allowed for the task to be executed (retries will be happen within this timeframe in
   // milliseconds)
   private final long timeout;
 
   // the delay between two consecutive execution trials in milliseconds
-  private final long delay;
+  private final long retryDelay;
 
-  private T serviceResult;
+  public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName) {
+    this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1));
+  }
 
-  private final Set<Exception> errors = new HashSet<>();
+  public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService) {
+    Preconditions.checkArgument(retryDelay > 0, "retryDelay should be positive");
 
-  public AsyncCallableService(Callable<T> task, long timeout, long delay,
-                              ScheduledExecutorService executorService) {
     this.task = task;
     this.executorService = executorService;
     this.timeout = timeout;
-    this.delay = delay;
+    this.retryDelay = retryDelay;
+    this.taskName = taskName;
   }
 
   @Override
-  public T call() {
-
-    long startTimeInMillis = Calendar.getInstance().getTimeInMillis();
-    LOG.info("Task execution started at: {}", startTimeInMillis);
-
-    // task execution started on a new thread
-    Future future = executorService.submit(task);
-
-    while (!taskCompleted(future)) {
-      if (!timeoutExceeded(startTimeInMillis)) {
-        LOG.debug("Retrying task execution in [ {} ] milliseconds.", delay);
-        future = executorService.schedule(task, delay, TimeUnit.MILLISECONDS);
-      } else {
-        LOG.debug("Timout exceeded, cancelling task ... ");
-        // making sure the task gets cancelled!
-        if (!future.isDone()) {
-          boolean cancelled = future.cancel(true);
-          LOG.debug("Task cancelled: {}", cancelled);
-        } else {
-          LOG.debug("Task already done.");
+  public T call() throws Exception {
+    long startTime = System.currentTimeMillis();
+    long timeLeft = timeout;
+    Future<T> future = executorService.submit(task);
+    LOG.info("Task {} execution started at {}", taskName, startTime);
+
+    while (true) {
+      try {
+        LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft);
+        T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS);
+        LOG.info("Task {} successfully completed with result: {}", taskName, taskResult);
+        return taskResult;
+      } catch (TimeoutException e) {
+        LOG.debug("Task {} timeout", taskName);
+        timeLeft = 0;
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (!(cause instanceof RetryTaskSilently)) {
+          LOG.info(String.format("Task %s exception during execution", taskName), cause);
         }
-        LOG.info("Timeout exceeded, task execution won't be retried!");
-        // exit the "retry" loop!
-        break;
+        timeLeft = timeout - (System.currentTimeMillis() - startTime);
       }
-    }
 
-    LOG.info("Exiting Async task execution with the result: [ {} ]", serviceResult);
-    return serviceResult;
-  }
+      if (timeLeft < retryDelay) {
+        attemptToCancel(future);
+        LOG.warn("Task {} timeout exceeded, no more retries", taskName);
+        return null;
+      }
 
-  private boolean taskCompleted(Future<T> future) {
-    boolean completed = false;
-    try {
-      LOG.debug("Retrieving task execution result ...");
-      // should receive task execution result within the configured timeout interval
-      // exceptions thrown from the task are propagated here
-      T taskResult = future.get(timeout, TimeUnit.MILLISECONDS);
-
-      // task failures are expected to be reportesd as exceptions
-      LOG.debug("Task successfully executed: {}", taskResult);
-      setServiceResult(taskResult);
-      errors.clear();
-      completed = true;
-    } catch (Exception e) {
-      // Future.isDone always true here!
-      LOG.info("Exception during task execution: ", e);
-      errors.add(e);
+      LOG.debug("Task {} retrying execution in {} milliseconds", taskName, retryDelay);
+      future = executorService.schedule(task, retryDelay, TimeUnit.MILLISECONDS);
     }
-    return completed;
-  }
-
-  private boolean timeoutExceeded(long startTimeInMillis) {
-    return timeout < Calendar.getInstance().getTimeInMillis() - startTimeInMillis;
   }
 
-  private void setServiceResult(T serviceResult) {
-    this.serviceResult = serviceResult;
+  private void attemptToCancel(Future<?> future) {
+    LOG.debug("Task {} timeout exceeded, cancelling", taskName);
+    if (!future.isDone() && future.cancel(true)) {
+      LOG.debug("Task {} cancelled", taskName);
+    } else {
+      LOG.debug("Task {} already done", taskName);
+    }
   }
 
-  public Set<Exception> getErrors() {
-    return errors;
+  public static class RetryTaskSilently extends RuntimeException {
+    // marker, throw if the task needs to be retried
   }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 3029fff..074f929 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -101,9 +101,6 @@ public class TopologyManager {
   public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
   public static final String KDC_ADMIN_CREDENTIAL = "kdc.admin.credential";
 
-  private static final String CLUSTER_ENV_CONFIG_TYPE_NAME = "cluster-env";
-  private static final String CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = "cluster_configure_task_timeout";
-
   private PersistedState persistedState;
 
   /**
@@ -1122,27 +1119,8 @@ public class TopologyManager {
    * @param configurationRequest  configuration request to be executed
    */
   private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
-
-    String timeoutStr = topology.getConfiguration().getPropertyValue(CLUSTER_ENV_CONFIG_TYPE_NAME,
-        CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME);
-
-    long timeout = 1000 * 60 * 30; // 30 minutes
-    long delay = 1000; //ms
-
-    if (timeoutStr != null) {
-      timeout = Long.parseLong(timeoutStr);
-      LOG.debug("ConfigureClusterTask timeout set to: {}", timeout);
-    } else {
-      LOG.debug("No timeout constraints found in configuration. Wired defaults will be applied.");
-    }
-
-    ConfigureClusterTask configureClusterTask = configureClusterTaskFactory.createConfigureClusterTask(topology,
-      configurationRequest, ambariEventPublisher);
-
-    AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay,
-        Executors.newScheduledThreadPool(1));
-
-    executor.submit(asyncCallableService);
+    ConfigureClusterTask task = configureClusterTaskFactory.createConfigureClusterTask(topology, configurationRequest, ambariEventPublisher);
+    executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
index 3aa8cb5..0f13ec2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
@@ -20,11 +20,16 @@ package org.apache.ambari.server.topology.tasks;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.events.ClusterConfigFinishedEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.security.authorization.internal.RunWithInternalSecurityContext;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.topology.AsyncCallableService;
 import org.apache.ambari.server.topology.ClusterConfigurationRequest;
 import org.apache.ambari.server.topology.ClusterTopology;
 import org.apache.ambari.server.topology.HostGroupInfo;
@@ -32,16 +37,23 @@ import org.apache.ambari.server.topology.TopologyManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 
 public class ConfigureClusterTask implements Callable<Boolean> {
 
-  private static Logger LOG = LoggerFactory.getLogger(ConfigureClusterTask.class);
+  private static final long DEFAULT_TIMEOUT = TimeUnit.MINUTES.toMillis(30);
+  private static final long REPEAT_DELAY = TimeUnit.SECONDS.toMillis(1);
+  private static final String TIMEOUT_PROPERTY_NAME = "cluster_configure_task_timeout";
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigureClusterTask.class);
 
-  private ClusterConfigurationRequest configRequest;
-  private ClusterTopology topology;
-  private AmbariEventPublisher ambariEventPublisher;
+  private final ClusterConfigurationRequest configRequest;
+  private final ClusterTopology topology;
+  private final AmbariEventPublisher ambariEventPublisher;
+  private final Map<String, Integer> previousHostCounts = Maps.newHashMap();
+  private final Set<String> missingHostGroups = Sets.newHashSet();
 
   @AssistedInject
   public ConfigureClusterTask(@Assisted ClusterTopology topology, @Assisted ClusterConfigurationRequest configRequest,
@@ -54,55 +66,56 @@ public class ConfigureClusterTask implements Callable<Boolean> {
   @Override
   @RunWithInternalSecurityContext(token = TopologyManager.INTERNAL_AUTH_TOKEN)
   public Boolean call() throws Exception {
-    LOG.info("TopologyManager.ConfigureClusterTask: Entering");
+    LOG.debug("Entering");
 
     Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
 
-    if (!areRequiredHostGroupsResolved(requiredHostGroups)) {
-      LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for config request processing not yet " +
-        "satisfied");
-      throw new IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites for config " +
-        "request processing not yet  satisfied");
+    if (!areHostGroupsResolved(requiredHostGroups)) {
+      LOG.info("Some host groups require more hosts, cluster configuration cannot begin");
+      throw new AsyncCallableService.RetryTaskSilently();
     }
 
-    try {
-      LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster " +
-        "Configuration can now begin");
-      configRequest.process();
-    } catch (Exception e) {
-      LOG.error("TopologyManager.ConfigureClusterTask: " +
-        "An exception occurred while attempting to process cluster configs and set on cluster: ", e);
-
-      // this will signal an unsuccessful run, retry will be triggered if required
-      throw new Exception(e);
-    }
+    LOG.info("All required host groups are complete, cluster configuration can now begin");
+    configRequest.process();
+    LOG.info("Cluster configuration finished successfully");
 
-    LOG.info("Cluster configuration finished successfully!");
-    // Notify listeners that cluster configuration finished
-    long clusterId = topology.getClusterId();
-    ambariEventPublisher.publish(new ClusterConfigFinishedEvent(clusterId,
-            topology.getAmbariContext().getClusterName(clusterId)));
+    notifyListeners();
 
-    LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
+    LOG.debug("Exiting");
     return true;
   }
 
+  public long getTimeout() {
+    long timeout = DEFAULT_TIMEOUT;
+
+    String timeoutStr = topology.getConfiguration().getPropertyValue(ConfigHelper.CLUSTER_ENV, TIMEOUT_PROPERTY_NAME);
+    if (timeoutStr != null) {
+      try {
+        timeout = Long.parseLong(timeoutStr);
+        LOG.info("Using custom timeout: {} ms", timeout);
+      } catch (NumberFormatException e) {
+        // use default
+      }
+    }
+
+    return timeout;
+  }
+
+  public long getRepeatDelay() {
+    return REPEAT_DELAY;
+  }
+
   /**
    * Return the set of host group names which are required for configuration topology resolution.
-   *
-   * @return set of required host group names
    */
   private Collection<String> getTopologyRequiredHostGroups() {
-    Collection<String> requiredHostGroups;
     try {
-      requiredHostGroups = configRequest.getRequiredHostGroups();
+      return configRequest.getRequiredHostGroups();
     } catch (RuntimeException e) {
       // just log error and allow config topology update
-      LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required" +
-        " host groups for config update ", e);
-      requiredHostGroups = Collections.emptyList();
+      LOG.error("Could not determine required host groups", e);
+      return Collections.emptyList();
     }
-    return requiredHostGroups;
   }
 
   /**
@@ -111,23 +124,44 @@ public class ConfigureClusterTask implements Callable<Boolean> {
    * @param requiredHostGroups set of required host groups
    * @return true if all required host groups are resolved
    */
-  private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) {
-    boolean configTopologyResolved = true;
+  private boolean areHostGroupsResolved(Collection<String> requiredHostGroups) {
+    boolean allHostGroupsResolved = true;
     Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo();
     for (String hostGroup : requiredHostGroups) {
       HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup);
-      if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) {
-        configTopologyResolved = false;
-        if (groupInfo != null) {
-          LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} requires {} hosts to be mapped, but only {} are available.",
-            groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
+      if (groupInfo == null) {
+        allHostGroupsResolved = false;
+        if (missingHostGroups.add(hostGroup)) {
+          LOG.warn("Host group '{}' is missing from cluster creation request", hostGroup);
         }
-        break;
       } else {
-        LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} has been fully resolved, as all {} required hosts are mapped to {} physical hosts.",
-          groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
+        int actualHostCount = groupInfo.getHostNames().size();
+        int requestedHostCount = groupInfo.getRequestedHostCount();
+        boolean hostGroupReady = actualHostCount >= requestedHostCount;
+        allHostGroupsResolved &= hostGroupReady;
+
+        Integer previousHostCount = previousHostCounts.put(hostGroup, actualHostCount);
+        if (previousHostCount == null || previousHostCount != actualHostCount) {
+          if (hostGroupReady) {
+            LOG.info("Host group '{}' resolved, requires {} hosts and {} are available",
+              groupInfo.getHostGroupName(), requestedHostCount, actualHostCount
+            );
+          } else {
+            LOG.info("Host group '{}' pending, requires {} hosts, but only {} are available",
+              groupInfo.getHostGroupName(), requestedHostCount, actualHostCount
+            );
+          }
+        }
       }
     }
-    return configTopologyResolved;
+
+    return allHostGroupsResolved;
   }
+
+  private void notifyListeners() throws AmbariException {
+    long clusterId = topology.getClusterId();
+    String clusterName = topology.getAmbariContext().getClusterName(clusterId);
+    ambariEventPublisher.publish(new ClusterConfigFinishedEvent(clusterId, clusterName));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
index d4e4975..bf8fd79 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
@@ -18,30 +18,26 @@
 
 package org.apache.ambari.server.topology;
 
-import org.easymock.EasyMockRule;
-import org.easymock.EasyMockSupport;
-import org.easymock.Mock;
-import org.easymock.MockType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.easymock.EasyMock.expect;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.verify;
+import org.easymock.EasyMockRule;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
 
 public class AsyncCallableServiceTest extends EasyMockSupport {
-  public static final Logger LOGGER = LoggerFactory.getLogger(AsyncCallableService.class);
+
+  private static final long TIMEOUT = 1000; // default timeout
+  private static final long RETRY_DELAY = 50; // default delay between tries
 
   @Rule
   public EasyMockRule mocks = new EasyMockRule(this);
@@ -55,49 +51,26 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
   @Mock
   private ScheduledFuture<Boolean> futureMock;
 
-  private long timeout;
-
-  private long delay;
-
   private AsyncCallableService<Boolean> asyncCallableService;
 
-  @Before
-  public void setup() {
-    // default timeout, overwrite it if necessary
-    timeout = 1000;
-
-    // default delay between tries
-    delay = 500;
-  }
-
-
   @Test
   public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exception {
     // GIVEN
-
-    //the timeout period should be less zero for guaranteed timeout!
-    timeout = -1l;
-
-    // the task to be executed never completes successfully
+    long timeout = -1; // guaranteed timeout
     expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException("Testing the timeout exceeded case"));
     expect(futureMock.isDone()).andReturn(Boolean.FALSE);
-
-    // this is only called when a timeout occurs
     expect(futureMock.cancel(true)).andReturn(Boolean.TRUE);
-
     expect(executorServiceMock.submit(taskMock)).andReturn(futureMock);
-
     replayAll();
 
-    asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, executorServiceMock);
+    asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock);
 
     // WHEN
     Boolean serviceResult = asyncCallableService.call();
 
     // THEN
-    verify();
-    Assert.assertNull("Service result must be null", serviceResult);
-    Assert.assertFalse("The service should have errors!", asyncCallableService.getErrors().isEmpty());
+    verifyAll();
+    Assert.assertNull("No result expected in case of timeout", serviceResult);
   }
 
   @Test
@@ -112,72 +85,63 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
       }
     };
 
-    asyncCallableService = new AsyncCallableService(hangingTask, timeout, delay, Executors.newScheduledThreadPool(2));
+    asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY,  "test");
 
     // WHEN
     Boolean serviceResult = asyncCallableService.call();
 
     // THEN
-    Assert.assertNull("Service result must be null", serviceResult);
-    Assert.assertFalse("The service should have errors!", asyncCallableService.getErrors().isEmpty());
+    Assert.assertNull("No result expected from hanging task", serviceResult);
   }
 
   @Test
   public void testCallableServiceShouldExitWhenTaskCompleted() throws Exception {
     // GIVEN
-    // the task to be executed never completes successfully
-    expect(taskMock.call()).andReturn(Boolean.TRUE).times(1);
-
+    expect(taskMock.call()).andReturn(Boolean.TRUE);
     replayAll();
-    asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, Executors.newScheduledThreadPool(2));
+    asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY,  "test");
 
     // WHEN
     Boolean serviceResult = asyncCallableService.call();
 
     // THEN
-    verify();
-    Assert.assertNotNull("Service result must not be null", serviceResult);
-    Assert.assertTrue(serviceResult);
+    verifyAll();
+    Assert.assertEquals(Boolean.TRUE, serviceResult);
   }
 
   @Test
   public void testCallableServiceShouldRetryTaskExecutionTillTimeoutExceededWhenTaskThrowsException() throws Exception {
     // GIVEN
-
-    // the task to be throws exception
     expect(taskMock.call()).andThrow(new IllegalStateException("****************** TESTING ****************")).times(2, 3);
     replayAll();
-    asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, Executors.newScheduledThreadPool(2));
+    asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY,  "test");
 
     // WHEN
     Boolean serviceResult = asyncCallableService.call();
 
     // THEN
-    verify();
-    // THEN
-    Assert.assertNull("Service result must be null", serviceResult);
-
+    verifyAll();
+    Assert.assertNull("No result expected from throwing task", serviceResult);
   }
 
 
   @Test
   public void testShouldAsyncCallableServiceRetryExecutionWhenTaskThrowsException() throws Exception {
     // GIVEN
-    //the task call hangs, it doesn't return within a reasonable period of time
-    Callable<Boolean> hangingTask = new Callable<Boolean>() {
+    // the task throws exception
+    Callable<Boolean> throwingTask = new Callable<Boolean>() {
       @Override
       public Boolean call() throws Exception {
         throw new IllegalStateException("****************** TESTING ****************");
       }
     };
 
-    asyncCallableService = new AsyncCallableService(hangingTask, timeout, delay, Executors.newScheduledThreadPool(2));
+    asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY,  "test");
 
     // WHEN
     Boolean serviceResult = asyncCallableService.call();
 
     // THEN
-    verify();
-    Assert.assertNull("Service result must be null", serviceResult);
+    Assert.assertNull("No result expected from throwing task", serviceResult);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
index 95f02b1..b1233f4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
@@ -55,12 +55,14 @@ import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
 import org.apache.ambari.server.controller.internal.Stack;
 import org.apache.ambari.server.controller.spi.ClusterController;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
 import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.apache.ambari.server.topology.validators.TopologyValidatorService;
 import org.easymock.Capture;
@@ -152,6 +154,8 @@ public class ClusterDeployWithStartOnlyTest extends EasyMockSupport {
   private ComponentInfo clientComponentInfo;
   @Mock(type = MockType.NICE)
   private ConfigureClusterTaskFactory configureClusterTaskFactory;
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTask configureClusterTask;
 
   @Mock(type = MockType.STRICT)
   private Future mockFuture;
@@ -391,6 +395,13 @@ public class ClusterDeployWithStartOnlyTest extends EasyMockSupport {
     ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
     expectLastCall().once();
 
+    expect(configureClusterTaskFactory.createConfigureClusterTask(
+      anyObject(ClusterTopology.class),
+      anyObject(ClusterConfigurationRequest.class),
+      anyObject(AmbariEventPublisher.class)
+    )).andReturn(configureClusterTask);
+    expect(configureClusterTask.getTimeout()).andReturn(1000L);
+    expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
     expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
 
     persistedTopologyRequest = new PersistedTopologyRequest(1, request);

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
index 224142a..63c9077 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
@@ -56,12 +56,14 @@ import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
 import org.apache.ambari.server.controller.internal.Stack;
 import org.apache.ambari.server.controller.spi.ClusterController;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
 import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.apache.ambari.server.topology.validators.TopologyValidatorService;
 import org.easymock.Capture;
@@ -143,6 +145,8 @@ public class ClusterInstallWithoutStartOnComponentLevelTest extends EasyMockSupp
   private HostRoleCommand hostRoleCommand;
   @Mock(type = MockType.NICE)
   private ConfigureClusterTaskFactory configureClusterTaskFactory;
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTask configureClusterTask;
 
 
   @Mock(type = MockType.NICE)
@@ -368,6 +372,13 @@ public class ClusterInstallWithoutStartOnComponentLevelTest extends EasyMockSupp
     ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
     expectLastCall().once();
 
+    expect(configureClusterTaskFactory.createConfigureClusterTask(
+      anyObject(ClusterTopology.class),
+      anyObject(ClusterConfigurationRequest.class),
+      anyObject(AmbariEventPublisher.class)
+    )).andReturn(configureClusterTask);
+    expect(configureClusterTask.getTimeout()).andReturn(1000L);
+    expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
     expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
 
     persistedTopologyRequest = new PersistedTopologyRequest(1, request);

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
index 605def0..8e8e3d5 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
@@ -56,12 +56,14 @@ import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
 import org.apache.ambari.server.controller.internal.Stack;
 import org.apache.ambari.server.controller.spi.ClusterController;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
 import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.apache.ambari.server.topology.validators.TopologyValidatorService;
 import org.easymock.Capture;
@@ -144,6 +146,8 @@ public class ClusterInstallWithoutStartTest extends EasyMockSupport {
   private HostRoleCommand hostRoleCommand;
   @Mock(type = MockType.NICE)
   private ConfigureClusterTaskFactory configureClusterTaskFactory;
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTask configureClusterTask;
 
 
   @Mock(type = MockType.NICE)
@@ -363,6 +367,13 @@ public class ClusterInstallWithoutStartTest extends EasyMockSupport {
     ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
     expectLastCall().once();
 
+    expect(configureClusterTaskFactory.createConfigureClusterTask(
+      anyObject(ClusterTopology.class),
+      anyObject(ClusterConfigurationRequest.class),
+      anyObject(AmbariEventPublisher.class)
+    )).andReturn(configureClusterTask);
+    expect(configureClusterTask.getTimeout()).andReturn(1000L);
+    expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
     expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
 
     persistedTopologyRequest = new PersistedTopologyRequest(1, request);

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
index 7d34ca2..11f571b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,41 +19,28 @@
 package org.apache.ambari.server.topology;
 
 import static org.easymock.EasyMock.anyObject;
-
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
 
 import org.apache.ambari.server.events.AmbariEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
 import org.easymock.EasyMockRule;
+import org.easymock.EasyMockSupport;
 import org.easymock.Mock;
 import org.easymock.MockType;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import junit.framework.Assert;
 
 /**
  * Unit test for the ConfigureClusterTask class.
  * As business methods of this class don't return values, the assertions are made by verifying method calls on mocks.
  * Thus having strict mocks is essential!
  */
-public class ConfigureClusterTaskTest {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(ConfigureClusterTaskTest.class);
+public class ConfigureClusterTaskTest extends EasyMockSupport {
 
   @Rule
   public EasyMockRule mocks = new EasyMockRule(this);
@@ -74,71 +61,45 @@ public class ConfigureClusterTaskTest {
 
   @Before
   public void before() {
-    reset(clusterConfigurationRequest, clusterTopology, ambariContext, ambariEventPublisher);
+    resetAll();
     testSubject = new ConfigureClusterTask(clusterTopology, clusterConfigurationRequest, ambariEventPublisher);
   }
 
   @Test
-  public void testShouldConfigureClusterTaskLogicBeExecutedWhenRequiredHostgroupsAreResolved() throws
-      Exception {
+  public void taskShouldBeExecutedIfRequiredHostgroupsAreResolved() throws Exception {
     // GIVEN
-    // is it OK to handle the non existence of hostgroups as a success?!
-
-    expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST);
-    expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP);
+    expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.<String>emptyList());
+    expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.<String, HostGroupInfo>emptyMap());
     expect(clusterTopology.getClusterId()).andReturn(1L).anyTimes();
     expect(clusterTopology.getAmbariContext()).andReturn(ambariContext);
     expect(ambariContext.getClusterName(1L)).andReturn("testCluster");
-
-    // this is only called if the "prerequisites" are satisfied
     clusterConfigurationRequest.process();
     ambariEventPublisher.publish(anyObject(AmbariEvent.class));
-
-    replay(clusterConfigurationRequest, clusterTopology, ambariContext, ambariEventPublisher);
+    replayAll();
 
     // WHEN
     Boolean result = testSubject.call();
 
     // THEN
-    verify();
+    verifyAll();
     Assert.assertTrue(result);
   }
 
   @Test
   public void testsShouldConfigureClusterTaskExecuteWhenCalledFromAsyncCallableService() throws Exception {
     // GIVEN
-    // is it OK to handle the non existence of hostgroups as a success?!
-    expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST);
-    expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP);
-
-    // this is only called if the "prerequisites" are satisfied
+    expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.<String>emptyList());
+    expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.<String, HostGroupInfo>emptyMap());
     clusterConfigurationRequest.process();
+    replayAll();
 
-    replay(clusterConfigurationRequest, clusterTopology);
-
-    AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, Executors
-        .newScheduledThreadPool(3));
+    AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test");
 
     // WHEN
     asyncService.call();
-    // THEN
-
 
+    // THEN
+    verifyAll();
   }
 
-  private Collection<String> mockRequiredHostGroups() {
-    return Arrays.asList("test-hostgroup-1");
-  }
-
-  private Map<String, HostGroupInfo> mockHostGroupInfo() {
-    Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>();
-    HostGroupInfo hostGroupInfo = new HostGroupInfo("test-hostgroup-1");
-    hostGroupInfo.addHost("test-host-1");
-    hostGroupInfo.setRequestedCount(2);
-
-    hostGroupInfoMap.put("test-hostgroup-1", hostGroupInfo);
-    return hostGroupInfoMap;
-  }
-
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 025473e..07bb987 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.ambari.server.controller.spi.ClusterController;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
 import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.SettingDAO;
 import org.apache.ambari.server.orm.entities.SettingEntity;
 import org.apache.ambari.server.security.authorization.AuthorizationHelper;
@@ -66,6 +67,7 @@ import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.stack.NoSuchStackException;
 import org.apache.ambari.server.state.SecurityType;
 import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
 import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
 import org.apache.ambari.server.topology.validators.TopologyValidatorService;
 import org.easymock.Capture;
@@ -156,7 +158,8 @@ public class TopologyManagerTest {
   private ClusterTopology clusterTopologyMock;
   @Mock(type = MockType.NICE)
   private ConfigureClusterTaskFactory configureClusterTaskFactory;
-
+  @Mock(type = MockType.NICE)
+  private ConfigureClusterTask configureClusterTask;
 
   @Mock(type = MockType.STRICT)
   private Future mockFuture;
@@ -342,9 +345,14 @@ public class TopologyManagerTest {
 
     expect(clusterController.ensureResourceProvider(anyObject(Resource.Type.class))).andReturn(resourceProvider);
 
-    expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture);
-
-    expectLastCall().anyTimes();
+    expect(configureClusterTaskFactory.createConfigureClusterTask(
+      anyObject(ClusterTopology.class),
+      anyObject(ClusterConfigurationRequest.class),
+      anyObject(AmbariEventPublisher.class)
+    )).andReturn(configureClusterTask);
+    expect(configureClusterTask.getTimeout()).andReturn(1000L);
+    expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
+    expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).anyTimes();
 
     expect(persistedState.persistTopologyRequest(request)).andReturn(persistedTopologyRequest).anyTimes();
     persistedState.persistLogicalRequest(logicalRequest, 1);
@@ -530,7 +538,8 @@ public class TopologyManagerTest {
     replay(blueprint, stack, request, group1, group2, ambariContext, logicalRequestFactory,
             configurationRequest, configurationRequest2, configurationRequest3, executor,
             persistedState, clusterTopologyMock, securityConfigurationFactory, credentialStoreService,
-            clusterController, resourceProvider, mockFuture, requestStatusResponse, logicalRequest, settingDAO);
+            clusterController, resourceProvider, mockFuture, requestStatusResponse, logicalRequest, settingDAO,
+            configureClusterTaskFactory, configureClusterTask);
   }
 
   @Test(expected = InvalidTopologyException.class)