You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2017/09/30 17:43:09 UTC
[2/2] ambari git commit: AMBARI-22092. Blueprint cluster creation
constantly throwing exceptions
AMBARI-22092. Blueprint cluster creation constantly throwing exceptions
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4406fd40
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4406fd40
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4406fd40
Branch: refs/heads/branch-2.6
Commit: 4406fd405133d2e379a2793003f8a2682730259f
Parents: cdbe4ba
Author: Attila Doroszlai <ad...@hortonworks.com>
Authored: Wed Sep 27 21:59:25 2017 +0200
Committer: Attila Doroszlai <ad...@hortonworks.com>
Committed: Sat Sep 30 19:42:17 2017 +0200
----------------------------------------------------------------------
.../server/topology/AsyncCallableService.java | 112 ++++++++---------
.../ambari/server/topology/TopologyManager.java | 26 +---
.../topology/tasks/ConfigureClusterTask.java | 126 ++++++++++++-------
.../topology/AsyncCallableServiceTest.java | 92 +++++---------
.../ClusterDeployWithStartOnlyTest.java | 11 ++
...InstallWithoutStartOnComponentLevelTest.java | 11 ++
.../ClusterInstallWithoutStartTest.java | 11 ++
.../topology/ConfigureClusterTaskTest.java | 71 +++--------
.../server/topology/TopologyManagerTest.java | 19 ++-
9 files changed, 223 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
index 95ab6b0..db57378 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
@@ -18,17 +18,19 @@
package org.apache.ambari.server.topology;
-import java.util.Calendar;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* Callable service implementation for executing tasks asynchronously.
* The service repeatedly tries to execute the provided task till it successfully completes, or the provided timeout
@@ -45,89 +47,75 @@ public class AsyncCallableService<T> implements Callable<T> {
// the task to be executed
private final Callable<T> task;
+ private final String taskName;
// the total time the allowed for the task to be executed (retries will be happen within this timeframe in
// milliseconds)
private final long timeout;
// the delay between two consecutive execution trials in milliseconds
- private final long delay;
+ private final long retryDelay;
- private T serviceResult;
+ public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName) {
+ this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1));
+ }
- private final Set<Exception> errors = new HashSet<>();
+ public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService) {
+ Preconditions.checkArgument(retryDelay > 0, "retryDelay should be positive");
- public AsyncCallableService(Callable<T> task, long timeout, long delay,
- ScheduledExecutorService executorService) {
this.task = task;
this.executorService = executorService;
this.timeout = timeout;
- this.delay = delay;
+ this.retryDelay = retryDelay;
+ this.taskName = taskName;
}
@Override
- public T call() {
-
- long startTimeInMillis = Calendar.getInstance().getTimeInMillis();
- LOG.info("Task execution started at: {}", startTimeInMillis);
-
- // task execution started on a new thread
- Future future = executorService.submit(task);
-
- while (!taskCompleted(future)) {
- if (!timeoutExceeded(startTimeInMillis)) {
- LOG.debug("Retrying task execution in [ {} ] milliseconds.", delay);
- future = executorService.schedule(task, delay, TimeUnit.MILLISECONDS);
- } else {
- LOG.debug("Timout exceeded, cancelling task ... ");
- // making sure the task gets cancelled!
- if (!future.isDone()) {
- boolean cancelled = future.cancel(true);
- LOG.debug("Task cancelled: {}", cancelled);
- } else {
- LOG.debug("Task already done.");
+ public T call() throws Exception {
+ long startTime = System.currentTimeMillis();
+ long timeLeft = timeout;
+ Future<T> future = executorService.submit(task);
+ LOG.info("Task {} execution started at {}", taskName, startTime);
+
+ while (true) {
+ try {
+ LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft);
+ T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS);
+ LOG.info("Task {} successfully completed with result: {}", taskName, taskResult);
+ return taskResult;
+ } catch (TimeoutException e) {
+ LOG.debug("Task {} timeout", taskName);
+ timeLeft = 0;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (!(cause instanceof RetryTaskSilently)) {
+ LOG.info(String.format("Task %s exception during execution", taskName), cause);
}
- LOG.info("Timeout exceeded, task execution won't be retried!");
- // exit the "retry" loop!
- break;
+ timeLeft = timeout - (System.currentTimeMillis() - startTime);
}
- }
- LOG.info("Exiting Async task execution with the result: [ {} ]", serviceResult);
- return serviceResult;
- }
+ if (timeLeft < retryDelay) {
+ attemptToCancel(future);
+ LOG.warn("Task {} timeout exceeded, no more retries", taskName);
+ return null;
+ }
- private boolean taskCompleted(Future<T> future) {
- boolean completed = false;
- try {
- LOG.debug("Retrieving task execution result ...");
- // should receive task execution result within the configured timeout interval
- // exceptions thrown from the task are propagated here
- T taskResult = future.get(timeout, TimeUnit.MILLISECONDS);
-
- // task failures are expected to be reportesd as exceptions
- LOG.debug("Task successfully executed: {}", taskResult);
- setServiceResult(taskResult);
- errors.clear();
- completed = true;
- } catch (Exception e) {
- // Future.isDone always true here!
- LOG.info("Exception during task execution: ", e);
- errors.add(e);
+ LOG.debug("Task {} retrying execution in {} milliseconds", taskName, retryDelay);
+ future = executorService.schedule(task, retryDelay, TimeUnit.MILLISECONDS);
}
- return completed;
- }
-
- private boolean timeoutExceeded(long startTimeInMillis) {
- return timeout < Calendar.getInstance().getTimeInMillis() - startTimeInMillis;
}
- private void setServiceResult(T serviceResult) {
- this.serviceResult = serviceResult;
+ private void attemptToCancel(Future<?> future) {
+ LOG.debug("Task {} timeout exceeded, cancelling", taskName);
+ if (!future.isDone() && future.cancel(true)) {
+ LOG.debug("Task {} cancelled", taskName);
+ } else {
+ LOG.debug("Task {} already done", taskName);
+ }
}
- public Set<Exception> getErrors() {
- return errors;
+ public static class RetryTaskSilently extends RuntimeException {
+ // marker, throw if the task needs to be retried
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 3029fff..074f929 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -101,9 +101,6 @@ public class TopologyManager {
public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
public static final String KDC_ADMIN_CREDENTIAL = "kdc.admin.credential";
- private static final String CLUSTER_ENV_CONFIG_TYPE_NAME = "cluster-env";
- private static final String CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = "cluster_configure_task_timeout";
-
private PersistedState persistedState;
/**
@@ -1122,27 +1119,8 @@ public class TopologyManager {
* @param configurationRequest configuration request to be executed
*/
private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
-
- String timeoutStr = topology.getConfiguration().getPropertyValue(CLUSTER_ENV_CONFIG_TYPE_NAME,
- CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME);
-
- long timeout = 1000 * 60 * 30; // 30 minutes
- long delay = 1000; //ms
-
- if (timeoutStr != null) {
- timeout = Long.parseLong(timeoutStr);
- LOG.debug("ConfigureClusterTask timeout set to: {}", timeout);
- } else {
- LOG.debug("No timeout constraints found in configuration. Wired defaults will be applied.");
- }
-
- ConfigureClusterTask configureClusterTask = configureClusterTaskFactory.createConfigureClusterTask(topology,
- configurationRequest, ambariEventPublisher);
-
- AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay,
- Executors.newScheduledThreadPool(1));
-
- executor.submit(asyncCallableService);
+ ConfigureClusterTask task = configureClusterTaskFactory.createConfigureClusterTask(topology, configurationRequest, ambariEventPublisher);
+ executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask"));
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
index 3aa8cb5..0f13ec2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
@@ -20,11 +20,16 @@ package org.apache.ambari.server.topology.tasks;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.events.ClusterConfigFinishedEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.security.authorization.internal.RunWithInternalSecurityContext;
+import org.apache.ambari.server.state.ConfigHelper;
+import org.apache.ambari.server.topology.AsyncCallableService;
import org.apache.ambari.server.topology.ClusterConfigurationRequest;
import org.apache.ambari.server.topology.ClusterTopology;
import org.apache.ambari.server.topology.HostGroupInfo;
@@ -32,16 +37,23 @@ import org.apache.ambari.server.topology.TopologyManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
public class ConfigureClusterTask implements Callable<Boolean> {
- private static Logger LOG = LoggerFactory.getLogger(ConfigureClusterTask.class);
+ private static final long DEFAULT_TIMEOUT = TimeUnit.MINUTES.toMillis(30);
+ private static final long REPEAT_DELAY = TimeUnit.SECONDS.toMillis(1);
+ private static final String TIMEOUT_PROPERTY_NAME = "cluster_configure_task_timeout";
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigureClusterTask.class);
- private ClusterConfigurationRequest configRequest;
- private ClusterTopology topology;
- private AmbariEventPublisher ambariEventPublisher;
+ private final ClusterConfigurationRequest configRequest;
+ private final ClusterTopology topology;
+ private final AmbariEventPublisher ambariEventPublisher;
+ private final Map<String, Integer> previousHostCounts = Maps.newHashMap();
+ private final Set<String> missingHostGroups = Sets.newHashSet();
@AssistedInject
public ConfigureClusterTask(@Assisted ClusterTopology topology, @Assisted ClusterConfigurationRequest configRequest,
@@ -54,55 +66,56 @@ public class ConfigureClusterTask implements Callable<Boolean> {
@Override
@RunWithInternalSecurityContext(token = TopologyManager.INTERNAL_AUTH_TOKEN)
public Boolean call() throws Exception {
- LOG.info("TopologyManager.ConfigureClusterTask: Entering");
+ LOG.debug("Entering");
Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
- if (!areRequiredHostGroupsResolved(requiredHostGroups)) {
- LOG.debug("TopologyManager.ConfigureClusterTask - prerequisites for config request processing not yet " +
- "satisfied");
- throw new IllegalArgumentException("TopologyManager.ConfigureClusterTask - prerequisites for config " +
- "request processing not yet satisfied");
+ if (!areHostGroupsResolved(requiredHostGroups)) {
+ LOG.info("Some host groups require more hosts, cluster configuration cannot begin");
+ throw new AsyncCallableService.RetryTaskSilently();
}
- try {
- LOG.info("TopologyManager.ConfigureClusterTask: All Required host groups are completed, Cluster " +
- "Configuration can now begin");
- configRequest.process();
- } catch (Exception e) {
- LOG.error("TopologyManager.ConfigureClusterTask: " +
- "An exception occurred while attempting to process cluster configs and set on cluster: ", e);
-
- // this will signal an unsuccessful run, retry will be triggered if required
- throw new Exception(e);
- }
+ LOG.info("All required host groups are complete, cluster configuration can now begin");
+ configRequest.process();
+ LOG.info("Cluster configuration finished successfully");
- LOG.info("Cluster configuration finished successfully!");
- // Notify listeners that cluster configuration finished
- long clusterId = topology.getClusterId();
- ambariEventPublisher.publish(new ClusterConfigFinishedEvent(clusterId,
- topology.getAmbariContext().getClusterName(clusterId)));
+ notifyListeners();
- LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
+ LOG.debug("Exiting");
return true;
}
+ public long getTimeout() {
+ long timeout = DEFAULT_TIMEOUT;
+
+ String timeoutStr = topology.getConfiguration().getPropertyValue(ConfigHelper.CLUSTER_ENV, TIMEOUT_PROPERTY_NAME);
+ if (timeoutStr != null) {
+ try {
+ timeout = Long.parseLong(timeoutStr);
+ LOG.info("Using custom timeout: {} ms", timeout);
+ } catch (NumberFormatException e) {
+ // use default
+ }
+ }
+
+ return timeout;
+ }
+
+ public long getRepeatDelay() {
+ return REPEAT_DELAY;
+ }
+
/**
* Return the set of host group names which are required for configuration topology resolution.
- *
- * @return set of required host group names
*/
private Collection<String> getTopologyRequiredHostGroups() {
- Collection<String> requiredHostGroups;
try {
- requiredHostGroups = configRequest.getRequiredHostGroups();
+ return configRequest.getRequiredHostGroups();
} catch (RuntimeException e) {
// just log error and allow config topology update
- LOG.error("TopologyManager.ConfigureClusterTask: An exception occurred while attempting to determine required" +
- " host groups for config update ", e);
- requiredHostGroups = Collections.emptyList();
+ LOG.error("Could not determine required host groups", e);
+ return Collections.emptyList();
}
- return requiredHostGroups;
}
/**
@@ -111,23 +124,44 @@ public class ConfigureClusterTask implements Callable<Boolean> {
* @param requiredHostGroups set of required host groups
* @return true if all required host groups are resolved
*/
- private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) {
- boolean configTopologyResolved = true;
+ private boolean areHostGroupsResolved(Collection<String> requiredHostGroups) {
+ boolean allHostGroupsResolved = true;
Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo();
for (String hostGroup : requiredHostGroups) {
HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup);
- if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) {
- configTopologyResolved = false;
- if (groupInfo != null) {
- LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} requires {} hosts to be mapped, but only {} are available.",
- groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
+ if (groupInfo == null) {
+ allHostGroupsResolved = false;
+ if (missingHostGroups.add(hostGroup)) {
+ LOG.warn("Host group '{}' is missing from cluster creation request", hostGroup);
}
- break;
} else {
- LOG.info("TopologyManager.ConfigureClusterTask areHostGroupsResolved: host group name = {} has been fully resolved, as all {} required hosts are mapped to {} physical hosts.",
- groupInfo.getHostGroupName(), groupInfo.getRequestedHostCount(), groupInfo.getHostNames().size());
+ int actualHostCount = groupInfo.getHostNames().size();
+ int requestedHostCount = groupInfo.getRequestedHostCount();
+ boolean hostGroupReady = actualHostCount >= requestedHostCount;
+ allHostGroupsResolved &= hostGroupReady;
+
+ Integer previousHostCount = previousHostCounts.put(hostGroup, actualHostCount);
+ if (previousHostCount == null || previousHostCount != actualHostCount) {
+ if (hostGroupReady) {
+ LOG.info("Host group '{}' resolved, requires {} hosts and {} are available",
+ groupInfo.getHostGroupName(), requestedHostCount, actualHostCount
+ );
+ } else {
+ LOG.info("Host group '{}' pending, requires {} hosts, but only {} are available",
+ groupInfo.getHostGroupName(), requestedHostCount, actualHostCount
+ );
+ }
+ }
}
}
- return configTopologyResolved;
+
+ return allHostGroupsResolved;
}
+
+ private void notifyListeners() throws AmbariException {
+ long clusterId = topology.getClusterId();
+ String clusterName = topology.getAmbariContext().getClusterName(clusterId);
+ ambariEventPublisher.publish(new ClusterConfigFinishedEvent(clusterId, clusterName));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
index d4e4975..bf8fd79 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
@@ -18,30 +18,26 @@
package org.apache.ambari.server.topology;
-import org.easymock.EasyMockRule;
-import org.easymock.EasyMockSupport;
-import org.easymock.Mock;
-import org.easymock.MockType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.easymock.EasyMock.expect;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.verify;
+import org.easymock.EasyMockRule;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
public class AsyncCallableServiceTest extends EasyMockSupport {
- public static final Logger LOGGER = LoggerFactory.getLogger(AsyncCallableService.class);
+
+ private static final long TIMEOUT = 1000; // default timeout
+ private static final long RETRY_DELAY = 50; // default delay between tries
@Rule
public EasyMockRule mocks = new EasyMockRule(this);
@@ -55,49 +51,26 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
@Mock
private ScheduledFuture<Boolean> futureMock;
- private long timeout;
-
- private long delay;
-
private AsyncCallableService<Boolean> asyncCallableService;
- @Before
- public void setup() {
- // default timeout, overwrite it if necessary
- timeout = 1000;
-
- // default delay between tries
- delay = 500;
- }
-
-
@Test
public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exception {
// GIVEN
-
- //the timeout period should be less zero for guaranteed timeout!
- timeout = -1l;
-
- // the task to be executed never completes successfully
+ long timeout = -1; // guaranteed timeout
expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException("Testing the timeout exceeded case"));
expect(futureMock.isDone()).andReturn(Boolean.FALSE);
-
- // this is only called when a timeout occurs
expect(futureMock.cancel(true)).andReturn(Boolean.TRUE);
-
expect(executorServiceMock.submit(taskMock)).andReturn(futureMock);
-
replayAll();
- asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, executorServiceMock);
+ asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock);
// WHEN
Boolean serviceResult = asyncCallableService.call();
// THEN
- verify();
- Assert.assertNull("Service result must be null", serviceResult);
- Assert.assertFalse("The service should have errors!", asyncCallableService.getErrors().isEmpty());
+ verifyAll();
+ Assert.assertNull("No result expected in case of timeout", serviceResult);
}
@Test
@@ -112,72 +85,63 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
}
};
- asyncCallableService = new AsyncCallableService(hangingTask, timeout, delay, Executors.newScheduledThreadPool(2));
+ asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY, "test");
// WHEN
Boolean serviceResult = asyncCallableService.call();
// THEN
- Assert.assertNull("Service result must be null", serviceResult);
- Assert.assertFalse("The service should have errors!", asyncCallableService.getErrors().isEmpty());
+ Assert.assertNull("No result expected from hanging task", serviceResult);
}
@Test
public void testCallableServiceShouldExitWhenTaskCompleted() throws Exception {
// GIVEN
- // the task to be executed never completes successfully
- expect(taskMock.call()).andReturn(Boolean.TRUE).times(1);
-
+ expect(taskMock.call()).andReturn(Boolean.TRUE);
replayAll();
- asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, Executors.newScheduledThreadPool(2));
+ asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test");
// WHEN
Boolean serviceResult = asyncCallableService.call();
// THEN
- verify();
- Assert.assertNotNull("Service result must not be null", serviceResult);
- Assert.assertTrue(serviceResult);
+ verifyAll();
+ Assert.assertEquals(Boolean.TRUE, serviceResult);
}
@Test
public void testCallableServiceShouldRetryTaskExecutionTillTimeoutExceededWhenTaskThrowsException() throws Exception {
// GIVEN
-
- // the task to be throws exception
expect(taskMock.call()).andThrow(new IllegalStateException("****************** TESTING ****************")).times(2, 3);
replayAll();
- asyncCallableService = new AsyncCallableService(taskMock, timeout, delay, Executors.newScheduledThreadPool(2));
+ asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test");
// WHEN
Boolean serviceResult = asyncCallableService.call();
// THEN
- verify();
- // THEN
- Assert.assertNull("Service result must be null", serviceResult);
-
+ verifyAll();
+ Assert.assertNull("No result expected from throwing task", serviceResult);
}
@Test
public void testShouldAsyncCallableServiceRetryExecutionWhenTaskThrowsException() throws Exception {
// GIVEN
- //the task call hangs, it doesn't return within a reasonable period of time
- Callable<Boolean> hangingTask = new Callable<Boolean>() {
+ // the task throws exception
+ Callable<Boolean> throwingTask = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
throw new IllegalStateException("****************** TESTING ****************");
}
};
- asyncCallableService = new AsyncCallableService(hangingTask, timeout, delay, Executors.newScheduledThreadPool(2));
+ asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY, "test");
// WHEN
Boolean serviceResult = asyncCallableService.call();
// THEN
- verify();
- Assert.assertNull("Service result must be null", serviceResult);
+ Assert.assertNull("No result expected from throwing task", serviceResult);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
index 95f02b1..b1233f4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
@@ -55,12 +55,14 @@ import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
import org.apache.ambari.server.controller.internal.Stack;
import org.apache.ambari.server.controller.spi.ClusterController;
import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
import org.apache.ambari.server.security.encryption.CredentialStoreService;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
import org.apache.ambari.server.topology.validators.TopologyValidatorService;
import org.easymock.Capture;
@@ -152,6 +154,8 @@ public class ClusterDeployWithStartOnlyTest extends EasyMockSupport {
private ComponentInfo clientComponentInfo;
@Mock(type = MockType.NICE)
private ConfigureClusterTaskFactory configureClusterTaskFactory;
+ @Mock(type = MockType.NICE)
+ private ConfigureClusterTask configureClusterTask;
@Mock(type = MockType.STRICT)
private Future mockFuture;
@@ -391,6 +395,13 @@ public class ClusterDeployWithStartOnlyTest extends EasyMockSupport {
ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
expectLastCall().once();
+ expect(configureClusterTaskFactory.createConfigureClusterTask(
+ anyObject(ClusterTopology.class),
+ anyObject(ClusterConfigurationRequest.class),
+ anyObject(AmbariEventPublisher.class)
+ )).andReturn(configureClusterTask);
+ expect(configureClusterTask.getTimeout()).andReturn(1000L);
+ expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
persistedTopologyRequest = new PersistedTopologyRequest(1, request);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
index 224142a..63c9077 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
@@ -56,12 +56,14 @@ import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
import org.apache.ambari.server.controller.internal.Stack;
import org.apache.ambari.server.controller.spi.ClusterController;
import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
import org.apache.ambari.server.security.encryption.CredentialStoreService;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
import org.apache.ambari.server.topology.validators.TopologyValidatorService;
import org.easymock.Capture;
@@ -143,6 +145,8 @@ public class ClusterInstallWithoutStartOnComponentLevelTest extends EasyMockSupp
private HostRoleCommand hostRoleCommand;
@Mock(type = MockType.NICE)
private ConfigureClusterTaskFactory configureClusterTaskFactory;
+ @Mock(type = MockType.NICE)
+ private ConfigureClusterTask configureClusterTask;
@Mock(type = MockType.NICE)
@@ -368,6 +372,13 @@ public class ClusterInstallWithoutStartOnComponentLevelTest extends EasyMockSupp
ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
expectLastCall().once();
+ expect(configureClusterTaskFactory.createConfigureClusterTask(
+ anyObject(ClusterTopology.class),
+ anyObject(ClusterConfigurationRequest.class),
+ anyObject(AmbariEventPublisher.class)
+ )).andReturn(configureClusterTask);
+ expect(configureClusterTask.getTimeout()).andReturn(1000L);
+ expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
persistedTopologyRequest = new PersistedTopologyRequest(1, request);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
index 605def0..8e8e3d5 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
@@ -56,12 +56,14 @@ import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
import org.apache.ambari.server.controller.internal.Stack;
import org.apache.ambari.server.controller.spi.ClusterController;
import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
import org.apache.ambari.server.security.encryption.CredentialStoreService;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.SecurityType;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
import org.apache.ambari.server.topology.validators.TopologyValidatorService;
import org.easymock.Capture;
@@ -144,6 +146,8 @@ public class ClusterInstallWithoutStartTest extends EasyMockSupport {
private HostRoleCommand hostRoleCommand;
@Mock(type = MockType.NICE)
private ConfigureClusterTaskFactory configureClusterTaskFactory;
+ @Mock(type = MockType.NICE)
+ private ConfigureClusterTask configureClusterTask;
@Mock(type = MockType.NICE)
@@ -363,6 +367,13 @@ public class ClusterInstallWithoutStartTest extends EasyMockSupport {
ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
expectLastCall().once();
+ expect(configureClusterTaskFactory.createConfigureClusterTask(
+ anyObject(ClusterTopology.class),
+ anyObject(ClusterConfigurationRequest.class),
+ anyObject(AmbariEventPublisher.class)
+ )).andReturn(configureClusterTask);
+ expect(configureClusterTask.getTimeout()).andReturn(1000L);
+ expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
persistedTopologyRequest = new PersistedTopologyRequest(1, request);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
index 7d34ca2..11f571b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,41 +19,28 @@
package org.apache.ambari.server.topology;
import static org.easymock.EasyMock.anyObject;
-
import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
import org.apache.ambari.server.events.AmbariEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
import org.easymock.EasyMockRule;
+import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.easymock.MockType;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import junit.framework.Assert;
/**
* Unit test for the ConfigureClusterTask class.
* As business methods of this class don't return values, the assertions are made by verifying method calls on mocks.
* Thus having strict mocks is essential!
*/
-public class ConfigureClusterTaskTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ConfigureClusterTaskTest.class);
+public class ConfigureClusterTaskTest extends EasyMockSupport {
@Rule
public EasyMockRule mocks = new EasyMockRule(this);
@@ -74,71 +61,45 @@ public class ConfigureClusterTaskTest {
@Before
public void before() {
- reset(clusterConfigurationRequest, clusterTopology, ambariContext, ambariEventPublisher);
+ resetAll();
testSubject = new ConfigureClusterTask(clusterTopology, clusterConfigurationRequest, ambariEventPublisher);
}
@Test
- public void testShouldConfigureClusterTaskLogicBeExecutedWhenRequiredHostgroupsAreResolved() throws
- Exception {
+ public void taskShouldBeExecutedIfRequiredHostgroupsAreResolved() throws Exception {
// GIVEN
- // is it OK to handle the non existence of hostgroups as a success?!
-
- expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST);
- expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP);
+ expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.<String>emptyList());
+ expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.<String, HostGroupInfo>emptyMap());
expect(clusterTopology.getClusterId()).andReturn(1L).anyTimes();
expect(clusterTopology.getAmbariContext()).andReturn(ambariContext);
expect(ambariContext.getClusterName(1L)).andReturn("testCluster");
-
- // this is only called if the "prerequisites" are satisfied
clusterConfigurationRequest.process();
ambariEventPublisher.publish(anyObject(AmbariEvent.class));
-
- replay(clusterConfigurationRequest, clusterTopology, ambariContext, ambariEventPublisher);
+ replayAll();
// WHEN
Boolean result = testSubject.call();
// THEN
- verify();
+ verifyAll();
Assert.assertTrue(result);
}
@Test
public void testsShouldConfigureClusterTaskExecuteWhenCalledFromAsyncCallableService() throws Exception {
// GIVEN
- // is it OK to handle the non existence of hostgroups as a success?!
- expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.EMPTY_LIST);
- expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.EMPTY_MAP);
-
- // this is only called if the "prerequisites" are satisfied
+ expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.<String>emptyList());
+ expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.<String, HostGroupInfo>emptyMap());
clusterConfigurationRequest.process();
+ replayAll();
- replay(clusterConfigurationRequest, clusterTopology);
-
- AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, Executors
- .newScheduledThreadPool(3));
+ AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test");
// WHEN
asyncService.call();
- // THEN
-
+ // THEN
+ verifyAll();
}
- private Collection<String> mockRequiredHostGroups() {
- return Arrays.asList("test-hostgroup-1");
- }
-
- private Map<String, HostGroupInfo> mockHostGroupInfo() {
- Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>();
- HostGroupInfo hostGroupInfo = new HostGroupInfo("test-hostgroup-1");
- hostGroupInfo.addHost("test-host-1");
- hostGroupInfo.setRequestedCount(2);
-
- hostGroupInfoMap.put("test-hostgroup-1", hostGroupInfo);
- return hostGroupInfoMap;
- }
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/4406fd40/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 025473e..07bb987 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.ambari.server.controller.spi.ClusterController;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.ResourceProvider;
import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.dao.SettingDAO;
import org.apache.ambari.server.orm.entities.SettingEntity;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
@@ -66,6 +67,7 @@ import org.apache.ambari.server.security.encryption.CredentialStoreService;
import org.apache.ambari.server.stack.NoSuchStackException;
import org.apache.ambari.server.state.SecurityType;
import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile;
+import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
import org.apache.ambari.server.topology.validators.TopologyValidatorService;
import org.easymock.Capture;
@@ -156,7 +158,8 @@ public class TopologyManagerTest {
private ClusterTopology clusterTopologyMock;
@Mock(type = MockType.NICE)
private ConfigureClusterTaskFactory configureClusterTaskFactory;
-
+ @Mock(type = MockType.NICE)
+ private ConfigureClusterTask configureClusterTask;
@Mock(type = MockType.STRICT)
private Future mockFuture;
@@ -342,9 +345,14 @@ public class TopologyManagerTest {
expect(clusterController.ensureResourceProvider(anyObject(Resource.Type.class))).andReturn(resourceProvider);
- expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture);
-
- expectLastCall().anyTimes();
+ expect(configureClusterTaskFactory.createConfigureClusterTask(
+ anyObject(ClusterTopology.class),
+ anyObject(ClusterConfigurationRequest.class),
+ anyObject(AmbariEventPublisher.class)
+ )).andReturn(configureClusterTask);
+ expect(configureClusterTask.getTimeout()).andReturn(1000L);
+ expect(configureClusterTask.getRepeatDelay()).andReturn(50L);
+ expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).anyTimes();
expect(persistedState.persistTopologyRequest(request)).andReturn(persistedTopologyRequest).anyTimes();
persistedState.persistLogicalRequest(logicalRequest, 1);
@@ -530,7 +538,8 @@ public class TopologyManagerTest {
replay(blueprint, stack, request, group1, group2, ambariContext, logicalRequestFactory,
configurationRequest, configurationRequest2, configurationRequest3, executor,
persistedState, clusterTopologyMock, securityConfigurationFactory, credentialStoreService,
- clusterController, resourceProvider, mockFuture, requestStatusResponse, logicalRequest, settingDAO);
+ clusterController, resourceProvider, mockFuture, requestStatusResponse, logicalRequest, settingDAO,
+ configureClusterTaskFactory, configureClusterTask);
}
@Test(expected = InvalidTopologyException.class)