You are viewing a plain-text version of this content. (The link to the canonical version, labelled "here" on the original archive page, is not preserved in plain text.)
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/01/15 19:16:09 UTC

git commit: AMBARI-4293. Support tolerance level for batched requests based on task status. (swagle)

Updated Branches:
  refs/heads/trunk d06dd4d90 -> 15c39f4bc


AMBARI-4293. Support tolerance level for batched requests based on task status. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/15c39f4b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/15c39f4b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/15c39f4b

Branch: refs/heads/trunk
Commit: 15c39f4bca69b9982381acde5798c9288c62d6c9
Parents: d06dd4d
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Jan 14 16:05:37 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Wed Jan 15 10:15:53 2014 -0800

----------------------------------------------------------------------
 .../scheduler/ExecutionScheduleManager.java     | 70 ++++++++++++++++++--
 .../server/state/scheduler/BatchRequestJob.java | 69 +++++++++++++++++--
 .../state/scheduler/BatchRequestResponse.java   | 36 ++++++++++
 .../state/scheduler/RequestExecutionImpl.java   |  4 +-
 .../scheduler/ExecutionScheduleManagerTest.java | 48 ++++++++++++++
 .../state/scheduler/BatchRequestJobTest.java    | 12 ++--
 6 files changed, 223 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index 98e7b4e..3898441 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -36,6 +36,7 @@ import org.apache.ambari.server.state.scheduler.Batch;
 import org.apache.ambari.server.state.scheduler.BatchRequest;
 import org.apache.ambari.server.state.scheduler.BatchRequestJob;
 import org.apache.ambari.server.state.scheduler.BatchRequestResponse;
+import org.apache.ambari.server.state.scheduler.BatchSettings;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.Schedule;
 import org.apache.ambari.server.utils.DateUtils;
@@ -80,6 +81,13 @@ public class ExecutionScheduleManager {
   protected Client ambariClient;
   protected WebResource ambariWebResource;
 
+  protected static final String REQUESTS_STATUS_KEY = "request_status";
+  protected static final String REQUESTS_ID_KEY = "id";
+  protected static final String REQUESTS_FAILED_TASKS_KEY = "failed_task_count";
+  protected static final String REQUESTS_ABORTED_TASKS_KEY = "aborted_task_count";
+  protected static final String REQUESTS_TIMEDOUT_TASKS_KEY = "timed_out_task_count";
+  protected static final String REQUESTS_TOTAL_TASKS_KEY = "task_count";
+
   @Inject
   public ExecutionScheduleManager(Configuration configuration,
                                   ExecutionScheduler executionScheduler,
@@ -460,7 +468,11 @@ public class ExecutionScheduleManager {
     throws AmbariException {
 
     StrBuilder sb = new StrBuilder();
-    sb.append(DEFAULT_API_PATH).append("/clusters/").append(clusterName).append("/requests/").append(requestId);
+    sb.append(DEFAULT_API_PATH)
+      .append("/clusters/")
+      .append(clusterName)
+      .append("/requests/")
+      .append(requestId);
 
     return performApiGetRequest(sb.toString(), true);
 
@@ -499,15 +511,33 @@ public class ExecutionScheduleManager {
       }
 
       if (requestMap != null) {
-        batchRequestResponse.setRequestId(((Double) requestMap.get("id")).longValue());
+        batchRequestResponse.setRequestId((
+          (Double) requestMap.get(REQUESTS_ID_KEY)).longValue());
         //TODO fix different names for field
         String status = null;
-        if (requestMap.get("request_status") != null) {
-          status = requestMap.get("request_status").toString();
+        if (requestMap.get(REQUESTS_STATUS_KEY) != null) {
+          status = requestMap.get(REQUESTS_STATUS_KEY).toString();
         }
         if (requestMap.get("status") != null) {
           status = requestMap.get("status").toString();
         }
+        // Numbers in requestMap are Doubles (cf. the id cast), which Integer.parseInt rejects
+        if (requestMap.get(REQUESTS_ABORTED_TASKS_KEY) != null) {
+          batchRequestResponse.setAbortedTaskCount((int) Double.parseDouble
+            (requestMap.get(REQUESTS_ABORTED_TASKS_KEY).toString()));
+        }
+        if (requestMap.get(REQUESTS_FAILED_TASKS_KEY) != null) {
+          batchRequestResponse.setFailedTaskCount((int) Double.parseDouble
+            (requestMap.get(REQUESTS_FAILED_TASKS_KEY).toString()));
+        }
+        if (requestMap.get(REQUESTS_TIMEDOUT_TASKS_KEY) != null) {
+          batchRequestResponse.setTimedOutTaskCount((int) Double.parseDouble
+            (requestMap.get(REQUESTS_TIMEDOUT_TASKS_KEY).toString()));
+        }
+        if (requestMap.get(REQUESTS_TOTAL_TASKS_KEY) != null) {
+          batchRequestResponse.setTotalTaskCount((int) Double.parseDouble
+            (requestMap.get(REQUESTS_TOTAL_TASKS_KEY).toString()));
+        }
         batchRequestResponse.setStatus(status);
       }
 
@@ -522,14 +552,17 @@ public class ExecutionScheduleManager {
 
   public void updateBatchRequest(long executionId, long batchId, String clusterName,
                                  BatchRequestResponse batchRequestResponse,
-                                 boolean statusOnly)
-      throws AmbariException{
+                                 boolean statusOnly) throws AmbariException {
 
     Cluster cluster = clusters.getCluster(clusterName);
     RequestExecution requestExecution = cluster.getAllRequestExecutions().get(executionId);
 
-    requestExecution.updateBatchRequest(batchId, batchRequestResponse, statusOnly);
+    if (requestExecution == null) {
+      throw new AmbariException("Unable to find request schedule with id = "
+        + executionId);
+    }
 
+    requestExecution.updateBatchRequest(batchId, batchRequestResponse, statusOnly);
   }
 
   protected BatchRequestResponse performUriRequest(String url, String body, String method) {
@@ -569,5 +602,58 @@ public class ExecutionScheduleManager {
     return convertToBatchRequestResponse(response);
   }

+  /**
+   * Checks whether the task failures aggregated over the batches executed so
+   * far exceed the task failure tolerance limit of a request schedule.
+   *
+   * @param executionId id of the request schedule
+   * @param clusterName name of the cluster owning the request schedule
+   * @param taskCounts  aggregated counts keyed by
+   *                    {@link BatchRequestJob#BATCH_REQUEST_FAILED_TASKS_KEY}
+   *                    and {@link BatchRequestJob#BATCH_REQUEST_TOTAL_TASKS_KEY};
+   *                    a missing entry is treated as 0
+   * @return true if the percentage of failed tasks is strictly greater than
+   *         the configured limit; false when no limit is configured
+   * @throws AmbariException if no request schedule with the given id exists
+   */
+  public boolean hasToleranceThresholdExceeded(Long executionId,
+      String clusterName, Map<String, Integer> taskCounts) throws AmbariException {
+
+    Cluster cluster = clusters.getCluster(clusterName);
+    RequestExecution requestExecution = cluster.getAllRequestExecutions().get(executionId);
+
+    if (requestExecution == null) {
+      throw new AmbariException("Unable to find request schedule with id = "
+        + executionId);
+    }
+
+    Batch batch = requestExecution.getBatch();
+    BatchSettings batchSettings = batch != null ? batch.getBatchSettings() : null;
+    if (batchSettings == null
+        || batchSettings.getTaskFailureToleranceLimit() == null) {
+      // No tolerance limit configured, so there is nothing to exceed
+      return false;
+    }
+
+    long failedTasks = getTaskCountOrZero(taskCounts,
+      BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY);
+    long totalTasks = getTaskCountOrZero(taskCounts,
+      BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY);
+    long toleranceLimit = batchSettings.getTaskFailureToleranceLimit();
+
+    // Evaluate (failed / total) * 100 > limit by cross-multiplying: this
+    // avoids dividing by zero when no tasks ran and does not truncate the
+    // percentage (10.5% failed must exceed a 10% limit).
+    return failedTasks * 100 > toleranceLimit * totalTasks;
+  }
+
+  /**
+   * Returns the count stored under {@code key}, or 0 when the map or the
+   * entry is missing.
+   */
+  private static long getTaskCountOrZero(Map<String, Integer> taskCounts, String key) {
+    Integer count = taskCounts != null ? taskCounts.get(key) : null;
+    return count != null ? count : 0;
+  }

 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index 9fbb571..88142ac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -26,6 +26,7 @@ import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class BatchRequestJob extends AbstractLinearExecutionJob {
@@ -36,7 +37,11 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
   public static final String BATCH_REQUEST_BATCH_ID_KEY =
     "BatchRequestJob.BatchId";
   public static final String BATCH_REQUEST_CLUSTER_NAME_KEY =
-      "BatchRequestJob.ClusterName";
+    "BatchRequestJob.ClusterName";
+  public static final String BATCH_REQUEST_FAILED_TASKS_KEY =
+    "BatchRequestJob.FailedTaskCount";
+  public static final String BATCH_REQUEST_TOTAL_TASKS_KEY =
+    "BatchRequestJob.TotalTaskCount";
 
   private final long statusCheckInterval;
 
@@ -63,18 +68,23 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
         + ", batch_id = " + batchId);
     }
 
+    // Aggregate tasks counts stored in the DataMap
+    Map<String, Integer> taskCounts = getTaskCountProperties(properties);
+
     Long requestId = executionScheduleManager.executeBatchRequest
       (executionId, batchId, clusterName);
 
     if (requestId != null) {
       HostRoleStatus status;
+      BatchRequestResponse batchRequestResponse;
       do {
-        BatchRequestResponse batchRequestResponse =
-            executionScheduleManager.getBatchRequestResponse(requestId, clusterName);
+        batchRequestResponse = executionScheduleManager
+          .getBatchRequestResponse(requestId, clusterName);
 
         status = HostRoleStatus.valueOf(batchRequestResponse.getStatus());
 
-        executionScheduleManager.updateBatchRequest(executionId, batchId, clusterName, batchRequestResponse, true);
+        executionScheduleManager.updateBatchRequest(executionId, batchId,
+          clusterName, batchRequestResponse, true);
 
         try {
           Thread.sleep(statusCheckInterval);
@@ -84,6 +94,82 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
           throw new AmbariException(message, e);
         }
       } while (!status.isCompletedState());
+
+      // Store aggregated task status counts in the DataMap
+      Map<String, Integer> aggregateCounts = addTaskCountToProperties
+        (properties, taskCounts, batchRequestResponse);
+
+      if (executionScheduleManager.hasToleranceThresholdExceeded
+          (executionId, clusterName, aggregateCounts)) {
+
+        throw new AmbariException("Task failure tolerance limit exceeded"
+            + ", execution_id = " + executionId
+            + ", processed batch_id = " + batchId
+            + ", failed tasks = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
+            + ", total tasks = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
+      }
+    }
+  }
+
+  /**
+   * Adds the task counts of the batch request that just completed to the
+   * counts aggregated so far and stores the new totals in the job
+   * properties (DataMap).
+   * Failed, aborted and timed-out tasks are all counted as failed.
+   *
+   * @param properties job properties to store the aggregated counts in
+   * @param oldCounts counts aggregated so far; must contain both count
+   *                  keys (as returned by getTaskCountProperties)
+   * @param batchRequestResponse status of the batch request that just
+   *                             completed; when null nothing is added
+   * @return the aggregated failed and total task counts
+   */
+  private Map<String, Integer> addTaskCountToProperties(Map<String, Object> properties,
+                                        Map<String, Integer> oldCounts,
+                                        BatchRequestResponse batchRequestResponse) {
+
+    int failedCount = oldCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY);
+    int totalCount = oldCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY);
+
+    if (batchRequestResponse != null) {
+      failedCount += batchRequestResponse.getFailedTaskCount()
+        + batchRequestResponse.getAbortedTaskCount()
+        + batchRequestResponse.getTimedOutTaskCount();
+      totalCount += batchRequestResponse.getTotalTaskCount();
+
+      properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+      properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
+    }
+
+    // Always return both counts, even without a response, so the tolerance
+    // check never has to unbox a missing entry
+    Map<String, Integer> taskCounts = new HashMap<String, Integer>();
+    taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+    taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
+    return taskCounts;
+  }
+
+  /**
+   * Reads the aggregated failed and total task counts stored in the job
+   * properties (DataMap) by addTaskCountToProperties.
+   *
+   * @param properties job properties; may be null
+   * @return map holding both count keys, each 0 when absent
+   */
+  private Map<String, Integer> getTaskCountProperties(Map<String, Object> properties) {
+    Map<String, Integer> taskCounts = new HashMap<String, Integer>();
+    // Start from 0 so both keys are present even when properties is null
+    taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, 0);
+    taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, 0);
+    if (properties != null) {
+      for (String key : new String[] { BATCH_REQUEST_FAILED_TASKS_KEY,
+          BATCH_REQUEST_TOTAL_TASKS_KEY }) {
+        Object countObj = properties.get(key);
+        if (countObj != null) {
+          taskCounts.put(key, Integer.parseInt(countObj.toString()));
+        }
+      }
     }
+    return taskCounts;
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
index 59a45fd..518e586 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
@@ -29,6 +29,10 @@ public class BatchRequestResponse {
   private int returnCode;
   private String returnMessage;
 
+  private int failedTaskCount;
+  private int abortedTaskCount;
+  private int timedOutTaskCount;
+  private int totalTaskCount;
 
   public Long getRequestId() {
     return requestId;
@@ -61,4 +65,36 @@ public class BatchRequestResponse {
   public void setReturnMessage(String returnMessage) {
     this.returnMessage = returnMessage;
   }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount;
+  }
+
+  public void setFailedTaskCount(int failedTaskCount) {
+    this.failedTaskCount = failedTaskCount;
+  }
+
+  public int getAbortedTaskCount() {
+    return abortedTaskCount;
+  }
+
+  public void setAbortedTaskCount(int abortedTaskCount) {
+    this.abortedTaskCount = abortedTaskCount;
+  }
+
+  public int getTimedOutTaskCount() {
+    return timedOutTaskCount;
+  }
+
+  public void setTimedOutTaskCount(int timedOutTaskCount) {
+    this.timedOutTaskCount = timedOutTaskCount;
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount;
+  }
+
+  public void setTotalTaskCount(int totalTaskCount) {
+    this.totalTaskCount = totalTaskCount;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index a1e7d53..5f43b52 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -424,7 +424,9 @@ public class RequestExecutionImpl implements RequestExecution {
   }
 
   @Override
-  public void updateBatchRequest(long batchId, BatchRequestResponse batchRequestResponse, boolean statusOnly) {
+  public void updateBatchRequest(long batchId,
+                                 BatchRequestResponse batchRequestResponse,
+                                 boolean statusOnly) {
     long executionId = requestScheduleEntity.getScheduleId();
 
     RequestScheduleBatchRequestEntityPK batchRequestEntityPK = new

http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
index 19105e0..11a0d51 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -460,4 +460,57 @@ public class ExecutionScheduleManagerTest {

     assertEquals(apiUri, uriCapture.getValue());
   }
+
+  @Test
+  public void testHasToleranceThresholdExceeded() throws Exception {
+    Clusters clustersMock = createMock(Clusters.class);
+    Cluster clusterMock = createMock(Cluster.class);
+    Configuration configurationMock = createNiceMock(Configuration.class);
+    ExecutionScheduler executionSchedulerMock = createMock(ExecutionScheduler.class);
+    InternalTokenStorage tokenStorageMock = createMock(InternalTokenStorage.class);
+    Gson gson = new Gson();
+    RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+    Batch batchMock = createMock(Batch.class);
+
+    long executionId = 11L;
+    String clusterName = "c1";
+
+    BatchSettings batchSettings = new BatchSettings();
+    batchSettings.setTaskFailureToleranceLimit(10);
+
+    Map<Long, RequestExecution> executionMap = new HashMap<Long, RequestExecution>();
+    executionMap.put(executionId, requestExecutionMock);
+
+    ExecutionScheduleManager scheduleManager = createMockBuilder(ExecutionScheduleManager.class).
+      withConstructor(configurationMock, executionSchedulerMock,
+        tokenStorageMock, clustersMock, gson).createMock();
+
+    expectLastCall().anyTimes();
+
+    expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+    expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+    expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+    expect(batchMock.getBatchSettings()).andReturn(batchSettings).anyTimes();
+
+    replay(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock);
+
+    HashMap<String, Integer> taskCounts = new HashMap<String, Integer>() {{
+      put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 2);
+      put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 10);
+    }};
+
+    boolean exceeded = scheduleManager.hasToleranceThresholdExceeded
+      (executionId, clusterName, taskCounts);
+
+    Assert.assertTrue(exceeded);
+
+    // Exactly at the limit (1 of 10 tasks = 10%) does not exceed it
+    taskCounts.put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 1);
+    Assert.assertFalse(scheduleManager.hasToleranceThresholdExceeded
+      (executionId, clusterName, taskCounts));
+
+    verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/15c39f4b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
index adb2d27..9a45227 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java
@@ -31,7 +31,6 @@ import static org.easymock.EasyMock.*;
 
 public class BatchRequestJobTest {
 
-
   @Test
   public void testDoWork() throws Exception {
     ExecutionScheduleManager scheduleManagerMock = createMock(ExecutionScheduleManager.class);
@@ -46,6 +45,10 @@ public class BatchRequestJobTest {
     properties.put(BatchRequestJob.BATCH_REQUEST_BATCH_ID_KEY, batchId);
     properties.put(BatchRequestJob.BATCH_REQUEST_CLUSTER_NAME_KEY, clusterName);
 
+    HashMap<String, Integer> taskCounts = new HashMap<String, Integer>()
+    {{ put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 0);
+      put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 0); }};
+
 
     BatchRequestResponse pendingResponse = new BatchRequestResponse();
     pendingResponse.setStatus(HostRoleStatus.PENDING.toString());
@@ -59,7 +62,8 @@ public class BatchRequestJobTest {
     Capture<String> clusterNameCapture = new Capture<String>();
 
 
-    expect(scheduleManagerMock.executeBatchRequest(captureLong(executionIdCapture), captureLong(batchIdCapture),
+    expect(scheduleManagerMock.executeBatchRequest(captureLong(executionIdCapture),
+      captureLong(batchIdCapture),
       capture(clusterNameCapture))).andReturn(requestId);
 
     expect(scheduleManagerMock.getBatchRequestResponse(requestId, clusterName)).
@@ -68,6 +72,8 @@ public class BatchRequestJobTest {
       andReturn(inProgressResponse).times(4);
     expect(scheduleManagerMock.getBatchRequestResponse(requestId, clusterName)).
       andReturn(completedResponse).once();
+    expect(scheduleManagerMock.hasToleranceThresholdExceeded(executionId,
+      clusterName, taskCounts)).andReturn(false);
 
     scheduleManagerMock.updateBatchRequest(eq(executionId), eq(batchId), eq(clusterName),
         anyObject(BatchRequestResponse.class), eq(true));
@@ -82,7 +88,5 @@ public class BatchRequestJobTest {
     Assert.assertEquals(executionId, executionIdCapture.getValue().longValue());
     Assert.assertEquals(batchId, batchIdCapture.getValue().longValue());
     Assert.assertEquals(clusterName, clusterNameCapture.getValue());
-
-
   }
 }