You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/01/16 19:16:57 UTC

[2/2] git commit: AMBARI-4306. Request Schedule status not updated for Point in time execution request. (swagle)

AMBARI-4306. Request Schedule status not updated for Point in time execution request. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a158073c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a158073c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a158073c

Branch: refs/heads/trunk
Commit: a158073c7db49cee635bdcda2a6330ead3fba62b
Parents: 40e94f0
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Wed Jan 15 15:21:39 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Thu Jan 16 10:16:51 2014 -0800

----------------------------------------------------------------------
 .../scheduler/AbstractLinearExecutionJob.java   | 18 +++-
 .../scheduler/ExecutionScheduleManager.java     | 94 ++++++++++++++++++--
 .../server/scheduler/ExecutionScheduler.java    | 18 ++++
 .../scheduler/ExecutionSchedulerImpl.java       | 11 +++
 .../server/state/scheduler/BatchRequestJob.java | 21 +++++
 .../state/scheduler/RequestExecution.java       |  5 ++
 .../state/scheduler/RequestExecutionImpl.java   | 13 +++
 .../apache/ambari/server/utils/DateUtils.java   | 10 +++
 .../scheduler/ExecutionScheduleManagerTest.java | 61 +++++++++++++
 .../server/state/RequestExecutionTest.java      | 22 +++++
 10 files changed, 262 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
index 9203aac..73d81bc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -57,8 +57,11 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
    * @throws AmbariException
    * @param properties
    */
-  protected abstract void doWork(Map<String, Object> properties) throws
-    AmbariException;
+  protected abstract void doWork(Map<String, Object> properties)
+    throws AmbariException;
+
+  protected abstract void finalizeExecution(Map<String, Object> properties)
+    throws AmbariException;
 
   /**
    * Get the next job id from context and create a trigger to fire the next
@@ -77,9 +80,12 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
         + ", scheduleTime = " + context.getScheduledFireTime());
     }
 
+    Map<String, Object> properties = context.getMergedJobDataMap()
+      .getWrappedMap();
+
     // Perform work and exit if failure reported
     try {
-      doWork(context.getMergedJobDataMap().getWrappedMap());
+      doWork(properties);
     } catch (AmbariException e) {
       LOG.error("Exception caught on job execution. Exiting linear chain...", e);
       throw new JobExecutionException(e);
@@ -92,8 +98,14 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
     String nextJobName = jobDataMap.getString(NEXT_EXECUTION_JOB_NAME_KEY);
     String nextJobGroup = jobDataMap.getString(NEXT_EXECUTION_JOB_GROUP_KEY);
 
+    // If no more jobs left, update status and return
     if (nextJobName == null || nextJobName.isEmpty()) {
       LOG.debug("End of linear job chain. Returning with success.");
+      try {
+        finalizeExecution(properties);
+      } catch (AmbariException e) {
+        LOG.warn("Unable to finalize execution for job: " + jobKey);
+      }
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index 3898441..268b4b4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -523,20 +523,20 @@ public class ExecutionScheduleManager {
         }
 
         if (requestMap.get(REQUESTS_ABORTED_TASKS_KEY) != null) {
-          batchRequestResponse.setAbortedTaskCount(Integer.parseInt
-            (requestMap.get(REQUESTS_ABORTED_TASKS_KEY).toString()));
+          batchRequestResponse.setAbortedTaskCount(
+            ((Double) requestMap.get(REQUESTS_ABORTED_TASKS_KEY)).intValue());
         }
         if (requestMap.get(REQUESTS_FAILED_TASKS_KEY) != null) {
-          batchRequestResponse.setFailedTaskCount(Integer.parseInt
-            (requestMap.get(REQUESTS_FAILED_TASKS_KEY).toString()));
+          batchRequestResponse.setFailedTaskCount(
+            ((Double) requestMap.get(REQUESTS_FAILED_TASKS_KEY)).intValue());
         }
         if (requestMap.get(REQUESTS_TIMEDOUT_TASKS_KEY) != null) {
-          batchRequestResponse.setTimedOutTaskCount(Integer.parseInt
-            (requestMap.get(REQUESTS_TIMEDOUT_TASKS_KEY).toString()));
+          batchRequestResponse.setTimedOutTaskCount(
+            ((Double) requestMap.get(REQUESTS_TIMEDOUT_TASKS_KEY)).intValue());
         }
         if (requestMap.get(REQUESTS_TOTAL_TASKS_KEY) != null) {
-          batchRequestResponse.setTotalTaskCount(Integer.parseInt
-            (requestMap.get(REQUESTS_TOTAL_TASKS_KEY).toString()));
+          batchRequestResponse.setTotalTaskCount(
+            ((Double) requestMap.get(REQUESTS_TOTAL_TASKS_KEY)).intValue());
         }
         batchRequestResponse.setStatus(status);
       }
@@ -626,4 +626,82 @@ public class ExecutionScheduleManager {
     return false;
   }
 
+  /**
+   * Marks Request Schedule as COMPLETED, if:
+   * No triggers exist for the first job in the chain OR
+   * If the trigger will never fire again.
+   *
+   * @param executionId
+   * @param clusterName
+   * @throws AmbariException
+   */
+  public void finalizeBatch(long executionId, String clusterName)
+    throws AmbariException {
+
+    Cluster cluster = clusters.getCluster(clusterName);
+    RequestExecution requestExecution = cluster.getAllRequestExecutions().get(executionId);
+
+    if (requestExecution == null) {
+      throw new AmbariException("Unable to find request schedule with id = "
+        + executionId);
+    }
+
+    Batch batch = requestExecution.getBatch();
+    BatchRequest firstBatchRequest = null;
+
+    if (batch != null) {
+      List<BatchRequest> batchRequests = batch.getBatchRequests();
+      if (batchRequests != null && batchRequests.size() > 0) {
+        Collections.sort(batchRequests);
+        firstBatchRequest = batchRequests.get(0);
+      }
+    }
+
+    boolean markCompleted = false;
+
+    if (firstBatchRequest != null) {
+      String jobName = getJobName(executionId, firstBatchRequest.getOrderId());
+      JobKey jobKey = JobKey.jobKey(jobName, ExecutionJob.LINEAR_EXECUTION_JOB_GROUP);
+      JobDetail jobDetail;
+      try {
+        jobDetail = executionScheduler.getJobDetail(jobKey);
+      } catch (SchedulerException e) {
+        LOG.warn("Unable to retrieve job details from scheduler. job: " + jobKey);
+        e.printStackTrace();
+        return;
+      }
+
+      if (jobDetail != null) {
+        try {
+          List<? extends Trigger> triggers = executionScheduler.getTriggersForJob(jobKey);
+          if (triggers != null && triggers.size() > 0) {
+            if (triggers.size() > 1) {
+              throw new AmbariException("Too many triggers defined for job. " +
+                "job: " + jobKey);
+            }
+
+            Trigger trigger = triggers.get(0);
+            // Note: If next fire time is in the past, it could be a misfire
+            // If final fire time is null, means it is a forever running job
+            if (!trigger.mayFireAgain() ||
+                (trigger.getFinalFireTime() != null &&
+                  !DateUtils.isFutureTime(trigger.getFinalFireTime()))) {
+              markCompleted = true;
+            }
+          } else {
+            // No triggers for job
+            markCompleted = true;
+          }
+        } catch (SchedulerException e) {
+          LOG.warn("Unable to retrieve triggers for job: " + jobKey);
+          e.printStackTrace();
+          return;
+        }
+      }
+    }
+
+    if (markCompleted) {
+      requestExecution.updateStatus(RequestExecution.Status.COMPLETED);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
index 6644e1b..f75eb94 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
@@ -27,6 +27,8 @@ import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
 
+import java.util.List;
+
 public interface ExecutionScheduler {
   /**
    * Initialize and start the scheduler to accept jobs.
@@ -61,4 +63,20 @@ public interface ExecutionScheduler {
    * @throws SchedulerException
    */
   public void deleteJob(JobKey jobKey) throws SchedulerException;
+
+  /**
+   * Get details for a job from scheduler.
+   * @param jobKey
+   * @return
+   */
+  public JobDetail getJobDetail(JobKey jobKey) throws SchedulerException;
+
+  /**
+   * Get all triggers created for a job.
+   * @param jobKey
+   * @return
+   * @throws SchedulerException
+   */
+  public List<? extends Trigger> getTriggersForJob(JobKey jobKey)
+    throws SchedulerException;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
index 7e1381d..488564f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
@@ -35,6 +35,7 @@ import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Properties;
 
 @Singleton
@@ -196,4 +197,14 @@ public class ExecutionSchedulerImpl implements ExecutionScheduler {
     scheduler.deleteJob(jobKey);
   }
 
+  @Override
+  public JobDetail getJobDetail(JobKey jobKey) throws SchedulerException {
+    return scheduler.getJobDetail(jobKey);
+  }
+
+  @Override
+  public List<? extends Trigger> getTriggersForJob(JobKey jobKey) throws SchedulerException {
+    return scheduler.getTriggersOfJob(jobKey);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index 88142ac..0718dcc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -111,6 +111,27 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
     }
   }
 
+  @Override
+  protected void finalizeExecution(Map<String, Object> properties)
+      throws AmbariException {
+
+    Long executionId = properties.get(BATCH_REQUEST_EXECUTION_ID_KEY) != null ?
+      (Long) properties.get(BATCH_REQUEST_EXECUTION_ID_KEY) : null;
+    Long batchId = properties.get(BATCH_REQUEST_BATCH_ID_KEY) != null ?
+      (Long) properties.get(BATCH_REQUEST_BATCH_ID_KEY) : null;
+    String clusterName = (String) properties.get(BATCH_REQUEST_CLUSTER_NAME_KEY);
+
+    if (executionId == null || batchId == null) {
+      throw new AmbariException("Unable to retrieve persisted batch request"
+        + ", execution_id = " + executionId
+        + ", batch_id = " + batchId);
+    }
+
+    // Check if this job has a future and update status if it doesn't
+    executionScheduleManager.finalizeBatch(executionId, clusterName);
+
+  }
+
   private Map<String, Integer> addTaskCountToProperties(Map<String, Object> properties,
                                         Map<String, Integer> oldCounts,
                                         BatchRequestResponse batchRequestResponse) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
index 44e8ece..06a46c7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
@@ -164,6 +164,11 @@ public interface RequestExecution {
   void updateBatchRequest(long batchId, BatchRequestResponse batchRequestResponse, boolean statusOnly);
 
   /**
+   * Update status and save RequestExecution
+   */
+  public void updateStatus(Status status);
+
+  /**
    * Status of the Request execution
    */
   public enum Status {

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index 5f43b52..2e7fa11 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -458,4 +458,17 @@ public class RequestExecutionImpl implements RequestExecution {
     setLastExecutionStatus(batchRequestResponse.getStatus());
   }
 
+  @Override
+  @Transactional
+  public void updateStatus(Status status) {
+    setStatus(status);
+    if (isPersisted) {
+      requestScheduleEntity.setUpdateTimestamp(System.currentTimeMillis());
+      requestScheduleDAO.merge(requestScheduleEntity);
+    } else {
+      LOG.warn("Updated status in memory, since Request Schedule is not " +
+        "persisted.");
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
index 3444988..785f4fd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
@@ -84,4 +84,14 @@ public class DateUtils {
     long diff = Math.abs(oldTime.getTime() - new Date().getTime());
     return diff / (60 * 1000) % 60;
   }
+
+  /**
+   * Check if given time is in the future
+   * @param time
+   * @return
+   */
+  public static boolean isFutureTime(Date time) {
+    Date now = new Date();
+    return time.after(now);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
index 11a0d51..8f3b800 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.scheduler.*;
 import org.easymock.Capture;
+import org.easymock.IAnswer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -508,4 +509,64 @@ public class ExecutionScheduleManagerTest {
     verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
       executionSchedulerMock, scheduleManager, batchMock);
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFinalizeBatch() throws Exception {
+    Clusters clustersMock = createMock(Clusters.class);
+    Cluster clusterMock = createMock(Cluster.class);
+    Configuration configurationMock = createNiceMock(Configuration.class);
+    ExecutionScheduler executionSchedulerMock = createMock(ExecutionScheduler.class);
+    InternalTokenStorage tokenStorageMock = createMock(InternalTokenStorage.class);
+    Gson gson = new Gson();
+    RequestExecution requestExecutionMock = createMock(RequestExecution.class);
+    Batch batchMock = createMock(Batch.class);
+    JobDetail jobDetailMock = createMock(JobDetail.class);
+    final BatchRequest batchRequestMock = createMock(BatchRequest.class);
+    final Trigger triggerMock = createNiceMock(Trigger.class);
+    final List<Trigger> triggers = new ArrayList<Trigger>()  {{ add(triggerMock); }};
+
+    long executionId = 11L;
+    String clusterName = "c1";
+    Date pastDate = new Date(new Date().getTime() - 2);
+
+    Map<Long, RequestExecution> executionMap = new HashMap<Long, RequestExecution>();
+    executionMap.put(executionId, requestExecutionMock);
+
+    ExecutionScheduleManager scheduleManager = createMockBuilder(ExecutionScheduleManager.class).
+      withConstructor(configurationMock, executionSchedulerMock,
+        tokenStorageMock, clustersMock, gson).createMock();
+
+    expectLastCall().anyTimes();
+
+    expect(clustersMock.getCluster(clusterName)).andReturn(clusterMock).anyTimes();
+    expect(clusterMock.getAllRequestExecutions()).andReturn(executionMap).anyTimes();
+    expect(requestExecutionMock.getBatch()).andReturn(batchMock).anyTimes();
+    expect(batchMock.getBatchRequests()).andReturn
+      (new ArrayList<BatchRequest>() {{
+        add(batchRequestMock);
+      }});
+    expect(batchRequestMock.getOrderId()).andReturn(1L).anyTimes();
+    expect(executionSchedulerMock.getJobDetail((JobKey) anyObject()))
+      .andReturn(jobDetailMock).anyTimes();
+    expect((List<Trigger>) executionSchedulerMock
+      .getTriggersForJob((JobKey) anyObject())).andReturn(triggers).anyTimes();
+    expect(triggerMock.mayFireAgain()).andReturn(true).anyTimes();
+    expect(triggerMock.getFinalFireTime()).andReturn(pastDate).anyTimes();
+
+    requestExecutionMock.updateStatus(RequestExecution.Status.COMPLETED);
+    expectLastCall();
+
+    replay(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock, batchRequestMock,
+      triggerMock, jobDetailMock);
+
+
+    scheduleManager.finalizeBatch(executionId, clusterName);
+
+
+    verify(clustersMock, clusterMock, configurationMock, requestExecutionMock,
+      executionSchedulerMock, scheduleManager, batchMock, batchRequestMock,
+      triggerMock, jobDetailMock);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a158073c/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
index 9d29ebf..22f8301 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
@@ -114,6 +114,8 @@ public class RequestExecutionTest {
 
     RequestExecution requestExecution = requestExecutionFactory.createNew
       (cluster, batches, schedule);
+
+    requestExecution.setStatus(RequestExecution.Status.SCHEDULED);
     requestExecution.setDescription("Test Schedule");
 
     requestExecution.persist();
@@ -319,4 +321,24 @@ public class RequestExecutionTest {
     Assert.assertNotNull(postBatchRequest.getBody());
   }
 
+  @Test
+  public void testUpdateStatus() throws Exception {
+    RequestExecution requestExecution = createRequestSchedule();
+    Assert.assertNotNull(requestExecution);
+    Assert.assertNotNull(cluster.getAllRequestExecutions().get
+      (requestExecution.getId()));
+
+    RequestScheduleEntity scheduleEntity = requestScheduleDAO.findById
+      (requestExecution.getId());
+    Assert.assertNotNull(scheduleEntity);
+    Assert.assertEquals(RequestExecution.Status.SCHEDULED.name(),
+      scheduleEntity.getStatus());
+
+    requestExecution.updateStatus(RequestExecution.Status.COMPLETED);
+
+    scheduleEntity = requestScheduleDAO.findById(requestExecution.getId());
+    Assert.assertNotNull(scheduleEntity);
+    Assert.assertEquals(RequestExecution.Status.COMPLETED.name(),
+      scheduleEntity.getStatus());
+  }
 }