You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/01/15 19:16:09 UTC
git commit: AMBARI-4293. Support tolerance level for batched requests based on task status. (swagle)
Updated Branches:
refs/heads/trunk d06dd4d90 -> 15c39f4bc
AMBARI-4293. Support tolerance level for batched requests based on task status. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/15c39f4b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/15c39f4b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/15c39f4b
Branch: refs/heads/trunk
Commit: 15c39f4bca69b9982381acde5798c9288c62d6c9
Parents: d06dd4d
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Jan 14 16:05:37 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Jan 15 10:15:53 2014 -0800
----------------------------------------------------------------------
.../scheduler/ExecutionScheduleManager.java | 70 ++++++++++++++++++--
.../server/state/scheduler/BatchRequestJob.java | 69 +++++++++++++++++--
.../state/scheduler/BatchRequestResponse.java | 36 ++++++++++
.../state/scheduler/RequestExecutionImpl.java | 4 +-
.../scheduler/ExecutionScheduleManagerTest.java | 48 ++++++++++++++
.../state/scheduler/BatchRequestJobTest.java | 12 ++--
6 files changed, 223 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index 98e7b4e..3898441 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -36,6 +36,7 @@ import org.apache.ambari.server.state.scheduler.Batch;
import org.apache.ambari.server.state.scheduler.BatchRequest;
import org.apache.ambari.server.state.scheduler.BatchRequestJob;
import org.apache.ambari.server.state.scheduler.BatchRequestResponse;
+import org.apache.ambari.server.state.scheduler.BatchSettings;
import org.apache.ambari.server.state.scheduler.RequestExecution;
import org.apache.ambari.server.state.scheduler.Schedule;
import org.apache.ambari.server.utils.DateUtils;
@@ -80,6 +81,13 @@ public class ExecutionScheduleManager {
protected Client ambariClient;
protected WebResource ambariWebResource;
+ protected static final String REQUESTS_STATUS_KEY = "request_status";
+ protected static final String REQUESTS_ID_KEY = "id";
+ protected static final String REQUESTS_FAILED_TASKS_KEY = "failed_task_count";
+ protected static final String REQUESTS_ABORTED_TASKS_KEY = "aborted_task_count";
+ protected static final String REQUESTS_TIMEDOUT_TASKS_KEY = "timed_out_task_count";
+ protected static final String REQUESTS_TOTAL_TASKS_KEY = "task_count";
+
@Inject
public ExecutionScheduleManager(Configuration configuration,
ExecutionScheduler executionScheduler,
@@ -460,7 +468,11 @@ public class ExecutionScheduleManager {
throws AmbariException {
StrBuilder sb = new StrBuilder();
- sb.append(DEFAULT_API_PATH).append("/clusters/").append(clusterName).append("/requests/").append(requestId);
+ sb.append(DEFAULT_API_PATH)
+ .append("/clusters/")
+ .append(clusterName)
+ .append("/requests/")
+ .append(requestId);
return performApiGetRequest(sb.toString(), true);
@@ -499,15 +511,33 @@ public class ExecutionScheduleManager {
}
if (requestMap != null) {
- batchRequestResponse.setRequestId(((Double) requestMap.get("id")).longValue());
+ batchRequestResponse.setRequestId((
+ (Double) requestMap.get(REQUESTS_ID_KEY)).longValue());
//TODO fix different names for field
String status = null;
- if (requestMap.get("request_status") != null) {
- status = requestMap.get("request_status").toString();
+ if (requestMap.get(REQUESTS_STATUS_KEY) != null) {
+ status = requestMap.get(REQUESTS_STATUS_KEY).toString();
}
if (requestMap.get("status") != null) {
status = requestMap.get("status").toString();
}
+
+ if (requestMap.get(REQUESTS_ABORTED_TASKS_KEY) != null) {
+ batchRequestResponse.setAbortedTaskCount(Integer.parseInt
+ (requestMap.get(REQUESTS_ABORTED_TASKS_KEY).toString()));
+ }
+ if (requestMap.get(REQUESTS_FAILED_TASKS_KEY) != null) {
+ batchRequestResponse.setFailedTaskCount(Integer.parseInt
+ (requestMap.get(REQUESTS_FAILED_TASKS_KEY).toString()));
+ }
+ if (requestMap.get(REQUESTS_TIMEDOUT_TASKS_KEY) != null) {
+ batchRequestResponse.setTimedOutTaskCount(Integer.parseInt
+ (requestMap.get(REQUESTS_TIMEDOUT_TASKS_KEY).toString()));
+ }
+ if (requestMap.get(REQUESTS_TOTAL_TASKS_KEY) != null) {
+ batchRequestResponse.setTotalTaskCount(Integer.parseInt
+ (requestMap.get(REQUESTS_TOTAL_TASKS_KEY).toString()));
+ }
batchRequestResponse.setStatus(status);
}
@@ -522,14 +552,17 @@ public class ExecutionScheduleManager {
public void updateBatchRequest(long executionId, long batchId, String clusterName,
BatchRequestResponse batchRequestResponse,
- boolean statusOnly)
- throws AmbariException{
+ boolean statusOnly) throws AmbariException {
Cluster cluster = clusters.getCluster(clusterName);
RequestExecution requestExecution = cluster.getAllRequestExecutions().get(executionId);
- requestExecution.updateBatchRequest(batchId, batchRequestResponse, statusOnly);
+ if (requestExecution == null) {
+ throw new AmbariException("Unable to find request schedule with id = "
+ + executionId);
+ }
+ requestExecution.updateBatchRequest(batchId, batchRequestResponse, statusOnly);
}
protected BatchRequestResponse performUriRequest(String url, String body, String method) {
@@ -569,5 +602,28 @@ public class ExecutionScheduleManager {
return convertToBatchRequestResponse(response);
}
+ public boolean hasToleranceThresholdExceeded(Long executionId,
+ String clusterName, Map<String, Integer> taskCounts) throws AmbariException {
+
+ Cluster cluster = clusters.getCluster(clusterName);
+ RequestExecution requestExecution = cluster.getAllRequestExecutions().get(executionId);
+
+ if (requestExecution == null) {
+ throw new AmbariException("Unable to find request schedule with id = "
+ + executionId);
+ }
+
+ int percentageFailed =
+ (int) ((taskCounts.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY) * 100.0f) /
+ taskCounts.get(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY));
+
+ BatchSettings batchSettings = requestExecution.getBatch().getBatchSettings();
+ if (batchSettings != null
+ && batchSettings.getTaskFailureToleranceLimit() != null) {
+ return percentageFailed > batchSettings.getTaskFailureToleranceLimit();
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index 9fbb571..88142ac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -26,6 +26,7 @@ import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.Map;
public class BatchRequestJob extends AbstractLinearExecutionJob {
@@ -36,7 +37,11 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
public static final String BATCH_REQUEST_BATCH_ID_KEY =
"BatchRequestJob.BatchId";
public static final String BATCH_REQUEST_CLUSTER_NAME_KEY =
- "BatchRequestJob.ClusterName";
+ "BatchRequestJob.ClusterName";
+ public static final String BATCH_REQUEST_FAILED_TASKS_KEY =
+ "BatchRequestJob.FailedTaskCount";
+ public static final String BATCH_REQUEST_TOTAL_TASKS_KEY =
+ "BatchRequestJob.TotalTaskCount";
private final long statusCheckInterval;
@@ -63,18 +68,23 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
+ ", batch_id = " + batchId);
}
+ // Aggregate tasks counts stored in the DataMap
+ Map<String, Integer> taskCounts = getTaskCountProperties(properties);
+
Long requestId = executionScheduleManager.executeBatchRequest
(executionId, batchId, clusterName);
if (requestId != null) {
HostRoleStatus status;
+ BatchRequestResponse batchRequestResponse;
do {
- BatchRequestResponse batchRequestResponse =
- executionScheduleManager.getBatchRequestResponse(requestId, clusterName);
+ batchRequestResponse = executionScheduleManager
+ .getBatchRequestResponse(requestId, clusterName);
status = HostRoleStatus.valueOf(batchRequestResponse.getStatus());
- executionScheduleManager.updateBatchRequest(executionId, batchId, clusterName, batchRequestResponse, true);
+ executionScheduleManager.updateBatchRequest(executionId, batchId,
+ clusterName, batchRequestResponse, true);
try {
Thread.sleep(statusCheckInterval);
@@ -84,6 +94,57 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
throw new AmbariException(message, e);
}
} while (!status.isCompletedState());
+
+ // Store aggregated task status counts in the DataMap
+ Map<String, Integer> aggregateCounts = addTaskCountToProperties
+ (properties, taskCounts, batchRequestResponse);
+
+ if (executionScheduleManager.hasToleranceThresholdExceeded
+ (executionId, clusterName, aggregateCounts)) {
+
+ throw new AmbariException("Task failure tolerance limit exceeded"
+ + ", execution_id = " + executionId
+ + ", processed batch_id = " + batchId
+ + ", failed tasks = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
+ + ", total tasks = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
+ }
+ }
+ }
+
+ private Map<String, Integer> addTaskCountToProperties(Map<String, Object> properties,
+ Map<String, Integer> oldCounts,
+ BatchRequestResponse batchRequestResponse) {
+
+ Map<String, Integer> taskCounts = new HashMap<String, Integer>();
+
+ if (batchRequestResponse != null) {
+ Integer failedTasks = batchRequestResponse.getFailedTaskCount() +
+ batchRequestResponse.getAbortedTaskCount() +
+ batchRequestResponse.getTimedOutTaskCount();
+
+ Integer failedCount = oldCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY) + failedTasks;
+ Integer totalCount = oldCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY) +
+ batchRequestResponse.getTotalTaskCount();
+
+ properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+ taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+ properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
+ taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
+ }
+
+ return taskCounts;
+ }
+
+ private Map<String, Integer> getTaskCountProperties(Map<String, Object> properties) {
+ Map<String, Integer> taskCounts = new HashMap<String, Integer>();
+ if (properties != null) {
+ Object countObj = properties.get(BATCH_REQUEST_FAILED_TASKS_KEY);
+ taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY,
+ countObj != null ? Integer.parseInt(countObj.toString()) : 0);
+ countObj = properties.get(BATCH_REQUEST_TOTAL_TASKS_KEY);
+ taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, countObj != null ?
+ Integer.parseInt(countObj.toString()) : 0);
}
+ return taskCounts;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
index 59a45fd..518e586 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
@@ -29,6 +29,10 @@ public class BatchRequestResponse {
private int returnCode;
private String returnMessage;
+ private int failedTaskCount;
+ private int abortedTaskCount;
+ private int timedOutTaskCount;
+ private int totalTaskCount;
public Long getRequestId() {
return requestId;
@@ -61,4 +65,36 @@ public class BatchRequestResponse {
public void setReturnMessage(String returnMessage) {
this.returnMessage = returnMessage;
}
+
+ public int getFailedTaskCount() {
+ return failedTaskCount;
+ }
+
+ public void setFailedTaskCount(int failedTaskCount) {
+ this.failedTaskCount = failedTaskCount;
+ }
+
+ public int getAbortedTaskCount() {
+ return abortedTaskCount;
+ }
+
+ public void setAbortedTaskCount(int abortedTaskCount) {
+ this.abortedTaskCount = abortedTaskCount;
+ }
+
+ public int getTimedOutTaskCount() {
+ return timedOutTaskCount;
+ }
+
+ public void setTimedOutTaskCount(int timedOutTaskCount) {
+ this.timedOutTaskCount = timedOutTaskCount;
+ }
+
+ public int getTotalTaskCount() {
+ return totalTaskCount;
+ }
+
+ public void setTotalTaskCount(int totalTaskCount) {
+ this.totalTaskCount = totalTaskCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index a1e7d53..5f43b52 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -424,7 +424,9 @@ public class RequestExecutionImpl implements RequestExecution {
}
@Override
- public void updateBatchRequest(long batchId, BatchRequestResponse batchRequestResponse, boolean statusOnly) {
+ public void updateBatchRequest(long batchId,
+ BatchRequestResponse batchRequestResponse,
+ boolean statusOnly) {
long executionId = requestScheduleEntity.getScheduleId();
RequestScheduleBatchRequestEntityPK batchRequestEntityPK = new
http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
index 19105e0..11a0d51 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -460,4 +460,52 @@ public class ExecutionScheduleManagerTest {
assertEquals(apiUri, uriCapture.getValue());
}
+
+ @Test
+ public void testHasToleranceThresholdExceeded() throws Exception {
+ Clusters clustersMock = createMock(Clusters.class);
+ Cluster clusterMock = createMock(Cluster.class);
+ Configuration configurationMock = createNiceMock(Configuration.class);
+ ExecutionScheduler executionSchedulerMock = createMock(ExecutionScheduler.class);
+ InternalTokenStorage tokenStorageMock = createMock(InternalTokenStorage.class);
+ Gson gson = new Gson();
+ RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+ Batch batchMock = createMock(Batch.class);
+
+ long executionId = 11L;
+ String clusterName = "c1";
+
+ BatchSettings batchSettings = new BatchSettings();
+ batchSettings.setTaskFailureToleranceLimit(10);
+
+ Map<Long, RequestExecution> executionMap = new HashMap<Long, RequestExecution>();
+ executionMap.put(executionId, requestExecutionMock);
+
+ ExecutionScheduleManager scheduleManager = createMockBuilder(ExecutionScheduleManager.class).
+ withConstructor(configurationMock, executionSchedulerMock,
+ tokenStorageMock, clustersMock, gson).createMock();
+
+ expectLastCall().anyTimes();
+
+ expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+ expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+ expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+ expect(batchMock.getBatchSettings()).andReturn(batchSettings).anyTimes();
+
+ replay(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+ executionSchedulerMock, scheduleManager, batchMock);
+
+ HashMap<String, Integer> taskCounts = new HashMap<String, Integer>() {{
+ put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 2);
+ put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 10);
+ }};
+
+ boolean exceeded = scheduleManager.hasToleranceThresholdExceeded
+ (executionId, clusterName, taskCounts);
+
+ Assert.assertTrue(exceeded);
+
+ verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+ executionSchedulerMock, scheduleManager, batchMock);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
index adb2d27..9a45227 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
@@ -31,7 +31,6 @@ import static org.easymock.EasyMock.*;
public class BatchRequestJobTest {
-
@Test
public void testDoWork() throws Exception {
ExecutionScheduleManager scheduleManagerMock = createMock(ExecutionScheduleManager.class);
@@ -46,6 +45,10 @@ public class BatchRequestJobTest {
properties.put(BatchRequestJob.BATCH_REQUEST_BATCH_ID_KEY, batchId);
properties.put(BatchRequestJob.BATCH_REQUEST_CLUSTER_NAME_KEY, clusterName);
+ HashMap<String, Integer> taskCounts = new HashMap<String, Integer>()
+ {{ put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 0);
+ put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 0); }};
+
BatchRequestResponse pendingResponse = new BatchRequestResponse();
pendingResponse.setStatus(HostRoleStatus.PENDING.toString());
@@ -59,7 +62,8 @@ public class BatchRequestJobTest {
Capture<String> clusterNameCapture = new Capture<String>();
- expect(scheduleManagerMock.executeBatchRequest(captureLong(executionIdCapture), captureLong(batchIdCapture),
+ expect(scheduleManagerMock.executeBatchRequest(captureLong(executionIdCapture),
+ captureLong(batchIdCapture),
capture(clusterNameCapture))).andReturn(requestId);
expect(scheduleManagerMock.getBatchRequestResponse(requestId, clusterName)).
@@ -68,6 +72,8 @@ public class BatchRequestJobTest {
andReturn(inProgressResponse).times(4);
expect(scheduleManagerMock.getBatchRequestResponse(requestId, clusterName)).
andReturn(completedResponse).once();
+ expect(scheduleManagerMock.hasToleranceThresholdExceeded(executionId,
+ clusterName, taskCounts)).andReturn(false);
scheduleManagerMock.updateBatchRequest(eq(executionId), eq(batchId), eq(clusterName),
anyObject(BatchRequestResponse.class), eq(true));
@@ -82,7 +88,5 @@ public class BatchRequestJobTest {
Assert.assertEquals(executionId, executionIdCapture.getValue().longValue());
Assert.assertEquals(batchId, batchIdCapture.getValue().longValue());
Assert.assertEquals(clusterName, clusterNameCapture.getValue());
-
-
}
}