You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ad...@apache.org on 2018/01/19 09:04:55 UTC
[ambari] branch branch-2.6 updated: AMBARI-22805. Blueprints do not handle some failures properly
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 4b49902 AMBARI-22805. Blueprints do not handle some failures properly
4b49902 is described below
commit 4b4990272f85dded0abf4264499e2052cd282b0c
Author: Doroszlai, Attila <ad...@apache.org>
AuthorDate: Tue Jan 16 14:11:07 2018 +0100
AMBARI-22805. Blueprints do not handle some failures properly
---
.../controller/internal/CalculatedStatus.java | 7 +-
.../internal/HostComponentResourceProvider.java | 35 +---
.../internal/RequestResourceProvider.java | 34 ++-
.../orm/entities/TopologyHostRequestEntity.java | 33 ++-
.../server/topology/AsyncCallableService.java | 16 +-
.../ambari/server/topology/HostOfferResponse.java | 23 ++-
.../apache/ambari/server/topology/HostRequest.java | 52 ++++-
.../ambari/server/topology/LogicalRequest.java | 44 ++++
.../ambari/server/topology/PersistedState.java | 5 +
.../ambari/server/topology/PersistedStateImpl.java | 11 +
.../ambari/server/topology/TopologyManager.java | 58 +++---
.../server/topology/tasks/TopologyHostTask.java | 4 +
.../ambari/server/topology/tasks/TopologyTask.java | 24 ++-
.../ambari/server/upgrade/SchemaUpgradeHelper.java | 1 +
.../ambari/server/upgrade/UpgradeCatalog262.java | 70 +++++++
.../src/main/resources/Ambari-DDL-Derby-CREATE.sql | 2 +
.../src/main/resources/Ambari-DDL-MySQL-CREATE.sql | 2 +
.../main/resources/Ambari-DDL-Oracle-CREATE.sql | 2 +
.../main/resources/Ambari-DDL-Postgres-CREATE.sql | 2 +
.../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 2 +
.../main/resources/Ambari-DDL-SQLServer-CREATE.sql | 2 +
.../internal/RequestResourceProviderTest.java | 227 ++++++++++-----------
.../ambari/server/state/cluster/ClustersTest.java | 4 +-
.../server/topology/AsyncCallableServiceTest.java | 28 ++-
.../server/topology/ConfigureClusterTaskTest.java | 4 +-
.../server/topology/TopologyManagerTest.java | 7 +
26 files changed, 477 insertions(+), 222 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
index 3c415df..c88983b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
@@ -54,7 +54,7 @@ public class CalculatedStatus {
private final double percent;
/**
- * A status which represents a COMPLETED state at 0%
+ * A status which represents a COMPLETED state at 100%
*/
public static final CalculatedStatus COMPLETED = new CalculatedStatus(HostRoleStatus.COMPLETED,
HostRoleStatus.COMPLETED, 100.0);
@@ -65,6 +65,11 @@ public class CalculatedStatus {
public static final CalculatedStatus PENDING = new CalculatedStatus(HostRoleStatus.PENDING,
HostRoleStatus.PENDING, 0.0);
+ /**
+ * A status which represents an ABORTED state at -1%
+ */
+ public static final CalculatedStatus ABORTED = new CalculatedStatus(HostRoleStatus.ABORTED, HostRoleStatus.ABORTED, -1);
+
// ----- Constructors ------------------------------------------------------
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
index aaf4656..d240a84 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
@@ -795,31 +795,18 @@ public class HostComponentResourceProvider extends AbstractControllerResourcePro
@Override
public RequestStageContainer invoke() throws AmbariException {
RequestStageContainer stageContainer = null;
- int retriesRemaining = 100;
- do {
- try {
- stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(),
- runSmokeTest);
- } catch (Exception e) {
- if (--retriesRemaining == 0) {
- LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e);
- // !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500.
- if (IllegalArgumentException.class.isInstance(e)) {
- throw (IllegalArgumentException) e;
- } else {
- throw new RuntimeException("Update Host request submission failed: " + e, e);
- }
- } else {
- LOG.info("Caught an exception while updating host components, retrying : " + e);
- try {
- Thread.sleep(250);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Update Host request submission failed: " + e, e);
- }
- }
+ try {
+ stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(),
+ runSmokeTest);
+ } catch (Exception e) {
+ LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e);
+ // !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500.
+ if (e instanceof IllegalArgumentException) {
+ throw (IllegalArgumentException) e;
+ } else {
+ throw new RuntimeException("Update Host request submission failed: " + e, e);
}
- } while (stageContainer == null);
+ }
return stageContainer;
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
index 46b1d0c..1f69015 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
@@ -23,7 +23,6 @@ import static org.apache.ambari.server.controller.internal.HostComponentResource
import static org.apache.ambari.server.controller.internal.HostComponentResourceProvider.HOST_COMPONENT_SERVICE_NAME_PROPERTY_ID;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +71,8 @@ import org.apache.ambari.server.topology.TopologyManager;
import org.apache.ambari.server.utils.SecretReference;
import org.apache.commons.lang.StringUtils;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
@@ -127,11 +128,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
protected static final String HOSTS_PREDICATE = "hosts_predicate";
protected static final String ACTION_ID = "action";
protected static final String INPUTS_ID = "parameters";
- protected static final String EXLUSIVE_ID = "exclusive";
+ protected static final String EXCLUSIVE_ID = "exclusive";
public static final String HAS_RESOURCE_FILTERS = "HAS_RESOURCE_FILTERS";
- private static Set<String> pkPropertyIds =
- new HashSet<String>(Arrays.asList(new String[]{
- REQUEST_ID_PROPERTY_ID}));
+ private static final Set<String> PK_PROPERTY_IDS = ImmutableSet.of(REQUEST_ID_PROPERTY_ID);
private PredicateCompiler predicateCompiler = new PredicateCompiler();
@@ -418,7 +417,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
@Override
protected Set<String> getPKPropertyIds() {
- return pkPropertyIds;
+ return PK_PROPERTY_IDS;
}
@@ -476,8 +475,8 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
}
boolean exclusive = false;
- if (requestInfoProperties.containsKey(EXLUSIVE_ID)) {
- exclusive = Boolean.valueOf(requestInfoProperties.get(EXLUSIVE_ID).trim());
+ if (requestInfoProperties.containsKey(EXCLUSIVE_ID)) {
+ exclusive = Boolean.valueOf(requestInfoProperties.get(EXCLUSIVE_ID).trim());
}
return new ExecuteActionRequest(
@@ -751,7 +750,8 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
}
setResourceProperty(resource, REQUEST_ID_PROPERTY_ID, entity.getRequestId(), requestedPropertyIds);
- setResourceProperty(resource, REQUEST_CONTEXT_ID, entity.getRequestContext(), requestedPropertyIds);
+ String requestContext = entity.getRequestContext();
+ setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds);
setResourceProperty(resource, REQUEST_TYPE_ID, entity.getRequestType(), requestedPropertyIds);
// Mask any sensitive data fields in the inputs data structure
@@ -802,15 +802,13 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
final CalculatedStatus status;
LogicalRequest logicalRequest = topologyManager.getRequest(entity.getRequestId());
if (summary.isEmpty() && null != logicalRequest) {
- // In this case, it appears that there are no tasks but this is a logical
- // topology request, so it's a matter of hosts simply not registering yet
- // for tasks to be created ==> status = PENDING.
- // For a new LogicalRequest there should be at least one HostRequest,
- // while if they were removed already ==> status = COMPLETED.
- if (logicalRequest.getHostRequests().isEmpty()) {
- status = CalculatedStatus.COMPLETED;
- } else {
- status = CalculatedStatus.PENDING;
+ status = logicalRequest.calculateStatus();
+ if (status == CalculatedStatus.ABORTED) {
+ Optional<String> failureReason = logicalRequest.getFailureReason();
+ if (failureReason.isPresent()) {
+ requestContext += "\nFAILED: " + failureReason.get();
+ setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds);
+ }
}
} else {
// there are either tasks or this is not a logical request, so do normal
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
index 7abbd51..1491302 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java
@@ -17,11 +17,13 @@
*/
package org.apache.ambari.server.orm.entities;
+import java.util.Collection;
+
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
@@ -29,8 +31,8 @@ import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.Table;
-import javax.persistence.TableGenerator;
-import java.util.Collection;
+
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
@Entity
@Table(name = "topology_host_request")
@@ -49,6 +51,13 @@ public class TopologyHostRequestEntity {
@Column(name = "host_name", length = 255)
private String hostName;
+ @Column(name = "status")
+ @Enumerated(EnumType.STRING)
+ private HostRoleStatus status;
+
+ @Column(name = "status_message")
+ private String statusMessage;
+
@ManyToOne
@JoinColumn(name = "logical_request_id", referencedColumnName = "id", nullable = false)
private TopologyLogicalRequestEntity topologyLogicalRequestEntity;
@@ -92,6 +101,22 @@ public class TopologyHostRequestEntity {
this.hostName = hostName;
}
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(HostRoleStatus status) {
+ this.status = status;
+ }
+
+ public String getStatusMessage() {
+ return statusMessage;
+ }
+
+ public void setStatusMessage(String statusMessage) {
+ this.statusMessage = statusMessage;
+ }
+
public TopologyLogicalRequestEntity getTopologyLogicalRequestEntity() {
return topologyLogicalRequestEntity;
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
index db57378..3abb52c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java
@@ -29,7 +29,9 @@ import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
/**
* Callable service implementation for executing tasks asynchronously.
@@ -55,12 +57,13 @@ public class AsyncCallableService<T> implements Callable<T> {
// the delay between two consecutive execution trials in milliseconds
private final long retryDelay;
+ private final Function<Throwable, ?> onError;
- public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName) {
- this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1));
+ public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, Function<Throwable, ?> onError) {
+ this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1), onError);
}
- public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService) {
+ public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService, Function<Throwable, ?> onError) {
Preconditions.checkArgument(retryDelay > 0, "retryDelay should be positive");
this.task = task;
@@ -68,6 +71,7 @@ public class AsyncCallableService<T> implements Callable<T> {
this.timeout = timeout;
this.retryDelay = retryDelay;
this.taskName = taskName;
+ this.onError = onError;
}
@Override
@@ -78,6 +82,7 @@ public class AsyncCallableService<T> implements Callable<T> {
LOG.info("Task {} execution started at {}", taskName, startTime);
while (true) {
+ Throwable lastError;
try {
LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft);
T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS);
@@ -85,18 +90,21 @@ public class AsyncCallableService<T> implements Callable<T> {
return taskResult;
} catch (TimeoutException e) {
LOG.debug("Task {} timeout", taskName);
+ lastError = e;
timeLeft = 0;
} catch (ExecutionException e) {
- Throwable cause = e.getCause();
+ Throwable cause = Throwables.getRootCause(e);
if (!(cause instanceof RetryTaskSilently)) {
LOG.info(String.format("Task %s exception during execution", taskName), cause);
}
+ lastError = cause;
timeLeft = timeout - (System.currentTimeMillis() - startTime);
}
if (timeLeft < retryDelay) {
attemptToCancel(future);
LOG.warn("Task {} timeout exceeded, no more retries", taskName);
+ onError.apply(lastError);
return null;
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
index e220c50..2c29e5d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
@@ -22,7 +22,8 @@ package org.apache.ambari.server.topology;
import java.util.List;
import java.util.concurrent.Executor;
-import org.apache.ambari.server.topology.tasks.TopologyTask;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.topology.tasks.TopologyHostTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,9 +41,9 @@ final class HostOfferResponse {
private final Answer answer;
private final String hostGroupName;
private final long hostRequestId;
- private final List<TopologyTask> tasks;
+ private final List<TopologyHostTask> tasks;
- static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List<TopologyTask> tasks) {
+ static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List<TopologyHostTask> tasks) {
return new HostOfferResponse(Answer.ACCEPTED, hostRequestId, hostGroupName, tasks);
}
@@ -50,7 +51,7 @@ final class HostOfferResponse {
this(answer, -1, null, null);
}
- private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List<TopologyTask> tasks) {
+ private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List<TopologyHostTask> tasks) {
this.answer = answer;
this.hostRequestId = hostRequestId;
this.hostGroupName = hostGroupName;
@@ -78,12 +79,20 @@ final class HostOfferResponse {
executor.execute(new Runnable() {
@Override
public void run() {
- for (TopologyTask task : tasks) {
- LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType());
- task.run();
+ for (TopologyHostTask task : tasks) {
+ try {
+ LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType());
+ task.run();
+ } catch (Exception e) {
+ HostRequest hostRequest = task.getHostRequest();
+ LOG.error("{} task for host {} failed due to", task.getType(), hostRequest.getHostName(), e);
+ hostRequest.markHostRequestFailed(HostRoleStatus.ABORTED, e, ambariContext.getPersistedTopologyState());
+ break;
+ }
}
}
});
}
}
+
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
index 8aed8aa..6659e9e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.api.predicate.InvalidQueryException;
import org.apache.ambari.server.api.predicate.PredicateCompiler;
import org.apache.ambari.server.controller.internal.HostResourceProvider;
@@ -45,11 +46,14 @@ import org.apache.ambari.server.topology.tasks.InstallHostTask;
import org.apache.ambari.server.topology.tasks.PersistHostResourcesTask;
import org.apache.ambari.server.topology.tasks.RegisterWithConfigGroupTask;
import org.apache.ambari.server.topology.tasks.StartHostTask;
+import org.apache.ambari.server.topology.tasks.TopologyHostTask;
import org.apache.ambari.server.topology.tasks.TopologyTask;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
/**
* Represents a set of requests to a single host such as install, start, etc.
@@ -69,6 +73,8 @@ public class HostRequest implements Comparable<HostRequest> {
private final long id;
private boolean isOutstanding = true;
private final boolean skipFailure;
+ private HostRoleStatus status = HostRoleStatus.PENDING;
+ private String statusMessage;
private Map<TopologyTask, Map<String, Long>> logicalTaskMap = new HashMap<TopologyTask, Map<String, Long>>();
@@ -77,7 +83,7 @@ public class HostRequest implements Comparable<HostRequest> {
// logical task id -> physical tasks
private Map<Long, Long> physicalTasks = new HashMap<Long, Long>();
- private List<TopologyTask> topologyTasks = new ArrayList<TopologyTask>();
+ private List<TopologyHostTask> topologyTasks = new ArrayList<>();
private ClusterTopology topology;
@@ -119,6 +125,8 @@ public class HostRequest implements Comparable<HostRequest> {
hostgroupName = entity.getTopologyHostGroupEntity().getName();
hostGroup = topology.getBlueprint().getHostGroup(hostgroupName);
hostname = entity.getHostName();
+ setStatus(entity.getStatus());
+ statusMessage = entity.getStatusMessage();
this.predicate = toPredicate(predicate);
containsMaster = hostGroup.containsMasterComponent();
this.topology = topology;
@@ -134,6 +142,15 @@ public class HostRequest implements Comparable<HostRequest> {
(hostname == null ? "Host Assignment Pending" : hostname));
}
+ void markHostRequestFailed(HostRoleStatus status, Throwable cause, PersistedState persistedState) {
+ String errorMessage = StringUtils.substringBefore(Throwables.getRootCause(cause).getMessage(), "\n");
+ LOG.info("HostRequest: marking host request {} for {} as {} due to {}", id, hostname, status, errorMessage);
+ abortPendingTasks();
+ setStatus(status);
+ setStatusMessage(errorMessage);
+ persistedState.setHostRequestStatus(id, status, errorMessage);
+ }
+
//todo: synchronization
public synchronized HostOfferResponse offer(Host host) {
if (!isOutstanding) {
@@ -149,6 +166,24 @@ public class HostRequest implements Comparable<HostRequest> {
}
}
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(HostRoleStatus status) {
+ if (status != null) {
+ this.status = status;
+ }
+ }
+
+ public void setStatusMessage(String errorMessage) {
+ this.statusMessage = errorMessage;
+ }
+
+ public Optional<String> getStatusMessage() {
+ return Optional.fromNullable(statusMessage);
+ }
+
public void setHostName(String hostName) {
hostname = hostName;
}
@@ -307,7 +342,7 @@ public class HostRequest implements Comparable<HostRequest> {
}
}
- public List<TopologyTask> getTopologyTasks() {
+ public List<TopologyHostTask> getTopologyTasks() {
return topologyTasks;
}
@@ -341,6 +376,9 @@ public class HostRequest implements Comparable<HostRequest> {
logicalTask.setStructuredOut(physicalTask.getStructuredOut());
}
}
+ if (logicalTask.getStatus() == HostRoleStatus.PENDING && status != HostRoleStatus.PENDING) {
+ logicalTask.setStatus(status);
+ }
}
return logicalTasks.values();
}
@@ -440,6 +478,14 @@ public class HostRequest implements Comparable<HostRequest> {
getLogicalTask(logicalTaskId).incrementAttemptCount();
}
+ public void abortPendingTasks() {
+ for (HostRoleCommand command : getLogicalTasks()) {
+ if (command.getStatus() == HostRoleStatus.PENDING) {
+ command.setStatus(HostRoleStatus.ABORTED);
+ }
+ }
+ }
+
private Predicate toPredicate(String predicate) {
Predicate compiledPredicate = null;
try {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index 6df9bc7..aa7c21e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -38,6 +38,7 @@ import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.controller.AmbariServer;
import org.apache.ambari.server.controller.RequestStatusResponse;
import org.apache.ambari.server.controller.ShortTaskStatus;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity;
@@ -49,6 +50,7 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
@@ -405,6 +407,40 @@ public class LogicalRequest extends Request {
return removed;
}
+ /**
+ * @return true if all the tasks in the logical request are in a completed state, false otherwise
+ */
+ public boolean isFinished() {
+ for (ShortTaskStatus ts : getRequestStatus().getTasks()) {
+ if (!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns true if every task in the logical request has status COMPLETED, false otherwise.
+ */
+ public boolean isSuccessful() {
+ for (ShortTaskStatus ts : getRequestStatus().getTasks()) {
+ if (HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Optional<String> getFailureReason() {
+ for (HostRequest request : getHostRequests()) {
+ Optional<String> failureReason = request.getStatusMessage();
+ if (failureReason.isPresent()) {
+ return failureReason;
+ }
+ }
+ return Optional.absent();
+ }
+
private void createHostRequests(TopologyRequest request, ClusterTopology topology) {
Map<String, HostGroupInfo> hostGroupInfoMap = request.getHostGroupInfo();
Blueprint blueprint = topology.getBlueprint();
@@ -523,4 +559,12 @@ public class LogicalRequest extends Request {
}
return controller;
}
+
+ public CalculatedStatus calculateStatus() {
+ return !isFinished()
+ ? CalculatedStatus.PENDING
+ : isSuccessful()
+ ? CalculatedStatus.COMPLETED
+ : CalculatedStatus.ABORTED;
+ }
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
index 8d55c16..b61c02a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.controller.internal.BaseClusterRequest;
import org.apache.ambari.server.state.Host;
@@ -86,4 +87,8 @@ public interface PersistedState {
*/
void removeHostRequests(long logicalRequestId, Collection<HostRequest> hostRequests);
+ /**
+ * Update the status of the given host request.
+ */
+ void setHostRequestStatus(long hostRequestId, HostRoleStatus status, String message);
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
index 1def4df..52fcb6a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
@@ -28,6 +28,7 @@ import javax.inject.Singleton;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.api.predicate.InvalidQueryException;
import org.apache.ambari.server.controller.internal.BaseClusterRequest;
import org.apache.ambari.server.orm.dao.HostDAO;
@@ -142,6 +143,16 @@ public class PersistedStateImpl implements PersistedState {
}
@Override
+ public void setHostRequestStatus(long hostRequestId, HostRoleStatus status, String message) {
+ TopologyHostRequestEntity hostRequestEntity = hostRequestDAO.findById(hostRequestId);
+ if (hostRequestEntity != null) {
+ hostRequestEntity.setStatus(status);
+ hostRequestEntity.setStatusMessage(message);
+ hostRequestDAO.merge(hostRequestEntity);
+ }
+ }
+
+ @Override
public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) {
TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId);
HostRoleCommandEntity physicalEntity = hostRoleCommandDAO.findByPK(physicalTaskId);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 7eb88cf..511d538 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -33,7 +33,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import org.apache.ambari.server.AmbariException;
@@ -43,7 +45,6 @@ import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintP
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.AmbariServer;
import org.apache.ambari.server.controller.RequestStatusResponse;
-import org.apache.ambari.server.controller.ShortTaskStatus;
import org.apache.ambari.server.controller.internal.ArtifactResourceProvider;
import org.apache.ambari.server.controller.internal.BaseClusterRequest;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
@@ -81,6 +82,7 @@ import org.apache.ambari.server.utils.RetryHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Singleton;
import com.google.inject.persist.Transactional;
@@ -337,7 +339,7 @@ public class TopologyManager {
clusterTopologyMap.put(clusterId, topology);
- addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true,
+ addClusterConfigRequest(logicalRequest, topology, new ClusterConfigurationRequest(ambariContext, topology, true,
stackAdvisorBlueprintProcessor, securityType == SecurityType.KERBEROS));
// Process the logical request
@@ -1051,9 +1053,17 @@ public class TopologyManager {
if (!configChecked) {
configChecked = true;
if (!ambariContext.isTopologyResolved(topology.getClusterId())) {
- LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request");
- addClusterConfigRequest(topology, new ClusterConfigurationRequest(
- ambariContext, topology, false, stackAdvisorBlueprintProcessor));
+ if (provisionRequest == null) {
+ LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request missing, skipping cluster config request");
+ } else if (provisionRequest.isFinished()) {
+ LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request is finished, skipping cluster config request");
+ } else {
+ LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request");
+ ClusterConfigurationRequest configRequest = new ClusterConfigurationRequest(ambariContext, topology, false, stackAdvisorBlueprintProcessor);
+ addClusterConfigRequest(provisionRequest, topology, configRequest);
+ }
+ } else {
+ getOrCreateTopologyTaskExecutor(topology.getClusterId()).start();
}
}
}
@@ -1061,36 +1071,17 @@ public class TopologyManager {
}
/**
- * @param logicalRequest
* @return true if all the tasks in the logical request are in completed state, false otherwise
*/
private boolean isLogicalRequestFinished(LogicalRequest logicalRequest) {
- if(logicalRequest != null) {
- boolean completed = true;
- for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) {
- if(!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) {
- completed = false;
- }
- }
- return completed;
- }
- return false;
+ return logicalRequest != null && logicalRequest.isFinished();
}
/**
* Returns if all the tasks in the logical request have completed state.
- * @param logicalRequest
- * @return
*/
private boolean isLogicalRequestSuccessful(LogicalRequest logicalRequest) {
- if(logicalRequest != null) {
- for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) {
- if(HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) {
- return false;
- }
- }
- }
- return true;
+ return logicalRequest != null && logicalRequest.isSuccessful();
}
//todo: this should invoke a callback on each 'service' in the topology
@@ -1121,9 +1112,20 @@ public class TopologyManager {
* @param topology cluster topology
* @param configurationRequest configuration request to be executed
*/
- private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
+ private void addClusterConfigRequest(final LogicalRequest logicalRequest, ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
ConfigureClusterTask task = configureClusterTaskFactory.createConfigureClusterTask(topology, configurationRequest, ambariEventPublisher);
- executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask"));
+ Function<Throwable, Void> onConfigureClusterError = new Function<Throwable, Void>() {
+ @Nullable @Override
+ public Void apply(Throwable input) {
+ HostRoleStatus status = input instanceof TimeoutException ? HostRoleStatus.TIMEDOUT : HostRoleStatus.FAILED;
+ LOG.info("ConfigureClusterTask failed, marking host requests {}", status);
+ for (HostRequest hostRequest : logicalRequest.getHostRequests()) {
+ hostRequest.markHostRequestFailed(status, input, persistedState);
+ }
+ return null;
+ }
+ };
+ executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask", onConfigureClusterError));
}
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java
index 82a2f6e..3db02e2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java
@@ -39,6 +39,10 @@ public abstract class TopologyHostTask implements TopologyTask {
this.hostRequest = hostRequest;
}
+ public HostRequest getHostRequest() {
+ return hostRequest;
+ }
+
/**
* Run with an InternalAuthenticationToken as when running these tasks we might not have any active security context.
*/
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java
index 0753c3d..6f2d7f5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java
@@ -19,6 +19,12 @@
package org.apache.ambari.server.topology.tasks;
+import java.util.Set;
+
+import org.apache.ambari.server.RoleCommand;
+
+import com.google.common.collect.ImmutableSet;
+
/**
* Task which is executed by the TopologyManager.
*/
@@ -26,11 +32,23 @@ public interface TopologyTask extends Runnable {
/**
* Task type.
*/
- public enum Type {
+ enum Type {
RESOURCE_CREATION,
CONFIGURE,
INSTALL,
- START
+ START {
+ @Override
+ public Set<RoleCommand> tasksToAbortOnFailure() {
+ return ImmutableSet.of(RoleCommand.START);
+ }
+ },
+ ;
+
+ private static Set<RoleCommand> ALL_TASKS = ImmutableSet.of(RoleCommand.INSTALL, RoleCommand.START);
+
+ public Set<RoleCommand> tasksToAbortOnFailure() {
+ return ALL_TASKS;
+ }
}
/**
@@ -38,5 +56,5 @@ public interface TopologyTask extends Runnable {
*
* @return the type of task
*/
- public Type getType();
+ Type getType();
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
index 2ab8dc8..fb7ce22 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
@@ -190,6 +190,7 @@ public class SchemaUpgradeHelper {
catalogBinder.addBinding().to(UpgradeCatalog252.class);
catalogBinder.addBinding().to(UpgradeCatalog260.class);
catalogBinder.addBinding().to(UpgradeCatalog261.class);
+ catalogBinder.addBinding().to(UpgradeCatalog262.class);
catalogBinder.addBinding().to(UpdateAlertScriptPaths.class);
catalogBinder.addBinding().to(FinalUpgradeCatalog.class);
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java
new file mode 100644
index 0000000..f83204d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.upgrade;
+
+import java.sql.SQLException;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.orm.DBAccessor;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+
+/**
+ * The {@link UpgradeCatalog262} upgrades Ambari from 2.6.1 to 2.6.2.
+ */
+public class UpgradeCatalog262 extends AbstractUpgradeCatalog {
+
+ private static final String HOST_REQUEST_TABLE = "topology_host_request";
+ private static final String STATUS_COLUMN = "status";
+ private static final String STATUS_MESSAGE_COLUMN = "status_message";
+
+ @Inject
+ public UpgradeCatalog262(Injector injector) {
+ super(injector);
+ }
+
+ @Override
+ public String getSourceVersion() {
+ return "2.6.1";
+ }
+
+ @Override
+ public String getTargetVersion() {
+ return "2.6.2";
+ }
+
+ @Override
+ protected void executeDDLUpdates() throws AmbariException, SQLException {
+ addHostRequestStatusColumn();
+ }
+
+ private void addHostRequestStatusColumn() throws SQLException {
+ dbAccessor.addColumn(HOST_REQUEST_TABLE, new DBAccessor.DBColumnInfo(STATUS_COLUMN, String.class, 255, null, true));
+ dbAccessor.addColumn(HOST_REQUEST_TABLE, new DBAccessor.DBColumnInfo(STATUS_MESSAGE_COLUMN, String.class, 1024, null, true));
+ }
+
+ @Override
+ protected void executePreDMLUpdates() throws AmbariException, SQLException {
+ }
+
+ @Override
+ protected void executeDMLUpdates() throws AmbariException, SQLException {
+ }
+
+}
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
index 058fa2a..cf6fe74 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql
@@ -756,6 +756,8 @@ CREATE TABLE topology_host_request (
group_id BIGINT NOT NULL,
stage_id BIGINT NOT NULL,
host_name VARCHAR(255),
+ status VARCHAR(255),
+ status_message VARCHAR(1024),
CONSTRAINT PK_topology_host_request PRIMARY KEY (id),
CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id),
CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id));
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index ec692b8..fe1e7ff 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -773,6 +773,8 @@ CREATE TABLE topology_host_request (
group_id BIGINT NOT NULL,
stage_id BIGINT NOT NULL,
host_name VARCHAR(255),
+ status VARCHAR(255),
+ status_message VARCHAR(1024),
CONSTRAINT PK_topology_host_request PRIMARY KEY (id),
CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id),
CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id));
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
index 65be459..5d16258 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql
@@ -752,6 +752,8 @@ CREATE TABLE topology_host_request (
group_id NUMBER(19) NOT NULL,
stage_id NUMBER(19) NOT NULL,
host_name VARCHAR(255),
+ status VARCHAR2(255),
+ status_message VARCHAR2(1024),
CONSTRAINT PK_topology_host_request PRIMARY KEY (id),
CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id),
CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id));
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index 122fd8b..ab40315 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -754,6 +754,8 @@ CREATE TABLE topology_host_request (
group_id BIGINT NOT NULL,
stage_id BIGINT NOT NULL,
host_name VARCHAR(255),
+ status VARCHAR(255),
+ status_message VARCHAR(1024),
CONSTRAINT PK_topology_host_request PRIMARY KEY (id),
CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id),
CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id));
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
index 1a98f3b..28bbd44 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql
@@ -750,6 +750,8 @@ CREATE TABLE topology_host_request (
group_id NUMERIC(19) NOT NULL,
stage_id NUMERIC(19) NOT NULL,
host_name VARCHAR(255),
+ status VARCHAR(255),
+ status_message VARCHAR(1024),
CONSTRAINT PK_topology_host_request PRIMARY KEY (id),
CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id),
CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id));
diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
index 3d48a86..49fc473 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql
@@ -771,6 +771,8 @@ CREATE TABLE topology_host_request (
group_id BIGINT NOT NULL,
stage_id BIGINT NOT NULL,
host_name VARCHAR(255),
+ status VARCHAR(255),
+ status_message VARCHAR(1024),
CONSTRAINT PK_topology_host_request PRIMARY KEY CLUSTERED (id),
CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id),
CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id));
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
index fb9c4fd..4e7dfeb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java
@@ -28,11 +28,13 @@ import static org.easymock.EasyMock.newCapture;
import static org.powermock.api.easymock.PowerMock.createMock;
import static org.powermock.api.easymock.PowerMock.createNiceMock;
import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.replayAll;
import static org.powermock.api.easymock.PowerMock.reset;
+import static org.powermock.api.easymock.PowerMock.resetAll;
import static org.powermock.api.easymock.PowerMock.verify;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
import java.lang.reflect.Field;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -99,7 +101,11 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
@@ -1187,9 +1193,7 @@ public class RequestResourceProviderTest {
capture(requestCapture), capture(predicateCapture))).andReturn(Collections.singleton(resource));
// replay
- replay(managementController, response, controller,
- hostComponentProcessResourceProvider, resource, clusters);
- PowerMock.replayAll();
+ replayAll();
SecurityContextHolder.getContext().setAuthentication(
TestAuthenticationFactory.createAdministrator());
@@ -1664,127 +1668,108 @@ public class RequestResourceProviderTest {
}
/**
- * Tests that topology requests return different status (PENDING) if there are
- * no tasks. Normal requests should return COMPLETED.
- *
- * @throws Exception
+ * Tests that if there are no tasks, topology requests return status they get from the logical request.
*/
@Test
@PrepareForTest(AmbariServer.class)
public void testGetLogicalRequestStatusWithNoTasks() throws Exception {
- // Given
- Resource.Type type = Resource.Type.Request;
-
- AmbariManagementController managementController = createMock(AmbariManagementController.class);
- ActionManager actionManager = createNiceMock(ActionManager.class);
-
- Clusters clusters = createNiceMock(Clusters.class);
-
- RequestEntity requestMock = createNiceMock(RequestEntity.class);
-
- expect(requestMock.getRequestContext()).andReturn("this is a context").anyTimes();
- expect(requestMock.getRequestId()).andReturn(100L).anyTimes();
- Capture<Collection<Long>> requestIdsCapture = Capture.newInstance();
-
-
- ClusterTopology topology = createNiceMock(ClusterTopology.class);
-
- HostGroup hostGroup = createNiceMock(HostGroup.class);
- expect(hostGroup.getName()).andReturn("host_group_1").anyTimes();
-
- Blueprint blueprint = createNiceMock(Blueprint.class);
- expect(blueprint.getHostGroup("host_group_1")).andReturn(hostGroup).anyTimes();
- expect(topology.getClusterId()).andReturn(2L).anyTimes();
-
- Long clusterId = 2L;
- String clusterName = "cluster1";
- Cluster cluster = createNiceMock(Cluster.class);
- expect(cluster.getClusterId()).andReturn(clusterId).anyTimes();
- expect(cluster.getClusterName()).andReturn(clusterName).anyTimes();
-
- expect(managementController.getActionManager()).andReturn(actionManager).anyTimes();
- expect(managementController.getClusters()).andReturn(clusters).anyTimes();
- expect(clusters.getCluster(eq(clusterName))).andReturn(cluster).anyTimes();
- expect(clusters.getClusterById(clusterId)).andReturn(cluster).anyTimes();
- expect(requestDAO.findByPks(capture(requestIdsCapture), eq(true))).andReturn(Lists.newArrayList(requestMock));
- expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn(
- Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes();
-
- Map<String, HostGroupInfo> hostGroupInfoMap = new HashMap<>();
- HostGroupInfo hostGroupInfo = new HostGroupInfo("host_group_1");
- hostGroupInfo.setRequestedCount(1);
- hostGroupInfoMap.put("host_group_1", hostGroupInfo);
-
- TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class);
- expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes();
- expect(topology.getBlueprint()).andReturn(blueprint).anyTimes();
- expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes();
-
-
-
- PowerMock.mockStatic(AmbariServer.class);
- expect(AmbariServer.getController()).andReturn(managementController).anyTimes();
-
- PowerMock.replayAll(
- topologyRequest,
- topology,
- blueprint,
- managementController,
- clusters);
-
-
- LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class);
- Collection<HostRequest> hostRequests = new ArrayList<>();
- HostRequest hostRequest = createNiceMock(HostRequest.class);
- hostRequests.add(hostRequest);
- expect(logicalRequest.getHostRequests()).andReturn(hostRequests).anyTimes();
- expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes();
-
- reset(topologyManager);
-
- expect(topologyManager.getRequest(100L)).andReturn(logicalRequest).anyTimes();
-
-
- expect(topologyManager.getRequests(eq(Collections.singletonList(100L)))).andReturn(
- Collections.singletonList(logicalRequest)).anyTimes();
- expect(topologyManager.getStageSummaries(EasyMock.<Long>anyObject())).andReturn(
- Collections.<Long, HostRoleCommandStatusSummaryDTO>emptyMap()).anyTimes();
-
- replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest, hostRequest);
-
- ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
- type,
- PropertyHelper.getPropertyIds(type),
- PropertyHelper.getKeyPropertyIds(type),
- managementController);
-
- Set<String> propertyIds = ImmutableSet.of(
- RequestResourceProvider.REQUEST_ID_PROPERTY_ID,
- RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID,
- RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID
- );
-
- Predicate predicate = new PredicateBuilder().
- property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID).equals("100").
- toPredicate();
-
- Request request = PropertyHelper.getReadRequest(propertyIds);
-
- // When
- Set<Resource> resources = provider.getResources(request, predicate);
-
- // Then
-
-
- // verify
- PowerMock.verifyAll();
- verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest, hostRequest);
-
- Assert.assertEquals(1, resources.size());
- for (Resource resource : resources) {
- Assert.assertEquals(100L, (long)(Long) resource.getPropertyValue(RequestResourceProvider.REQUEST_ID_PROPERTY_ID));
- Assert.assertEquals("PENDING", resource.getPropertyValue(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID));
- Assert.assertEquals(0.0, resource.getPropertyValue(RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID));
+ Iterable<CalculatedStatus> statusList = ImmutableList.of(CalculatedStatus.COMPLETED, CalculatedStatus.PENDING, CalculatedStatus.ABORTED);
+ for (CalculatedStatus calculatedStatus : statusList) {
+ // Given
+ resetAll();
+
+ PowerMock.mockStatic(AmbariServer.class);
+ AmbariManagementController managementController = createMock(AmbariManagementController.class);
+ ActionManager actionManager = createNiceMock(ActionManager.class);
+ Clusters clusters = createNiceMock(Clusters.class);
+ Cluster cluster = createNiceMock(Cluster.class);
+ RequestEntity requestMock = createNiceMock(RequestEntity.class);
+ Blueprint blueprint = createNiceMock(Blueprint.class);
+ ClusterTopology topology = createNiceMock(ClusterTopology.class);
+ HostGroup hostGroup = createNiceMock(HostGroup.class);
+ TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class);
+ LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class);
+ HostRequest hostRequest = createNiceMock(HostRequest.class);
+
+ Long requestId = 100L;
+ Long clusterId = 2L;
+ String clusterName = "cluster1";
+ String hostGroupName = "host_group_1";
+ HostGroupInfo hostGroupInfo = new HostGroupInfo(hostGroupName);
+ hostGroupInfo.setRequestedCount(1);
+ Map<String, HostGroupInfo> hostGroupInfoMap = ImmutableMap.of(hostGroupName, hostGroupInfo);
+ Collection<HostRequest> hostRequests = Collections.singletonList(hostRequest);
+ Map<Long, HostRoleCommandStatusSummaryDTO> dtoMap = Collections.emptyMap();
+
+ expect(AmbariServer.getController()).andReturn(managementController).anyTimes();
+ expect(requestMock.getRequestContext()).andReturn("this is a context").anyTimes();
+ expect(requestMock.getRequestId()).andReturn(requestId).anyTimes();
+ expect(hostGroup.getName()).andReturn(hostGroupName).anyTimes();
+ expect(blueprint.getHostGroup(hostGroupName)).andReturn(hostGroup).anyTimes();
+ expect(topology.getClusterId()).andReturn(2L).anyTimes();
+ expect(cluster.getClusterId()).andReturn(clusterId).anyTimes();
+ expect(cluster.getClusterName()).andReturn(clusterName).anyTimes();
+ expect(managementController.getActionManager()).andReturn(actionManager).anyTimes();
+ expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+ expect(clusters.getCluster(eq(clusterName))).andReturn(cluster).anyTimes();
+ expect(clusters.getClusterById(clusterId)).andReturn(cluster).anyTimes();
+ Collection<Long> requestIds = anyObject();
+ expect(requestDAO.findByPks(requestIds, eq(true))).andReturn(Lists.newArrayList(requestMock));
+ expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn(dtoMap).anyTimes();
+ expect(topologyManager.getRequest(requestId)).andReturn(logicalRequest).anyTimes();
+ expect(topologyManager.getRequests(eq(Collections.singletonList(requestId)))).andReturn(Collections.singletonList(logicalRequest)).anyTimes();
+ expect(topologyManager.getStageSummaries(EasyMock.<Long>anyObject())).andReturn(dtoMap).anyTimes();
+
+ expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes();
+ expect(topology.getBlueprint()).andReturn(blueprint).anyTimes();
+ expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes();
+
+ expect(logicalRequest.getHostRequests()).andReturn(hostRequests).anyTimes();
+ expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes();
+ expect(logicalRequest.calculateStatus()).andReturn(calculatedStatus).anyTimes();
+ Optional<String> failureReason = calculatedStatus == CalculatedStatus.ABORTED
+ ? Optional.of("some reason")
+ : Optional.<String>absent();
+ expect(logicalRequest.getFailureReason()).andReturn(failureReason).anyTimes();
+
+ replayAll();
+
+ Resource.Type type = Resource.Type.Request;
+ ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+ type,
+ PropertyHelper.getPropertyIds(type),
+ PropertyHelper.getKeyPropertyIds(type),
+ managementController
+ );
+
+ Set<String> propertyIds = ImmutableSet.of(
+ RequestResourceProvider.REQUEST_ID_PROPERTY_ID,
+ RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID,
+ RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID,
+ RequestResourceProvider.REQUEST_CONTEXT_ID
+ );
+
+ Predicate predicate = new PredicateBuilder().
+ property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID).equals("100").
+ toPredicate();
+
+ Request request = PropertyHelper.getReadRequest(propertyIds);
+
+ // When
+ Set<Resource> resources = provider.getResources(request, predicate);
+
+ // Then
+ verifyAll();
+
+ Assert.assertEquals(1, resources.size());
+ Resource resource = Iterables.getOnlyElement(resources);
+ Assert.assertEquals(requestId, resource.getPropertyValue(RequestResourceProvider.REQUEST_ID_PROPERTY_ID));
+ Assert.assertEquals(calculatedStatus.getStatus().toString(), resource.getPropertyValue(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID));
+ Assert.assertEquals(calculatedStatus.getPercent(), resource.getPropertyValue(RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID));
+
+ Object requestContext = resource.getPropertyValue(RequestResourceProvider.REQUEST_CONTEXT_ID);
+ Assert.assertNotNull(requestContext);
+ Assert.assertTrue(!failureReason.isPresent() || requestContext.toString().contains(failureReason.get()));
}
}
}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
index febe591..bd5dde6 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java
@@ -79,7 +79,7 @@ import org.apache.ambari.server.topology.LogicalRequest;
import org.apache.ambari.server.topology.PersistedState;
import org.apache.ambari.server.topology.TopologyManager;
import org.apache.ambari.server.topology.TopologyRequest;
-import org.apache.ambari.server.topology.tasks.TopologyTask;
+import org.apache.ambari.server.topology.tasks.TopologyHostTask;
import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.junit.After;
import org.junit.Before;
@@ -650,7 +650,7 @@ public class ClustersTest {
expect(hr.getHostgroupName()).andReturn("MyHostGroup").anyTimes();
expect(hr.getHostName()).andReturn(hostName).anyTimes();
expect(hr.getStageId()).andReturn(1L);
- expect(hr.getTopologyTasks()).andReturn(Collections.<TopologyTask>emptyList());
+ expect(hr.getTopologyTasks()).andReturn(Collections.<TopologyHostTask>emptyList());
replay(hr);
return hr;
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
index bf8fd79..3930e2e 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.topology;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import java.util.concurrent.Callable;
@@ -34,6 +35,8 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import com.google.common.base.Function;
+
public class AsyncCallableServiceTest extends EasyMockSupport {
private static final long TIMEOUT = 1000; // default timeout
@@ -51,19 +54,24 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
@Mock
private ScheduledFuture<Boolean> futureMock;
+ @Mock
+ private Function<Throwable, ?> onErrorMock;
+
private AsyncCallableService<Boolean> asyncCallableService;
@Test
public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exception {
// GIVEN
long timeout = -1; // guaranteed timeout
- expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException("Testing the timeout exceeded case"));
+ TimeoutException timeoutException = new TimeoutException("Testing the timeout exceeded case");
+ expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(timeoutException);
expect(futureMock.isDone()).andReturn(Boolean.FALSE);
expect(futureMock.cancel(true)).andReturn(Boolean.TRUE);
expect(executorServiceMock.submit(taskMock)).andReturn(futureMock);
+ expect(onErrorMock.apply(timeoutException)).andReturn(null);
replayAll();
- asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock);
+ asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock, onErrorMock);
// WHEN
Boolean serviceResult = asyncCallableService.call();
@@ -84,13 +92,16 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
return false;
}
};
+ expect(onErrorMock.apply(anyObject(TimeoutException.class))).andReturn(null);
+ replayAll();
- asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY, "test");
+ asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY, "test", onErrorMock);
// WHEN
Boolean serviceResult = asyncCallableService.call();
// THEN
+ verifyAll();
Assert.assertNull("No result expected from hanging task", serviceResult);
}
@@ -98,8 +109,9 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
public void testCallableServiceShouldExitWhenTaskCompleted() throws Exception {
// GIVEN
expect(taskMock.call()).andReturn(Boolean.TRUE);
+ expect(onErrorMock.apply(anyObject(TimeoutException.class))).andThrow(new AssertionError("No error expected")).anyTimes();
replayAll();
- asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test");
+ asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", onErrorMock);
// WHEN
Boolean serviceResult = asyncCallableService.call();
@@ -113,8 +125,9 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
public void testCallableServiceShouldRetryTaskExecutionTillTimeoutExceededWhenTaskThrowsException() throws Exception {
// GIVEN
expect(taskMock.call()).andThrow(new IllegalStateException("****************** TESTING ****************")).times(2, 3);
+ expect(onErrorMock.apply(anyObject(IllegalStateException.class))).andReturn(null).once();
replayAll();
- asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test");
+ asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", onErrorMock);
// WHEN
Boolean serviceResult = asyncCallableService.call();
@@ -135,13 +148,16 @@ public class AsyncCallableServiceTest extends EasyMockSupport {
throw new IllegalStateException("****************** TESTING ****************");
}
};
+ expect(onErrorMock.apply(anyObject(IllegalStateException.class))).andReturn(null).once();
+ replayAll();
- asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY, "test");
+ asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY, "test", onErrorMock);
// WHEN
Boolean serviceResult = asyncCallableService.call();
// THEN
+ verifyAll();
Assert.assertNull("No result expected from throwing task", serviceResult);
}
}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
index 11f571b..f5ac795 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
@@ -35,6 +35,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import com.google.common.base.Functions;
+
/**
* Unit test for the ConfigureClusterTask class.
* As business methods of this class don't return values, the assertions are made by verifying method calls on mocks.
@@ -93,7 +95,7 @@ public class ConfigureClusterTaskTest extends EasyMockSupport {
clusterConfigurationRequest.process();
replayAll();
- AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test");
+ AsyncCallableService<Boolean> asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test", Functions.<Throwable>identity());
// WHEN
asyncService.call();
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 07bb987..0c556f5 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -403,6 +403,7 @@ public class TopologyManagerTest {
List<LogicalRequest> requestList = new ArrayList<>();
requestList.add(logicalRequest);
expect(logicalRequest.hasPendingHostRequests()).andReturn(false).anyTimes();
+ expect(logicalRequest.isFinished()).andReturn(false).anyTimes();
allRequests.put(clusterTopologyMock, requestList);
expect(requestStatusResponse.getTasks()).andReturn(Collections.<ShortTaskStatus>emptyList()).anyTimes();
expect(clusterTopologyMock.isClusterKerberosEnabled()).andReturn(true);
@@ -437,6 +438,8 @@ public class TopologyManagerTest {
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
List<LogicalRequest>>emptyMap()).anyTimes();
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
+ expect(logicalRequest.isFinished()).andReturn(true).anyTimes();
+ expect(logicalRequest.isSuccessful()).andReturn(true).anyTimes();
replayAll();
topologyManager.provisionCluster(request);
requestFinished();
@@ -460,6 +463,8 @@ public class TopologyManagerTest {
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
List<LogicalRequest>>emptyMap()).anyTimes();
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
+ expect(logicalRequest.isFinished()).andReturn(true).anyTimes();
+ expect(logicalRequest.isSuccessful()).andReturn(false).anyTimes();
replayAll();
topologyManager.provisionCluster(request);
requestFinished();
@@ -483,6 +488,7 @@ public class TopologyManagerTest {
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
List<LogicalRequest>>emptyMap()).anyTimes();
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
+ expect(logicalRequest.isFinished()).andReturn(false).anyTimes();
replayAll();
topologyManager.provisionCluster(request);
requestFinished();
@@ -523,6 +529,7 @@ public class TopologyManagerTest {
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes();
expect(logicalRequest.getCompletedHostRequests()).andReturn(Collections.EMPTY_LIST).anyTimes();
+ expect(logicalRequest.isFinished()).andReturn(true).anyTimes();
expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes();
replayAll();
EasyMock.replay(clusterTopologyMock);
--
To stop receiving notification emails like this one, please contact
commits@ambari.apache.org.