You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/01/07 21:08:39 UTC

[1/2] git commit: AMBARI-4150. Provide ability to batch requests based on schedule. (swagle)

Updated Branches:
  refs/heads/trunk 9457a5715 -> 1c7e66dad


AMBARI-4150. Provide ability to batch requests based on schedule. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5da00398
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5da00398
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5da00398

Branch: refs/heads/trunk
Commit: 5da0039823cf108f7fe5361d56ce558477f9619b
Parents: 9457a57
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Jan 7 10:17:33 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Jan 7 10:17:33 2014 -0800

----------------------------------------------------------------------
 .../api/services/RequestScheduleService.java    |  33 +--
 .../controller/AmbariManagementController.java  |   6 +
 .../AmbariManagementControllerImpl.java         |   8 +
 .../RequestScheduleResourceProvider.java        |  70 ++++-
 .../scheduler/AbstractLinearExecutionJob.java   |  12 +-
 .../scheduler/ExecutionScheduleManager.java     | 294 ++++++++++++++++++-
 .../server/scheduler/ExecutionScheduler.java    |  28 +-
 .../scheduler/ExecutionSchedulerImpl.java       |  28 +-
 .../server/state/scheduler/BatchRequest.java    |   9 +
 .../server/state/scheduler/BatchRequestJob.java |  29 +-
 .../state/scheduler/BatchRequestResponse.java   |  26 ++
 .../state/scheduler/RequestExecutionImpl.java   |   5 +-
 .../ambari/server/state/scheduler/Schedule.java |  27 +-
 .../apache/ambari/server/utils/DateUtils.java   |  24 ++
 .../main/resources/Ambari-DDL-MySQL-CREATE.sql  |  15 +-
 .../resources/Ambari-DDL-Postgres-CREATE.sql    | 200 ++++++-------
 .../Ambari-DDL-Postgres-REMOTE-CREATE.sql       | 160 +++++-----
 .../parsers/JsonRequestBodyParserTest.java      |  38 +++
 .../RequestScheduleResourceProviderTest.java    |  37 ++-
 .../server/orm/InMemoryDefaultTestModule.java   |   3 +
 .../scheduler/ExecutionScheduleManagerTest.java | 268 +++++++++++++++++
 .../scheduler/ExecutionSchedulerTest.java       |   3 +-
 .../ambari/server/state/ConfigGroupTest.java    |   2 +-
 .../server/state/RequestExecutionTest.java      |  12 +-
 24 files changed, 1057 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
index c59a8e3..9c98e09 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
@@ -63,7 +63,7 @@ public class RequestScheduleService extends BaseService {
   @GET
   @Produces("text/plain")
   public Response getRequestSchedules(@Context HttpHeaders headers,
-                                  @Context UriInfo ui) {
+                                      @Context UriInfo ui) {
     return handleRequest(headers, null, ui, Request.Type.GET,
       createRequestSchedule(m_clusterName, null));
   }
@@ -95,35 +95,14 @@ public class RequestScheduleService extends BaseService {
    */
   @POST
   @Produces("text/plain")
-  public Response createRequestSchedule(String body, @Context HttpHeaders
-    headers,
-                                    @Context UriInfo ui) {
+  public Response createRequestSchedule(String body,
+                                        @Context HttpHeaders headers,
+                                        @Context UriInfo ui) {
     return handleRequest(headers, body, ui, Request.Type.POST,
       createRequestSchedule(m_clusterName, null));
   }
 
   /**
-   * Handles PUT /clusters/{clusterId}/request_schedules/{requestScheduleId}
-   * Update a request schedule
-   *
-   * @param body
-   * @param headers
-   * @param ui
-   * @param requestScheduleId
-   * @return
-   */
-  @PUT
-  @Path("{requestScheduleId}")
-  @Produces("text/plain")
-  public Response updateRequestSchedule(String body,
-                                        @Context HttpHeaders headers,
-                                        @Context UriInfo ui,
-                                        @PathParam("requestScheduleId") String requestScheduleId) {
-    return handleRequest(headers, body, ui, Request.Type.PUT,
-      createRequestSchedule(m_clusterName, requestScheduleId));
-  }
-
-  /**
    * Handles DELETE /clusters/{clusterId}/request_schedules/{requestScheduleId}
    * Delete a request schedule
    *
@@ -136,8 +115,8 @@ public class RequestScheduleService extends BaseService {
   @Path("{requestScheduleId}")
   @Produces("text/plain")
   public Response deleteRequestSchedule(@Context HttpHeaders headers,
-                                    @Context UriInfo ui,
-                                    @PathParam("requestScheduleId") String requestScheduleId) {
+                                        @Context UriInfo ui,
+                                        @PathParam("requestScheduleId") String requestScheduleId) {
     return handleRequest(headers, null, ui, Request.Type.DELETE,
       createRequestSchedule(m_clusterName, requestScheduleId));
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index c031034..26a02a4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -21,6 +21,7 @@ package org.apache.ambari.server.controller;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
@@ -512,5 +513,10 @@ public interface AmbariManagementController {
    * @return
    */
   public RequestExecutionFactory getRequestExecutionFactory();
+
+  /**
+   * Get Execution Schedule Manager
+   */
+  public ExecutionScheduleManager getExecutionScheduleManager();
 }
   

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 007a023..ca14f14 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -54,6 +54,7 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.internal.URLStreamProvider;
 import org.apache.ambari.server.metadata.ActionMetadata;
 import org.apache.ambari.server.metadata.RoleCommandOrder;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
 import org.apache.ambari.server.security.authorization.AuthorizationHelper;
 import org.apache.ambari.server.security.authorization.User;
 import org.apache.ambari.server.security.authorization.Users;
@@ -133,6 +134,8 @@ public class AmbariManagementControllerImpl implements
   private ConfigHelper configHelper;
   @Inject
   private RequestExecutionFactory requestExecutionFactory;
+  @Inject
+  private ExecutionScheduleManager executionScheduleManager;
 
   final private String masterHostname;
   final private Integer masterPort;
@@ -1035,6 +1038,11 @@ public class AmbariManagementControllerImpl implements
     return requestExecutionFactory;
   }
 
+  @Override
+  public ExecutionScheduleManager getExecutionScheduleManager() {
+    return executionScheduleManager;
+  }
+
   private List<Stage> doStageCreation(Cluster cluster,
       Map<State, List<Service>> changedServices,
       Map<State, List<ServiceComponent>> changedComps,

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
index 5119d25..2d3826e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
@@ -217,6 +217,20 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
     return resources;
   }
 
+  /**
+   * Currently unsupported operation, since strong guarantees are required
+   * that no jobs are currently running.
+   * @param request    the request object which defines the set of properties
+   *                   for the resources to be updated
+   * @param predicate  the predicate object which can be used to filter which
+   *                   resources are updated
+   *
+   * @return
+   * @throws SystemException
+   * @throws UnsupportedPropertyException
+   * @throws NoSuchResourceException
+   * @throws NoSuchParentResourceException
+   */
   @Override
   public RequestStatus updateResources(Request request, Predicate predicate)
       throws SystemException, UnsupportedPropertyException,
@@ -286,11 +300,29 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
           + "exist", e);
     }
 
-    LOG.info("Deleting Request Schedule "
+    RequestExecution requestExecution =
+      cluster.getAllRequestExecutions().get(request.getId());
+
+    if (requestExecution == null) {
+      throw new AmbariException("Request Schedule not found "
+        + ", clusterName = " + request.getClusterName()
+        + ", description = " + request.getDescription()
+        + ", id = " + request.getId());
+    }
+
+    String username = getManagementController().getAuthName();
+
+    LOG.info("Disabling Request Schedule "
       + ", clusterName = " + request.getClusterName()
-      + ", id = " + request.getId());
+      + ", id = " + request.getId()
+      + ", user = " + username);
 
-    cluster.deleteRequestExecution(request.getId());
+    // Delete all jobs and triggers
+    getManagementController().getExecutionScheduleManager()
+      .deleteAllJobs(requestExecution);
+
+    requestExecution.setStatus(RequestExecution.Status.DISABLED);
+    requestExecution.persist();
   }
 
   private synchronized void updateRequestSchedule
@@ -347,6 +379,10 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
         + ", user = " + username);
 
       requestExecution.persist();
+
+      // Update schedule for the batch
+      getManagementController().getExecutionScheduleManager()
+        .updateBatchSchedule(requestExecution);
     }
   }
 
@@ -385,6 +421,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
 
       requestExecution.setCreateUser(username);
       requestExecution.setUpdateUser(username);
+      requestExecution.setStatus(RequestExecution.Status.SCHEDULED);
 
       LOG.info("Persisting new Request Schedule "
         + ", clusterName = " + request.getClusterName()
@@ -394,6 +431,10 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
       requestExecution.persist();
       cluster.addRequestExecution(requestExecution);
 
+      // Setup batch schedule
+      getManagementController().getExecutionScheduleManager()
+        .scheduleBatch(requestExecution);
+
       RequestScheduleResponse response = new RequestScheduleResponse
         (requestExecution.getId(), requestExecution.getClusterName(),
           requestExecution.getDescription(), requestExecution.getStatus(),
@@ -407,10 +448,31 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
     return responses;
   }
 
-  private void validateRequest(RequestScheduleRequest request) {
+  private void validateRequest(RequestScheduleRequest request) throws AmbariException {
     if (request.getClusterName() == null) {
       throw new IllegalArgumentException("Cluster name is required.");
     }
+    Schedule schedule = request.getSchedule();
+    if (schedule != null) {
+      getManagementController().getExecutionScheduleManager()
+        .validateSchedule(schedule);
+    }
+    Batch batch = request.getBatch();
+    if (batch != null && !batch.getBatchRequests().isEmpty()) {
+      // Verify requests can be ordered
+      HashSet<Long> orderIdSet = new HashSet<Long>();
+      for (BatchRequest batchRequest : batch.getBatchRequests()) {
+        if (batchRequest.getOrderId() == null) {
+          throw new AmbariException("No order id provided for batch request. " +
+            "" + batchRequest);
+        }
+        if (orderIdSet.contains(batchRequest.getOrderId())) {
+          throw new AmbariException("Duplicate order id provided for batch " +
+            "request. " + batchRequest);
+        }
+        orderIdSet.add(batchRequest.getOrderId());
+      }
+    }
   }
 
   private synchronized Set<RequestScheduleResponse> getRequestSchedules

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
index 37d6752..c779fde 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -28,6 +28,9 @@ import org.quartz.PersistJobDataAfterExecution;
 import org.quartz.Trigger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
 import static org.quartz.DateBuilder.futureDate;
 import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
 import static org.quartz.TriggerBuilder.newTrigger;
@@ -42,8 +45,8 @@ import static org.quartz.TriggerBuilder.newTrigger;
 @PersistJobDataAfterExecution
 @DisallowConcurrentExecution
 public abstract class AbstractLinearExecutionJob implements ExecutionJob {
-  private ExecutionScheduleManager executionScheduleManager;
   private static Logger LOG = LoggerFactory.getLogger(AbstractLinearExecutionJob.class);
+  protected ExecutionScheduleManager executionScheduleManager;
 
   public AbstractLinearExecutionJob(ExecutionScheduleManager executionScheduleManager) {
     this.executionScheduleManager = executionScheduleManager;
@@ -52,8 +55,10 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
   /**
    * Do the actual work of the fired job.
    * @throws AmbariException
+   * @param properties
    */
-  protected abstract void doWork() throws AmbariException;
+  protected abstract void doWork(Map<String, Object> properties) throws
+    AmbariException;
 
   /**
    * Get the next job id from context and create a trigger to fire the next
@@ -74,7 +79,7 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
 
     // Perform work and exit if failure reported
     try {
-      doWork();
+      doWork(context.getMergedJobDataMap().getWrappedMap());
     } catch (AmbariException e) {
       LOG.error("Exception caught on job execution. Exiting linear chain...", e);
       throw new JobExecutionException(e);
@@ -100,4 +105,5 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
 
     executionScheduleManager.scheduleJob(trigger);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index 443a7e2..a3199e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -23,15 +23,35 @@ import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.orm.dao.RequestScheduleBatchRequestDAO;
+import org.apache.ambari.server.orm.entities.RequestScheduleBatchRequestEntity;
+import org.apache.ambari.server.orm.entities.RequestScheduleBatchRequestEntityPK;
+import org.apache.ambari.server.state.scheduler.Batch;
+import org.apache.ambari.server.state.scheduler.BatchRequest;
+import org.apache.ambari.server.state.scheduler.BatchRequestJob;
+import org.apache.ambari.server.state.scheduler.BatchRequestResponse;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.scheduler.Schedule;
 import org.apache.ambari.server.utils.DateUtils;
-import org.quartz.Job;
+import org.quartz.CronExpression;
+import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
+import java.util.ListIterator;
+
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+import static org.quartz.TriggerBuilder.newTrigger;
 
 /**
  * This class handles scheduling request execution for managed clusters
@@ -44,14 +64,22 @@ public class ExecutionScheduleManager {
   private ExecutionScheduler executionScheduler;
   @Inject
   private Configuration configuration;
+  @Inject
+  private RequestScheduleBatchRequestDAO batchRequestDAO;
 
   private volatile boolean schedulerAvailable = false;
+  protected static final String BATCH_REQUEST_JOB_PREFIX = "BatchRequestJob";
+  protected static final String REQUEST_EXECUTION_TRIGGER_PREFIX =
+    "RequestExecution";
 
   @Inject
   public ExecutionScheduleManager(Injector injector) {
     injector.injectMembers(this);
   }
 
+  /**
+   * Start Execution scheduler
+   */
   public void start() {
     LOG.info("Starting scheduler");
     try {
@@ -63,6 +91,9 @@ public class ExecutionScheduleManager {
     }
   }
 
+  /**
+   * Stop execution scheduler
+   */
   public void stop() {
     LOG.info("Stopping scheduler");
     schedulerAvailable = false;
@@ -74,10 +105,18 @@ public class ExecutionScheduleManager {
     }
   }
 
+  /**
+   * Is Execution scheduler available for accepting jobs?
+   * @return
+   */
   public boolean isSchedulerAvailable() {
     return schedulerAvailable;
   }
 
+  /**
+   * Add trigger for a job to the scheduler
+   * @param trigger
+   */
   public void scheduleJob(Trigger trigger) {
     LOG.debug("Scheduling job: " + trigger.getJobKey());
     if (isSchedulerAvailable()) {
@@ -92,6 +131,12 @@ public class ExecutionScheduleManager {
     }
   }
 
+  /**
+   * Find out by how much a schedule misfired and decide whether to continue
+   * based on configuration
+   * @param jobExecutionContext
+   * @return
+   */
   public boolean continueOnMisfire(JobExecutionContext jobExecutionContext) {
     if (jobExecutionContext != null) {
       Date scheduledTime = jobExecutionContext.getScheduledFireTime();
@@ -100,4 +145,251 @@ public class ExecutionScheduleManager {
     }
     return true;
   }
+
+  /**
+   * Persist jobs based on the request batch and create trigger for the first
+   * job
+   * @param requestExecution
+   * @throws AmbariException
+   */
+  public void scheduleBatch(RequestExecution requestExecution)
+    throws AmbariException {
+
+    if (!isSchedulerAvailable()) {
+      throw new AmbariException("Scheduler unavailable.");
+    }
+
+    // Create and persist jobs based on batches
+    JobDetail firstJobDetail = persistBatch(requestExecution);
+
+    if (firstJobDetail == null) {
+      throw new AmbariException("Unable to schedule jobs. firstJobDetail = "
+        + firstJobDetail);
+    }
+
+    // Create a cron trigger for the first batch job
+    // If no schedule is specified create simple trigger to fire right away
+    Schedule schedule = requestExecution.getSchedule();
+
+    if (schedule != null) {
+      String triggerExpression = schedule.getScheduleExpression();
+
+      Date startDate = null;
+      Date endDate = null;
+      try {
+        String startTime = schedule.getStartTime();
+        String endTime = schedule.getEndTime();
+        startDate = startTime != null ? DateUtils.convertToDate(startTime) : new Date();
+        endDate = endTime != null ? DateUtils.convertToDate(endTime) : null;
+      } catch (ParseException e) {
+        LOG.error("Unable to parse startTime / endTime.", e);
+      }
+
+      Trigger trigger = newTrigger()
+          .withIdentity(REQUEST_EXECUTION_TRIGGER_PREFIX + "-" +
+            requestExecution.getId(), ExecutionJob.LINEAR_EXECUTION_TRIGGER_GROUP)
+          .withSchedule(cronSchedule(triggerExpression)
+            .withMisfireHandlingInstructionFireAndProceed())
+          .forJob(firstJobDetail)
+          .startAt(startDate)
+          .endAt(endDate)
+          .build();
+
+      try {
+        executionScheduler.scheduleJob(trigger);
+      } catch (SchedulerException e) {
+        LOG.error("Unable to schedule request execution.", e);
+        throw new AmbariException(e.getMessage());
+      }
+
+    } else {
+      // Create trigger for immediate job execution
+      Trigger trigger = newTrigger()
+        .forJob(firstJobDetail)
+        .withIdentity(REQUEST_EXECUTION_TRIGGER_PREFIX + "-" +
+          requestExecution.getId(), ExecutionJob.LINEAR_EXECUTION_TRIGGER_GROUP)
+        .withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow())
+        .startNow()
+        .build();
+
+      try {
+        executionScheduler.scheduleJob(trigger);
+      } catch (SchedulerException e) {
+        LOG.error("Unable to schedule request execution.", e);
+        throw new AmbariException(e.getMessage());
+      }
+    }
+  }
+
+  private JobDetail persistBatch(RequestExecution requestExecution)
+    throws  AmbariException {
+
+    Batch batch = requestExecution.getBatch();
+    JobDetail jobDetail = null;
+
+    if (batch != null) {
+      List<BatchRequest> batchRequests = batch.getBatchRequests();
+      if (batchRequests != null) {
+        Collections.sort(batchRequests);
+        ListIterator<BatchRequest> iterator = batchRequests.listIterator
+          (batchRequests.size());
+        String nextJobName = null;
+        while (iterator.hasPrevious()) {
+          BatchRequest batchRequest = iterator.previous();
+
+          String jobName = getJobName(requestExecution.getId(),
+            batchRequest.getOrderId());
+
+          // Create Job and store properties to get next batch request details
+          jobDetail = newJob(BatchRequestJob.class)
+            .withIdentity(jobName, ExecutionJob.LINEAR_EXECUTION_JOB_GROUP)
+            .usingJobData(ExecutionJob.NEXT_EXECUTION_JOB_NAME_KEY, nextJobName)
+            .usingJobData(ExecutionJob.NEXT_EXECUTION_JOB_GROUP_KEY,
+              ExecutionJob.LINEAR_EXECUTION_JOB_GROUP)
+            .usingJobData(BatchRequestJob.BATCH_REQUEST_EXECUTION_ID_KEY,
+              requestExecution.getId())
+            .usingJobData(BatchRequestJob.BATCH_REQUEST_BATCH_ID_KEY,
+              batchRequest.getOrderId())
+            .storeDurably()
+            .build();
+
+          try {
+            executionScheduler.addJob(jobDetail);
+          } catch (SchedulerException e) {
+            LOG.error("Failed to add job detail. " + batchRequest, e);
+          }
+
+          nextJobName = jobName;
+        }
+      }
+    }
+    return jobDetail;
+  }
+
+  protected String getJobName(Long executionId, Long orderId) {
+    return BATCH_REQUEST_JOB_PREFIX + "-" + executionId.toString() + "-" +
+      orderId.toString();
+  }
+
+  /**
+   * Update schedule for a batch. Intended to delete and re-create all jobs
+   * and triggers; not implemented yet, so this is currently a no-op
+   * @param requestExecution
+   */
+  public void updateBatchSchedule(RequestExecution requestExecution)
+    throws AmbariException {
+
+    // TODO: Support delete and update if no jobs are running
+  }
+
+  /**
+   * Validate the start time, end time and cron expression of a schedule
+   * @param schedule
+   * @throws AmbariException if any of them is in an invalid format
+   */
+  public void validateSchedule(Schedule schedule) throws AmbariException {
+    if (!schedule.isEmpty()) {
+      if (schedule.getStartTime() != null && !schedule.getStartTime().isEmpty()) {
+        try {
+          DateUtils.convertToDate(schedule.getStartTime());
+        } catch (ParseException pe) {
+          throw new AmbariException("Start time in invalid format. startTime "
+            + "= " + schedule.getStartTime() + ", Allowed format = "
+            + DateUtils.ALLOWED_DATE_FORMAT);
+        }
+      }
+      if (schedule.getEndTime() != null && !schedule.getEndTime().isEmpty()) {
+        try {
+          DateUtils.convertToDate(schedule.getEndTime());
+        } catch (ParseException pe) {
+          throw new AmbariException("End time in invalid format. endTime "
+            + "= " + schedule.getEndTime() + ", Allowed format = "
+            + DateUtils.ALLOWED_DATE_FORMAT);
+        }
+      }
+      String cronExpression = schedule.getScheduleExpression();
+      if (cronExpression != null && !cronExpression.trim().isEmpty()) {
+        if (!CronExpression.isValidExpression(cronExpression)) {
+          throw new AmbariException("Invalid non-empty cron expression " +
+            "provided. " + cronExpression);
+        }
+      }
+    }
+  }
+
+  /**
+   * Delete all jobs and triggers if possible.
+   * @throws AmbariException
+   */
+  public void deleteAllJobs(RequestExecution requestExecution) throws AmbariException {
+    if (!isSchedulerAvailable()) {
+      throw new AmbariException("Scheduler unavailable.");
+    }
+
+    // Delete all jobs for this request execution
+    Batch batch = requestExecution.getBatch();
+    if (batch != null) {
+      List<BatchRequest> batchRequests = batch.getBatchRequests();
+      if (batchRequests != null) {
+        for (BatchRequest batchRequest : batchRequests) {
+          String jobName = getJobName(requestExecution.getId(),
+            batchRequest.getOrderId());
+
+          LOG.debug("Deleting Job, jobName = " + jobName);
+
+          try {
+            executionScheduler.deleteJob(JobKey.jobKey(jobName,
+              ExecutionJob.LINEAR_EXECUTION_JOB_GROUP));
+          } catch (SchedulerException e) {
+            LOG.warn("Unable to delete job, " + jobName, e);
+            throw new AmbariException(e.getMessage());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Execute a Batch request and return request id if the server responds with
+   * a request id for long running operations.
+   * @return request id
+   * @throws AmbariException
+   */
+  public synchronized Long executeBatchRequest(Long executionId,
+                                               Long batchId) throws AmbariException {
+
+    String type = null;
+    String uri = null;
+    String body = null;
+
+    try {
+      RequestScheduleBatchRequestEntityPK batchRequestEntityPK = new
+        RequestScheduleBatchRequestEntityPK();
+      batchRequestEntityPK.setScheduleId(executionId);
+      batchRequestEntityPK.setBatchId(batchId);
+      RequestScheduleBatchRequestEntity batchRequestEntity =
+        batchRequestDAO.findByPk(batchRequestEntityPK);
+
+      type = batchRequestEntity.getRequestType();
+      uri = batchRequestEntity.getRequestUri();
+      body = batchRequestEntity.getRequestBody();
+
+    } catch (Exception e) {
+
+    }
+
+    return -1L;
+  }
+
+  /**
+   * Get status of a long running operation
+   * @return
+   * @throws AmbariException
+   */
+  public BatchRequestResponse getBatchRequestResponse(Long requestId)
+    throws AmbariException {
+
+    BatchRequestResponse batchRequestResponse = new BatchRequestResponse();
+    return batchRequestResponse;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
index a18c91b..c51bd6b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
@@ -21,7 +21,9 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.Schedule;
 import org.quartz.Job;
+import org.quartz.JobDetail;
 import org.quartz.JobKey;
+import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
 
@@ -39,14 +41,24 @@ public interface ExecutionScheduler {
   public void stopScheduler() throws AmbariException;
 
   /**
-   * Create a job based on the @RequestExecution and add a trigger for the
-   * created job based on the @Schedule. Schedule the job with the scheduler.
-   * @param requestExecution
-   * @param schedule
-   * @throws AmbariException
+   * Add a trigger to the execution scheduler
+   * @param trigger
+   * @throws SchedulerException
    */
-  public void scheduleJob(RequestExecution requestExecution,
-                          Schedule schedule) throws AmbariException;
-
   public void scheduleJob(Trigger trigger) throws SchedulerException;
+
+  /**
+   * Persist job data
+   * @param job
+   * @throws SchedulerException
+   */
+  public void addJob(JobDetail job) throws SchedulerException;
+
+
+  /**
+   * Delete the identified Job from the Scheduler - and any associated Triggers.
+   * @param jobKey
+   * @throws SchedulerException
+   */
+  public void deleteJob(JobKey jobKey) throws SchedulerException;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
index 2edfce7..84abf0e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
@@ -18,12 +18,14 @@
 package org.apache.ambari.server.scheduler;
 
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.Schedule;
 import org.quartz.Job;
+import org.quartz.JobDetail;
 import org.quartz.JobKey;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
@@ -38,13 +40,21 @@ import java.util.Properties;
 public class ExecutionSchedulerImpl implements ExecutionScheduler {
   @Inject
   private Configuration configuration;
-  private Scheduler scheduler;
   private static final Logger LOG = LoggerFactory.getLogger(ExecutionSchedulerImpl.class);
   protected static final String DEFAULT_SCHEDULER_NAME = "ExecutionScheduler";
-  private static volatile boolean isInitialized = false;
+  protected Scheduler scheduler;
+  protected static volatile boolean isInitialized = false;
 
   @Inject
-  public ExecutionSchedulerImpl(Configuration configuration) {
+  public ExecutionSchedulerImpl(Injector injector) {
+    injector.injectMembers(this);
+  }
+
+  /**
+   * Constructor used for unit testing
+   * @param configuration
+   */
+  protected ExecutionSchedulerImpl(Configuration configuration) {
     this.configuration = configuration;
   }
 
@@ -157,14 +167,18 @@ public class ExecutionSchedulerImpl implements ExecutionScheduler {
   }
 
   @Override
-  public void scheduleJob(RequestExecution requestExecution, Schedule schedule)
-      throws AmbariException {
+  public void scheduleJob(Trigger trigger) throws SchedulerException {
+    scheduler.scheduleJob(trigger);
+  }
 
+  @Override
+  public void addJob(JobDetail jobDetail) throws SchedulerException {
+    scheduler.addJob(jobDetail, true);
   }
 
   @Override
-  public void scheduleJob(Trigger trigger) throws SchedulerException {
-    scheduler.scheduleJob(trigger);
+  public void deleteJob(JobKey jobKey) throws SchedulerException {
+    scheduler.deleteJob(jobKey);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
index 75c9f24..890fbba 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
@@ -106,4 +106,13 @@ public class BatchRequest implements Comparable<BatchRequest> {
     POST,
     DELETE
   }
+
+  @Override
+  public String toString() {
+    return "BatchRequest {" +
+      "orderId=" + orderId +
+      ", type=" + type +
+      ", uri='" + uri + '\'' +
+      '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index 6e17389..7405706 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -21,14 +21,41 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.scheduler.AbstractLinearExecutionJob;
 import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
 
+import java.util.Map;
+
 public class BatchRequestJob extends AbstractLinearExecutionJob {
+  public static final String BATCH_REQUEST_EXECUTION_ID_KEY =
+    "BatchRequestJob.ExecutionId";
+  public static final String BATCH_REQUEST_BATCH_ID_KEY =
+    "BatchRequestJob.BatchId";
 
   public BatchRequestJob(ExecutionScheduleManager executionScheduleManager) {
     super(executionScheduleManager);
   }
 
   @Override
-  protected void doWork() throws AmbariException {
+  protected void doWork(Map<String, Object> properties) throws AmbariException {
+
+    String executionId = properties.get(BATCH_REQUEST_EXECUTION_ID_KEY) != null ?
+      (String) properties.get(BATCH_REQUEST_EXECUTION_ID_KEY) : null;
+    String batchId = properties.get(BATCH_REQUEST_BATCH_ID_KEY) != null ?
+      (String) properties.get(BATCH_REQUEST_BATCH_ID_KEY) : null;
+
+
+    if (executionId == null || batchId == null) {
+      throw new AmbariException("Unable to retrieve persisted batch request"
+        + ", execution_id = " + executionId
+        + ", batch_id = " + batchId);
+    }
+
+    Long requestId = executionScheduleManager.executeBatchRequest
+      (Long.parseLong(executionId), Long.parseLong(batchId));
+
+    if (requestId != null) {
+      // TODO: wait on request completion; response is fetched but not yet used
 
+      BatchRequestResponse batchRequestResponse =
+        executionScheduleManager.getBatchRequestResponse(requestId);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
new file mode 100644
index 0000000..2710ffa
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+/**
+ * Captures the response of a batch request. Provides functionality to
+ * capture the @Request status and the @Task status in order to allow
+ * tolerance calculations.
+ */
+public class BatchRequestResponse {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index 20c651f..014e6bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -264,13 +264,11 @@ public class RequestExecutionImpl implements RequestExecution {
     if (batch != null) {
       List<BatchRequest> batchRequests = batch.getBatchRequests();
       if (batchRequests != null) {
-        // Sort by orderId and assign increasing batch id
         Collections.sort(batchRequests);
-        Long batchId = 1L;
         for (BatchRequest batchRequest : batchRequests) {
           RequestScheduleBatchRequestEntity batchRequestEntity = new
             RequestScheduleBatchRequestEntity();
-          batchRequestEntity.setBatchId(batchId);
+          batchRequestEntity.setBatchId(batchRequest.getOrderId());
           batchRequestEntity.setScheduleId(requestScheduleEntity.getScheduleId());
           batchRequestEntity.setRequestScheduleEntity(requestScheduleEntity);
           batchRequestEntity.setRequestType(batchRequest.getType());
@@ -283,7 +281,6 @@ public class RequestExecutionImpl implements RequestExecution {
           requestScheduleEntity.getRequestScheduleBatchRequestEntities().add
             (batchRequestEntity);
           requestScheduleDAO.merge(requestScheduleEntity);
-          batchId++;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
index ff5fb76..38fbeac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
@@ -148,6 +148,31 @@ public class Schedule {
       && (endTime == null || endTime.isEmpty());
   }
 
+  /**
+   * Return a cron expression from the schedule fields.
+   * Example: "0 0 12 * * ?"
+   * @return the cron expression built from the schedule fields
+   */
+  public String getScheduleExpression() {
+    StringBuilder expression = new StringBuilder();
+    expression.append("0"); // seconds
+    expression.append(" ");
+    expression.append(minutes);
+    expression.append(" ");
+    expression.append(hours);
+    expression.append(" ");
+    expression.append(daysOfMonth);
+    expression.append(" ");
+    expression.append(month);
+    expression.append(" ");
+    expression.append(dayOfWeek);
+    if (year != null && !year.isEmpty()) {
+      expression.append(" ");
+      expression.append(year);
+    }
+    return expression.toString();
+  }
+
   @Override
   public int hashCode() {
     int result = minutes != null ? minutes.hashCode() : 0;
@@ -163,7 +188,7 @@ public class Schedule {
 
   @Override
   public String toString() {
-    return "Schedule{" +
+    return "Schedule {" +
       "minutes='" + minutes + '\'' +
       ", hours='" + hours + '\'' +
       ", days_of_month='" + daysOfMonth + '\'' +

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
index e2cc65f..11875ed 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
@@ -26,6 +26,8 @@ import java.util.Date;
  */
 public class DateUtils {
 
+  public static final String ALLOWED_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+
   /**
    * Milliseconds to readable format in current server timezone
    * @param timestamp
@@ -52,6 +54,28 @@ public class DateUtils {
   }
 
   /**
+   * Parse a date string in the supported format (ALLOWED_DATE_FORMAT) into a Date
+   * @param date
+   * @return
+   * @throws ParseException
+   */
+  public static Date convertToDate(String date) throws ParseException {
+    SimpleDateFormat sdf = new SimpleDateFormat(ALLOWED_DATE_FORMAT);
+    return sdf.parse(date);
+  }
+
+  /**
+   * Convert Date to allowed format
+   * @param date
+   * @return
+   * @throws ParseException
+   */
+  public static String convertDateToString(Date date) throws ParseException {
+    SimpleDateFormat sdf = new SimpleDateFormat(ALLOWED_DATE_FORMAT);
+    return sdf.format(date);
+  }
+
+  /**
    * Get difference in minutes between old date and now
    * @param oldTime
    * @return

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index 622e0ca..11b14c8 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -183,20 +183,7 @@ CREATE TABLE clusterEvent (
   host TEXT, rack TEXT
 );
 
--- org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
-
-DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
-DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
-DROP TABLE IF EXISTS QRTZ_LOCKS;
-DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
-DROP TABLE IF EXISTS QRTZ_CALENDARS;
-
+-- Quartz tables
 
 CREATE TABLE QRTZ_JOB_DETAILS
   (

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index b6912d9..047b6ab 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -174,115 +174,8 @@ BEGIN;
   SELECT 'version', '${ambariVersion}';
 COMMIT;
 
-
--- ambari log4j DDL
-
---------------------------------------------------
-----------initialisation of mapred db-------------
---------------------------------------------------
-CREATE DATABASE ambarirca;
-\connect ambarirca;
-
---CREATE ROLE "mapred" LOGIN ENCRYPTED PASSWORD 'mapred';
-CREATE USER "mapred" WITH PASSWORD 'mapred';
-GRANT ALL PRIVILEGES ON DATABASE ambarirca TO "mapred";
-
-------create tables ang grant privileges to db user---------
-CREATE TABLE workflow (
-  workflowId       TEXT, workflowName TEXT,
-  parentWorkflowId TEXT,
-  workflowContext  TEXT, userName TEXT,
-  startTime        BIGINT, lastUpdateTime BIGINT,
-  numJobsTotal     INTEGER, numJobsCompleted INTEGER,
-  inputBytes       BIGINT, outputBytes BIGINT,
-  duration         BIGINT,
-  PRIMARY KEY (workflowId),
-  FOREIGN KEY (parentWorkflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE workflow TO "mapred";
-
-CREATE TABLE job (
-  jobId        TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
-  userName     TEXT, queue TEXT, acls TEXT, confPath TEXT,
-  submitTime   BIGINT, launchTime BIGINT, finishTime BIGINT,
-  maps         INTEGER, reduces INTEGER, status TEXT, priority TEXT,
-  finishedMaps INTEGER, finishedReduces INTEGER,
-  failedMaps   INTEGER, failedReduces INTEGER,
-  mapsRuntime  BIGINT, reducesRuntime BIGINT,
-  mapCounters  TEXT, reduceCounters TEXT, jobCounters TEXT,
-  inputBytes   BIGINT, outputBytes BIGINT,
-  PRIMARY KEY (jobId),
-  FOREIGN KEY (workflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE job TO "mapred";
-
-CREATE TABLE task (
-  taskId        TEXT, jobId TEXT, taskType TEXT, splits TEXT,
-  startTime     BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
-  failedAttempt TEXT,
-  PRIMARY KEY (taskId),
-  FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE task TO "mapred";
-
-CREATE TABLE taskAttempt (
-  taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
-  startTime     BIGINT, finishTime BIGINT,
-  mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
-  locality      TEXT, avataar TEXT,
-  status        TEXT, error TEXT, counters TEXT,
-  inputBytes    BIGINT, outputBytes BIGINT,
-  PRIMARY KEY (taskAttemptId),
-  FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE,
-  FOREIGN KEY (taskId) REFERENCES task (taskId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE taskAttempt TO "mapred";
-
-CREATE TABLE hdfsEvent (
-  timestamp   BIGINT,
-  userName    TEXT,
-  clientIP    TEXT,
-  operation   TEXT,
-  srcPath     TEXT,
-  dstPath     TEXT,
-  permissions TEXT
-);
-GRANT ALL PRIVILEGES ON TABLE hdfsEvent TO "mapred";
-
-CREATE TABLE mapreduceEvent (
-  timestamp   BIGINT,
-  userName    TEXT,
-  clientIP    TEXT,
-  operation   TEXT,
-  target      TEXT,
-  result      TEXT,
-  description TEXT,
-  permissions TEXT
-);
-GRANT ALL PRIVILEGES ON TABLE mapreduceEvent TO "mapred";
-
-CREATE TABLE clusterEvent (
-  timestamp BIGINT,
-  service   TEXT, status TEXT,
-  error     TEXT, data TEXT,
-  host      TEXT, rack TEXT
-);
-GRANT ALL PRIVILEGES ON TABLE clusterEvent TO "mapred";
-
 -- Quartz tables
 
-drop table qrtz_fired_triggers;
-DROP TABLE QRTZ_PAUSED_TRIGGER_GRPS;
-DROP TABLE QRTZ_SCHEDULER_STATE;
-DROP TABLE QRTZ_LOCKS;
-drop table qrtz_simple_triggers;
-drop table qrtz_cron_triggers;
-drop table qrtz_simprop_triggers;
-DROP TABLE QRTZ_BLOB_TRIGGERS;
-drop table qrtz_triggers;
-drop table qrtz_job_details;
-drop table qrtz_calendars;
-
 CREATE TABLE qrtz_job_details
   (
     SCHED_NAME VARCHAR(120) NOT NULL,
@@ -453,3 +346,96 @@ create index idx_qrtz_ft_tg on qrtz_fired_triggers(SCHED_NAME,TRIGGER_GROUP);
 
 commit;
 
+-- ambari log4j DDL
+
+--------------------------------------------------
+----------initialisation of mapred db-------------
+--------------------------------------------------
+CREATE DATABASE ambarirca;
+\connect ambarirca;
+
+--CREATE ROLE "mapred" LOGIN ENCRYPTED PASSWORD 'mapred';
+CREATE USER "mapred" WITH PASSWORD 'mapred';
+GRANT ALL PRIVILEGES ON DATABASE ambarirca TO "mapred";
+
+------create tables and grant privileges to db user---------
+CREATE TABLE workflow (
+  workflowId       TEXT, workflowName TEXT,
+  parentWorkflowId TEXT,
+  workflowContext  TEXT, userName TEXT,
+  startTime        BIGINT, lastUpdateTime BIGINT,
+  numJobsTotal     INTEGER, numJobsCompleted INTEGER,
+  inputBytes       BIGINT, outputBytes BIGINT,
+  duration         BIGINT,
+  PRIMARY KEY (workflowId),
+  FOREIGN KEY (parentWorkflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE workflow TO "mapred";
+
+CREATE TABLE job (
+  jobId        TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
+  userName     TEXT, queue TEXT, acls TEXT, confPath TEXT,
+  submitTime   BIGINT, launchTime BIGINT, finishTime BIGINT,
+  maps         INTEGER, reduces INTEGER, status TEXT, priority TEXT,
+  finishedMaps INTEGER, finishedReduces INTEGER,
+  failedMaps   INTEGER, failedReduces INTEGER,
+  mapsRuntime  BIGINT, reducesRuntime BIGINT,
+  mapCounters  TEXT, reduceCounters TEXT, jobCounters TEXT,
+  inputBytes   BIGINT, outputBytes BIGINT,
+  PRIMARY KEY (jobId),
+  FOREIGN KEY (workflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE job TO "mapred";
+
+CREATE TABLE task (
+  taskId        TEXT, jobId TEXT, taskType TEXT, splits TEXT,
+  startTime     BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
+  failedAttempt TEXT,
+  PRIMARY KEY (taskId),
+  FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE task TO "mapred";
+
+CREATE TABLE taskAttempt (
+  taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
+  startTime     BIGINT, finishTime BIGINT,
+  mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
+  locality      TEXT, avataar TEXT,
+  status        TEXT, error TEXT, counters TEXT,
+  inputBytes    BIGINT, outputBytes BIGINT,
+  PRIMARY KEY (taskAttemptId),
+  FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE,
+  FOREIGN KEY (taskId) REFERENCES task (taskId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE taskAttempt TO "mapred";
+
+CREATE TABLE hdfsEvent (
+  timestamp   BIGINT,
+  userName    TEXT,
+  clientIP    TEXT,
+  operation   TEXT,
+  srcPath     TEXT,
+  dstPath     TEXT,
+  permissions TEXT
+);
+GRANT ALL PRIVILEGES ON TABLE hdfsEvent TO "mapred";
+
+CREATE TABLE mapreduceEvent (
+  timestamp   BIGINT,
+  userName    TEXT,
+  clientIP    TEXT,
+  operation   TEXT,
+  target      TEXT,
+  result      TEXT,
+  description TEXT,
+  permissions TEXT
+);
+GRANT ALL PRIVILEGES ON TABLE mapreduceEvent TO "mapred";
+
+CREATE TABLE clusterEvent (
+  timestamp BIGINT,
+  service   TEXT, status TEXT,
+  error     TEXT, data TEXT,
+  host      TEXT, rack TEXT
+);
+GRANT ALL PRIVILEGES ON TABLE clusterEvent TO "mapred";
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
index e89f72f..ff251c8 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
@@ -104,94 +104,8 @@ select 'version','${ambariVersion}';
 
 COMMIT;
 
-CREATE TABLE workflow (
-  workflowId TEXT, workflowName TEXT,
-  parentWorkflowId TEXT,
-  workflowContext TEXT, userName TEXT,
-  startTime BIGINT, lastUpdateTime BIGINT,
-  numJobsTotal INTEGER, numJobsCompleted INTEGER,
-  inputBytes BIGINT, outputBytes BIGINT,
-  duration BIGINT,
-  PRIMARY KEY (workflowId),
-  FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
-);
-
-CREATE TABLE job (
-  jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
-  userName TEXT, queue TEXT, acls TEXT, confPath TEXT,
-  submitTime BIGINT, launchTime BIGINT, finishTime BIGINT,
-  maps INTEGER, reduces INTEGER, status TEXT, priority TEXT,
-  finishedMaps INTEGER, finishedReduces INTEGER,
-  failedMaps INTEGER, failedReduces INTEGER,
-  mapsRuntime BIGINT, reducesRuntime BIGINT,
-  mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT,
-  inputBytes BIGINT, outputBytes BIGINT,
-  PRIMARY KEY(jobId),
-  FOREIGN KEY(workflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
-);
-
-CREATE TABLE task (
-  taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT,
-  startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
-  failedAttempt TEXT,
-  PRIMARY KEY(taskId),
-  FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE
-);
-
-CREATE TABLE taskAttempt (
-  taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
-  startTime BIGINT, finishTime BIGINT,
-  mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
-  locality TEXT, avataar TEXT,
-  status TEXT, error TEXT, counters TEXT,
-  inputBytes BIGINT, outputBytes BIGINT,
-  PRIMARY KEY(taskAttemptId),
-  FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE,
-  FOREIGN KEY(taskId) REFERENCES task(taskId) ON DELETE CASCADE
-);
-
-CREATE TABLE hdfsEvent (
-  timestamp BIGINT,
-  userName TEXT,
-  clientIP TEXT,
-  operation TEXT,
-  srcPath TEXT,
-  dstPath TEXT,
-  permissions TEXT
-);
-
-CREATE TABLE mapreduceEvent (
-  timestamp BIGINT,
-  userName TEXT,
-  clientIP TEXT,
-  operation TEXT,
-  target TEXT,
-  result TEXT,
-  description TEXT,
-  permissions TEXT
-);
-
-CREATE TABLE clusterEvent (
-  timestamp BIGINT,
-  service TEXT, status TEXT,
-  error TEXT, data TEXT ,
-  host TEXT, rack TEXT
-);
-
 -- Quartz tables
 
-drop table qrtz_fired_triggers;
-DROP TABLE QRTZ_PAUSED_TRIGGER_GRPS;
-DROP TABLE QRTZ_SCHEDULER_STATE;
-DROP TABLE QRTZ_LOCKS;
-drop table qrtz_simple_triggers;
-drop table qrtz_cron_triggers;
-drop table qrtz_simprop_triggers;
-DROP TABLE QRTZ_BLOB_TRIGGERS;
-drop table qrtz_triggers;
-drop table qrtz_job_details;
-drop table qrtz_calendars;
-
 CREATE TABLE qrtz_job_details
   (
     SCHED_NAME VARCHAR(120) NOT NULL,
@@ -361,3 +275,77 @@ create index idx_qrtz_ft_t_g on qrtz_fired_triggers(SCHED_NAME,TRIGGER_NAME,TRIG
 create index idx_qrtz_ft_tg on qrtz_fired_triggers(SCHED_NAME,TRIGGER_GROUP);
 
 commit;
+
+CREATE TABLE workflow (
+  workflowId TEXT, workflowName TEXT,
+  parentWorkflowId TEXT,
+  workflowContext TEXT, userName TEXT,
+  startTime BIGINT, lastUpdateTime BIGINT,
+  numJobsTotal INTEGER, numJobsCompleted INTEGER,
+  inputBytes BIGINT, outputBytes BIGINT,
+  duration BIGINT,
+  PRIMARY KEY (workflowId),
+  FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
+);
+
+CREATE TABLE job (
+  jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
+  userName TEXT, queue TEXT, acls TEXT, confPath TEXT,
+  submitTime BIGINT, launchTime BIGINT, finishTime BIGINT,
+  maps INTEGER, reduces INTEGER, status TEXT, priority TEXT,
+  finishedMaps INTEGER, finishedReduces INTEGER,
+  failedMaps INTEGER, failedReduces INTEGER,
+  mapsRuntime BIGINT, reducesRuntime BIGINT,
+  mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT,
+  inputBytes BIGINT, outputBytes BIGINT,
+  PRIMARY KEY(jobId),
+  FOREIGN KEY(workflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
+);
+
+CREATE TABLE task (
+  taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT,
+  startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
+  failedAttempt TEXT,
+  PRIMARY KEY(taskId),
+  FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE
+);
+
+CREATE TABLE taskAttempt (
+  taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
+  startTime BIGINT, finishTime BIGINT,
+  mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
+  locality TEXT, avataar TEXT,
+  status TEXT, error TEXT, counters TEXT,
+  inputBytes BIGINT, outputBytes BIGINT,
+  PRIMARY KEY(taskAttemptId),
+  FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE,
+  FOREIGN KEY(taskId) REFERENCES task(taskId) ON DELETE CASCADE
+);
+
+CREATE TABLE hdfsEvent (
+  timestamp BIGINT,
+  userName TEXT,
+  clientIP TEXT,
+  operation TEXT,
+  srcPath TEXT,
+  dstPath TEXT,
+  permissions TEXT
+);
+
+CREATE TABLE mapreduceEvent (
+  timestamp BIGINT,
+  userName TEXT,
+  clientIP TEXT,
+  operation TEXT,
+  target TEXT,
+  result TEXT,
+  description TEXT,
+  permissions TEXT
+);
+
+CREATE TABLE clusterEvent (
+  timestamp BIGINT,
+  service TEXT, status TEXT,
+  error TEXT, data TEXT ,
+  host TEXT, rack TEXT
+);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
index 8f065d5..1c28452 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.api.services.parsers;
 
+import junit.framework.Assert;
 import org.apache.ambari.server.api.services.NamedPropertySet;
 import org.apache.ambari.server.api.services.RequestBody;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
@@ -147,6 +148,7 @@ public class JsonRequestBodyParserTest {
 
   String bodyWithRequestInfoProperties = "{ \"RequestInfo\" : { \"query\" : \"foo=bar\", \"prop1\" : \"val1\", \"prop2\" : \"val2\" }, \"Body\":" + serviceJson + "}";
 
+  String bodyWithRequestBlobProperties = "{ \"RequestBodyInfo\" : { " +"\"RequestInfo\" : { \"query\" : \"foo=bar\", \"prop1\" : \"val1\", \"prop2\" : \"val2\" }, \"Body\":" + serviceJson + "} }";
 
   @Test
   public void testParse() throws BodyParseException {
@@ -561,8 +563,44 @@ public class JsonRequestBodyParserTest {
 
     Set<NamedPropertySet> setProps2 = body.getNamedPropertySets();
     assertEquals(mapExpected, setProps2.iterator().next().getProperties());
+  }
+
+  @Test
+  public void testRequestBlobProperties() throws Exception {
+    RequestBodyParser parser = new JsonRequestBodyParser();
+    RequestBody body = parser.parse(bodyWithRequestBlobProperties).iterator().next();
+
+    Set<NamedPropertySet> setProps = body.getNamedPropertySets();
+    assertEquals(1, setProps.size());
+
+    String requestBlob = null;
+
+    for (NamedPropertySet ps : setProps) {
+      assertEquals("", ps.getName());
+      Map<String, Object> mapProps = ps.getProperties();
+
+      for (Map.Entry<String, Object> entry : mapProps.entrySet()) {
+        if (entry.getKey().equals(JsonRequestBodyParser.REQUEST_BLOB_TITLE)) {
+          requestBlob = (String) entry.getValue();
+        }
+      }
+    }
 
+    Assert.assertNotNull(requestBlob);
+    body = parser.parse(requestBlob).iterator().next();
+
+    Map<String, Object> mapExpected = new HashMap<String, Object>();
+    mapExpected.put(PropertyHelper.getPropertyId("Services", "service_name"), "HDFS");
+    mapExpected.put(PropertyHelper.getPropertyId("Services", "display_name"), "HDFS");
+    mapExpected.put(PropertyHelper.getPropertyId("ServiceInfo", "cluster_name"), "tbmetrictest");
+    mapExpected.put(PropertyHelper.getPropertyId("Services", "description"), "Apache Hadoop Distributed File System");
+    mapExpected.put(PropertyHelper.getPropertyId("ServiceInfo", "state"), "STARTED");
+    mapExpected.put(PropertyHelper.getPropertyId("OuterCategory", "propName"), "100");
+    mapExpected.put(PropertyHelper.getPropertyId("OuterCategory/nested1/nested2", "innerPropName"), "innerPropValue");
+    mapExpected.put(PropertyHelper.getPropertyId(null, "topLevelProp"), "value");
 
+    Set<NamedPropertySet> setProps2 = body.getNamedPropertySets();
+    assertEquals(mapExpected, setProps2.iterator().next().getProperties());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
index 85a97f6..e391a3b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
@@ -26,6 +26,7 @@ import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.scheduler.Batch;
@@ -77,9 +78,13 @@ public class RequestScheduleResourceProviderTest {
     RequestExecutionFactory executionFactory = createNiceMock
       (RequestExecutionFactory.class);
     RequestExecution requestExecution = createNiceMock(RequestExecution.class);
+    ExecutionScheduleManager executionScheduleManager = createNiceMock
+      (ExecutionScheduleManager.class);
 
-    expect(managementController.getClusters()).andReturn(clusters);
     expect(clusters.getCluster("Cluster100")).andReturn(cluster).anyTimes();
+    expect(managementController.getClusters()).andReturn(clusters);
+    expect(managementController.getExecutionScheduleManager()).andReturn
+      (executionScheduleManager).anyTimes();
     expect(managementController.getRequestExecutionFactory()).andReturn
       (executionFactory);
     expect(managementController.getAuthName()).andReturn("admin").anyTimes();
@@ -92,7 +97,7 @@ public class RequestScheduleResourceProviderTest {
       capture(batchCapture), capture(scheduleCapture))).andReturn(requestExecution);
 
     replay(managementController, clusters, cluster, executionFactory,
-      requestExecution, response);
+      requestExecution, response, executionScheduleManager);
 
     RequestScheduleResourceProvider resourceProvider = getResourceProvider
       (managementController);
@@ -157,7 +162,7 @@ public class RequestScheduleResourceProviderTest {
     resourceProvider.createResources(request);
 
     verify(managementController, clusters, cluster, executionFactory,
-      requestExecution, response);
+      requestExecution, response, executionScheduleManager);
 
     List<BatchRequest> testRequests = batchCapture.getValue().getBatchRequests();
     Assert.assertNotNull(testRequests);
@@ -185,10 +190,14 @@ public class RequestScheduleResourceProviderTest {
     final RequestExecution requestExecution = createNiceMock(RequestExecution.class);
     RequestScheduleResponse requestScheduleResponse = createNiceMock
       (RequestScheduleResponse.class);
+    ExecutionScheduleManager executionScheduleManager = createNiceMock
+      (ExecutionScheduleManager.class);
 
     expect(managementController.getClusters()).andReturn(clusters).anyTimes();
     expect(clusters.getCluster("Cluster100")).andReturn(cluster).anyTimes();
     expect(managementController.getAuthName()).andReturn("admin").anyTimes();
+    expect(managementController.getExecutionScheduleManager()).andReturn
+      (executionScheduleManager).anyTimes();
 
     expect(requestExecution.getId()).andReturn(25L).anyTimes();
     expect(requestExecution.convertToResponse()).andReturn
@@ -208,7 +217,7 @@ public class RequestScheduleResourceProviderTest {
     });
 
     replay(managementController, clusters, cluster, requestExecution,
-      response, requestScheduleResponse);
+      response, requestScheduleResponse, executionScheduleManager);
 
     RequestScheduleResourceProvider resourceProvider = getResourceProvider
       (managementController);
@@ -279,7 +288,7 @@ public class RequestScheduleResourceProviderTest {
     resourceProvider.updateResources(request, predicate);
 
     verify(managementController, clusters, cluster, requestExecution,
-      response, requestScheduleResponse);
+      response, requestScheduleResponse, executionScheduleManager);
   }
 
   @Test
@@ -369,14 +378,23 @@ public class RequestScheduleResourceProviderTest {
     AmbariManagementController managementController = createMock(AmbariManagementController.class);
     Clusters clusters = createNiceMock(Clusters.class);
     Cluster cluster = createNiceMock(Cluster.class);
+    RequestExecution requestExecution = createNiceMock(RequestExecution.class);
+    ExecutionScheduleManager executionScheduleManager = createNiceMock
+      (ExecutionScheduleManager.class);
+
+    Map<Long, RequestExecution> requestExecutionMap = new HashMap<Long,
+      RequestExecution>();
+    requestExecutionMap.put(1L, requestExecution);
 
     expect(managementController.getAuthName()).andReturn("admin").anyTimes();
     expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+    expect(managementController.getExecutionScheduleManager()).andReturn
+      (executionScheduleManager).anyTimes();
     expect(clusters.getCluster("Cluster100")).andReturn(cluster).anyTimes();
+    expect(cluster.getAllRequestExecutions()).andReturn(requestExecutionMap);
 
-    cluster.deleteRequestExecution(1L);
-
-    replay(managementController, clusters, cluster);
+    replay(managementController, clusters, cluster, executionScheduleManager,
+      requestExecution );
 
     RequestScheduleResourceProvider resourceProvider = getResourceProvider
       (managementController);
@@ -399,6 +417,7 @@ public class RequestScheduleResourceProviderTest {
     Assert.assertEquals(predicate, lastEvent.getPredicate());
     Assert.assertNull(lastEvent.getRequest());
 
-    verify(managementController, clusters, cluster);
+    verify(managementController, clusters, cluster, executionScheduleManager,
+      requestExecution);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
index 06d553d..c9a392d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
@@ -21,6 +21,9 @@ package org.apache.ambari.server.orm;
 import com.google.inject.AbstractModule;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.ControllerModule;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManagerTest;
+import org.apache.ambari.server.scheduler.ExecutionScheduler;
+
 import java.util.Properties;
 
 public class InMemoryDefaultTestModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
new file mode 100644
index 0000000..926cb7a
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.scheduler;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.Transactional;
+import com.google.inject.util.Modules;
+import junit.framework.Assert;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.scheduler.Batch;
+import org.apache.ambari.server.state.scheduler.BatchRequest;
+import org.apache.ambari.server.state.scheduler.BatchSettings;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
+import org.apache.ambari.server.state.scheduler.Schedule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Exercises {@link ExecutionScheduleManager} against a Quartz scheduler that
+ * is obtained from a default-constructed {@link StdSchedulerFactory} and
+ * bound in place of the regular {@link ExecutionScheduler}.
+ */
+public class ExecutionScheduleManagerTest {
+  private Clusters clusters;
+  private Cluster cluster;
+  private String clusterName;
+  private Injector injector;
+  private AmbariMetaInfo metaInfo;
+  private ExecutionScheduleManager executionScheduleManager;
+  private RequestExecutionFactory requestExecutionFactory;
+  private ExecutionScheduler executionScheduler;
+  private Scheduler scheduler;
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(ExecutionScheduleManagerTest.class);
+
+  /**
+   * Creates the injector with the test scheduler binding, registers cluster
+   * "c1" on stack HDP-0.1 and starts the schedule manager.
+   */
+  @Before
+  public void setup() throws Exception {
+    injector  = Guice.createInjector(Modules.override(
+      new InMemoryDefaultTestModule()).with(new ExecutionSchedulerTestModule()));
+    injector.getInstance(GuiceJpaInitializer.class);
+    clusters = injector.getInstance(Clusters.class);
+    metaInfo = injector.getInstance(AmbariMetaInfo.class);
+    executionScheduleManager = injector.getInstance(ExecutionScheduleManager.class);
+    executionScheduler = injector.getInstance(ExecutionScheduler.class);
+    requestExecutionFactory = injector.getInstance(RequestExecutionFactory.class);
+
+    metaInfo.init();
+    clusterName = "c1";
+    clusters.addCluster(clusterName);
+    cluster = clusters.getCluster(clusterName);
+    // Assert before the first dereference so a missing cluster is reported
+    // as a test failure rather than a NullPointerException.
+    Assert.assertNotNull(cluster);
+    cluster.setDesiredStackVersion(new StackId("HDP-0.1"));
+    assertThat(executionScheduler, instanceOf(TestExecutionScheduler.class));
+
+    TestExecutionScheduler testExecutionScheduler = (TestExecutionScheduler)
+      executionScheduler;
+    scheduler = testExecutionScheduler.getScheduler();
+    Assert.assertNotNull(scheduler);
+
+    executionScheduleManager.start();
+  }
+
+  /** Stops the schedule manager and the persistence service. */
+  @After
+  public void teardown() throws Exception {
+    executionScheduleManager.stop();
+    injector.getInstance(PersistService.class).stop();
+  }
+
+  /**
+   * Scheduler implementation for tests: assigns a scheduler taken from a
+   * default-constructed {@link StdSchedulerFactory}, marks the instance
+   * as initialized, and exposes that scheduler to the test.
+   */
+  public static class TestExecutionScheduler extends ExecutionSchedulerImpl {
+    @Inject
+    public TestExecutionScheduler(Injector injector) {
+      super(injector);
+      try {
+        StdSchedulerFactory factory = new StdSchedulerFactory();
+        scheduler = factory.getScheduler();
+        isInitialized = true;
+      } catch (SchedulerException e) {
+        // Propagate the Quartz failure as the cause instead of printing it
+        // and throwing an error that drops the original exception.
+        throw new IllegalStateException("Unable to instantiate scheduler", e);
+      }
+    }
+
+    /** @return the Quartz scheduler assigned in the constructor */
+    public Scheduler getScheduler() {
+      return scheduler;
+    }
+  }
+
+  /**
+   * Binds {@link ExecutionScheduler} to {@link TestExecutionScheduler}.
+   * Declared static because it uses no state of the enclosing test.
+   */
+  public static class ExecutionSchedulerTestModule implements Module {
+    @Override
+    public void configure(Binder binder) {
+      binder.bind(ExecutionScheduler.class).to(TestExecutionScheduler.class);
+    }
+  }
+
+  /**
+   * Builds the schedule shared by the persisted fixture and the expected
+   * value in {@link #testScheduleBatch()}.
+   */
+  private static Schedule createSchedule() {
+    Schedule schedule = new Schedule();
+    schedule.setMinutes("10");
+    schedule.setHours("2");
+    schedule.setMonth("*");
+    schedule.setDaysOfMonth("*");
+    schedule.setDayOfWeek("?");
+    return schedule;
+  }
+
+  /**
+   * Persists a request execution holding two batch requests (order ids 10
+   * and 12) together with the schedule from {@link #createSchedule()}.
+   * NOTE(review): the method is private and this test instance is not built
+   * by the injector, so {@code @Transactional} presumably has no effect here.
+   */
+  @Transactional
+  private RequestExecution createRequestExecution() throws Exception {
+    Batch batches = new Batch();
+
+    BatchSettings batchSettings = new BatchSettings();
+    batchSettings.setTaskFailureToleranceLimit(10);
+    batches.setBatchSettings(batchSettings);
+
+    List<BatchRequest> batchRequests = new ArrayList<BatchRequest>();
+    BatchRequest batchRequest1 = new BatchRequest();
+    batchRequest1.setOrderId(10L);
+    batchRequest1.setType(BatchRequest.Type.DELETE);
+    batchRequest1.setUri("testUri1");
+
+    BatchRequest batchRequest2 = new BatchRequest();
+    batchRequest2.setOrderId(12L);
+    batchRequest2.setType(BatchRequest.Type.POST);
+    batchRequest2.setUri("testUri2");
+    batchRequest2.setBody("testBody");
+
+    batchRequests.add(batchRequest1);
+    batchRequests.add(batchRequest2);
+
+    batches.getBatchRequests().addAll(batchRequests);
+
+    RequestExecution requestExecution = requestExecutionFactory.createNew
+      (cluster, batches, createSchedule());
+    requestExecution.setDescription("Test Schedule");
+
+    requestExecution.persist();
+
+    return requestExecution;
+  }
+
+  /**
+   * Schedules a batch and verifies that both jobs are registered in the
+   * linear execution group, that only the first one has a trigger, and that
+   * this trigger carries the cron expression of the schedule.
+   */
+  @Test
+  public void testScheduleBatch() throws Exception {
+    RequestExecution requestExecution = createRequestExecution();
+    Assert.assertNotNull(requestExecution);
+
+    executionScheduleManager.scheduleBatch(requestExecution);
+
+    String jobName1 = executionScheduleManager.getJobName(requestExecution
+      .getId(), 10L);
+    String jobName2 = executionScheduleManager.getJobName(requestExecution
+      .getId(), 12L);
+    JobDetail jobDetail1 = null;
+    JobDetail jobDetail2 = null;
+    Trigger trigger1 = null;
+    Trigger trigger2 = null;
+
+    // Only the linear execution group is inspected, so it is queried once
+    // rather than once per job group known to the scheduler.
+    for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals
+        (ExecutionJob.LINEAR_EXECUTION_JOB_GROUP))) {
+      LOG.info("Found job identified by: {}", jobKey);
+
+      String jobName = jobKey.getName();
+      String jobGroup = jobKey.getGroup();
+
+      List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
+      Trigger trigger = triggers != null && !triggers.isEmpty() ?
+        triggers.get(0) : null;
+      Date nextFireTime = trigger != null ? trigger.getNextFireTime() : null;
+
+      LOG.info("[jobName] : {} [groupName] : {} - {}",
+        jobName, jobGroup, nextFireTime);
+
+      if (jobName.equals(jobName1)) {
+        jobDetail1 = scheduler.getJobDetail(jobKey);
+        trigger1 = trigger;
+      } else if (jobName.equals(jobName2)) {
+        jobDetail2 = scheduler.getJobDetail(jobKey);
+        trigger2 = trigger;
+      }
+    }
+
+    // Only the first batch request is expected to carry a trigger; the
+    // second job must exist but without one.
+    Assert.assertNotNull(jobDetail1);
+    Assert.assertNotNull(trigger1);
+    Assert.assertNotNull(jobDetail2);
+    Assert.assertNull(trigger2);
+
+    CronTrigger cronTrigger = (CronTrigger) trigger1;
+    Assert.assertEquals(createSchedule().getScheduleExpression(),
+      cronTrigger.getCronExpression());
+
+    Assert.assertEquals(jobName1, jobDetail1.getKey().getName());
+    Assert.assertEquals(jobName2, jobDetail2.getKey().getName());
+  }
+
+  /**
+   * Schedules a batch, checks that both jobs exist and the first has a
+   * trigger, then verifies the trigger is gone after deleteAllJobs.
+   */
+  @Test
+  public void testDeleteAllJobs() throws Exception {
+    RequestExecution requestExecution = createRequestExecution();
+    Assert.assertNotNull(requestExecution);
+
+    executionScheduleManager.scheduleBatch(requestExecution);
+
+    String jobName1 = executionScheduleManager.getJobName(requestExecution
+      .getId(), 10L);
+    String jobName2 = executionScheduleManager.getJobName(requestExecution
+      .getId(), 12L);
+
+    JobKey jobKey1 = JobKey.jobKey(jobName1,
+      ExecutionJob.LINEAR_EXECUTION_JOB_GROUP);
+    JobKey jobKey2 = JobKey.jobKey(jobName2,
+      ExecutionJob.LINEAR_EXECUTION_JOB_GROUP);
+
+    Assert.assertNotNull(scheduler.getJobDetail(jobKey1));
+    Assert.assertNotNull(scheduler.getJobDetail(jobKey2));
+    Assert.assertFalse(scheduler.getTriggersOfJob(jobKey1).isEmpty());
+
+    executionScheduleManager.deleteAllJobs(requestExecution);
+
+    Assert.assertTrue(scheduler.getTriggersOfJob(jobKey1).isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
index 822f583..34a5d38 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
@@ -68,7 +68,8 @@ public class ExecutionSchedulerTest {
   @Test
   public void testSchedulerInitialize() throws Exception {
 
-    ExecutionSchedulerImpl executionScheduler = spy(new ExecutionSchedulerImpl(configuration));
+    ExecutionSchedulerImpl executionScheduler =
+      spy(new ExecutionSchedulerImpl(configuration));
 
     Properties actualProperties = executionScheduler
       .getQuartzSchedulerProperties();

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
index ac63b22..71ed464 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
@@ -58,7 +58,7 @@ public class ConfigGroupTest {
 
   @Before
   public void setup() throws Exception {
-    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    injector  = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
     clusters = injector.getInstance(Clusters.class);
     metaInfo = injector.getInstance(AmbariMetaInfo.class);

http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
index aba7e70..c2e3243 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
@@ -146,8 +146,8 @@ public class RequestExecutionTest {
     }
     Assert.assertNotNull(reqEntity1);
     Assert.assertNotNull(reqEntity2);
-    Assert.assertEquals(Long.valueOf(1L), reqEntity1.getBatchId());
-    Assert.assertEquals(Long.valueOf(2L), reqEntity2.getBatchId());
+    Assert.assertEquals(Long.valueOf(10L), reqEntity1.getBatchId());
+    Assert.assertEquals(Long.valueOf(12L), reqEntity2.getBatchId());
     Assert.assertEquals(BatchRequest.Type.DELETE.name(), reqEntity1.getRequestType());
     Assert.assertEquals(BatchRequest.Type.POST.name(), reqEntity2.getRequestType());
     Assert.assertEquals(requestExecution.getSchedule().getMinutes(),
@@ -213,8 +213,8 @@ public class RequestExecutionTest {
     }
     Assert.assertNotNull(reqEntity1);
     Assert.assertNotNull(reqEntity2);
-    Assert.assertEquals(Long.valueOf(1L), reqEntity1.getBatchId());
-    Assert.assertEquals(Long.valueOf(2L), reqEntity2.getBatchId());
+    Assert.assertEquals(Long.valueOf(10L), reqEntity1.getBatchId());
+    Assert.assertEquals(Long.valueOf(12L), reqEntity2.getBatchId());
     Assert.assertEquals(BatchRequest.Type.PUT.name(), reqEntity1.getRequestType());
     Assert.assertEquals(BatchRequest.Type.POST.name(), reqEntity2.getRequestType());
     Assert.assertEquals("11", scheduleEntity.getHours());
@@ -250,8 +250,8 @@ public class RequestExecutionTest {
     }
     Assert.assertNotNull(reqEntity1);
     Assert.assertNotNull(reqEntity2);
-    Assert.assertEquals(Long.valueOf(1L), reqEntity1.getBatchId());
-    Assert.assertEquals(Long.valueOf(2L), reqEntity2.getBatchId());
+    Assert.assertEquals(Long.valueOf(10L), reqEntity1.getBatchId());
+    Assert.assertEquals(Long.valueOf(12L), reqEntity2.getBatchId());
     Assert.assertEquals(BatchRequest.Type.DELETE.name(), reqEntity1.getRequestType());
     Assert.assertEquals(BatchRequest.Type.POST.name(), reqEntity2.getRequestType());
     Assert.assertEquals(requestExecution.getSchedule().getMinutes(),


[2/2] git commit: AMBARI-4150. Provide ability to batch requests based on schedule. Review patch. (swagle)

Posted by sw...@apache.org.
AMBARI-4150. Provide ability to batch requests based on schedule. Review patch. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1c7e66da
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1c7e66da
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1c7e66da

Branch: refs/heads/trunk
Commit: 1c7e66dadf2630b165c98d92f7c37613ed712775
Parents: 5da0039
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Jan 7 10:56:31 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Jan 7 10:56:31 2014 -0800

----------------------------------------------------------------------
 .../main/resources/Ambari-DDL-MySQL-CREATE.sql  | 341 +++++++++----------
 1 file changed, 170 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1c7e66da/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index 11b14c8..97a17ea 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -107,6 +107,176 @@ insert into user_roles(role_name, user_id)
 insert into metainfo(`metainfo_key`, `metainfo_value`)
   select 'version','${ambariVersion}';
 
+-- Quartz tables
+
+CREATE TABLE QRTZ_JOB_DETAILS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  JOB_NAME  VARCHAR(200) NOT NULL,
+  JOB_GROUP VARCHAR(200) NOT NULL,
+  DESCRIPTION VARCHAR(250) NULL,
+  JOB_CLASS_NAME   VARCHAR(250) NOT NULL,
+  IS_DURABLE VARCHAR(1) NOT NULL,
+  IS_NONCONCURRENT VARCHAR(1) NOT NULL,
+  IS_UPDATE_DATA VARCHAR(1) NOT NULL,
+  REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
+  JOB_DATA BLOB NULL,
+  PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_TRIGGERS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  JOB_NAME  VARCHAR(200) NOT NULL,
+  JOB_GROUP VARCHAR(200) NOT NULL,
+  DESCRIPTION VARCHAR(250) NULL,
+  NEXT_FIRE_TIME BIGINT(13) NULL,
+  PREV_FIRE_TIME BIGINT(13) NULL,
+  PRIORITY INTEGER NULL,
+  TRIGGER_STATE VARCHAR(16) NOT NULL,
+  TRIGGER_TYPE VARCHAR(8) NOT NULL,
+  START_TIME BIGINT(13) NOT NULL,
+  END_TIME BIGINT(13) NULL,
+  CALENDAR_NAME VARCHAR(200) NULL,
+  MISFIRE_INSTR SMALLINT(2) NULL,
+  JOB_DATA BLOB NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+  REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPLE_TRIGGERS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  REPEAT_COUNT BIGINT(7) NOT NULL,
+  REPEAT_INTERVAL BIGINT(12) NOT NULL,
+  TIMES_TRIGGERED BIGINT(10) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CRON_TRIGGERS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  CRON_EXPRESSION VARCHAR(200) NOT NULL,
+  TIME_ZONE_ID VARCHAR(80),
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPROP_TRIGGERS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  STR_PROP_1 VARCHAR(512) NULL,
+  STR_PROP_2 VARCHAR(512) NULL,
+  STR_PROP_3 VARCHAR(512) NULL,
+  INT_PROP_1 INT NULL,
+  INT_PROP_2 INT NULL,
+  LONG_PROP_1 BIGINT NULL,
+  LONG_PROP_2 BIGINT NULL,
+  DEC_PROP_1 NUMERIC(13,4) NULL,
+  DEC_PROP_2 NUMERIC(13,4) NULL,
+  BOOL_PROP_1 VARCHAR(1) NULL,
+  BOOL_PROP_2 VARCHAR(1) NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_BLOB_TRIGGERS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  BLOB_DATA BLOB NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+  FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+  REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CALENDARS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  CALENDAR_NAME  VARCHAR(200) NOT NULL,
+  CALENDAR BLOB NOT NULL,
+  PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
+);
+
+CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  TRIGGER_GROUP  VARCHAR(200) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_FIRED_TRIGGERS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  ENTRY_ID VARCHAR(95) NOT NULL,
+  TRIGGER_NAME VARCHAR(200) NOT NULL,
+  TRIGGER_GROUP VARCHAR(200) NOT NULL,
+  INSTANCE_NAME VARCHAR(200) NOT NULL,
+  FIRED_TIME BIGINT(13) NOT NULL,
+  SCHED_TIME BIGINT(13) NOT NULL,
+  PRIORITY INTEGER NOT NULL,
+  STATE VARCHAR(16) NOT NULL,
+  JOB_NAME VARCHAR(200) NULL,
+  JOB_GROUP VARCHAR(200) NULL,
+  IS_NONCONCURRENT VARCHAR(1) NULL,
+  REQUESTS_RECOVERY VARCHAR(1) NULL,
+  PRIMARY KEY (SCHED_NAME,ENTRY_ID)
+);
+
+CREATE TABLE QRTZ_SCHEDULER_STATE
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  INSTANCE_NAME VARCHAR(200) NOT NULL,
+  LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
+  CHECKIN_INTERVAL BIGINT(13) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
+);
+
+CREATE TABLE QRTZ_LOCKS
+(
+  SCHED_NAME VARCHAR(120) NOT NULL,
+  LOCK_NAME  VARCHAR(40) NOT NULL,
+  PRIMARY KEY (SCHED_NAME,LOCK_NAME)
+);
+
+create index idx_qrtz_j_req_recovery on QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
+create index idx_qrtz_j_grp on QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
+
+create index idx_qrtz_t_j on QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
+create index idx_qrtz_t_jg on QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
+create index idx_qrtz_t_c on QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
+create index idx_qrtz_t_g on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
+create index idx_qrtz_t_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
+create index idx_qrtz_t_n_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
+create index idx_qrtz_t_n_g_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
+create index idx_qrtz_t_next_fire_time on QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
+create index idx_qrtz_t_nft_st on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
+create index idx_qrtz_t_nft_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
+create index idx_qrtz_t_nft_st_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
+create index idx_qrtz_t_nft_st_misfire_grp on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
+
+create index idx_qrtz_ft_trig_inst_name on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
+create index idx_qrtz_ft_inst_job_req_rcvry on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
+create index idx_qrtz_ft_j_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
+create index idx_qrtz_ft_jg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
+create index idx_qrtz_ft_t_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
+create index idx_qrtz_ft_tg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
+
+commit;
 
 
 CREATE TABLE workflow (
@@ -183,177 +353,6 @@ CREATE TABLE clusterEvent (
   host TEXT, rack TEXT
 );
 
--- Quartz tables
-
-CREATE TABLE QRTZ_JOB_DETAILS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    JOB_NAME  VARCHAR(200) NOT NULL,
-    JOB_GROUP VARCHAR(200) NOT NULL,
-    DESCRIPTION VARCHAR(250) NULL,
-    JOB_CLASS_NAME   VARCHAR(250) NOT NULL,
-    IS_DURABLE VARCHAR(1) NOT NULL,
-    IS_NONCONCURRENT VARCHAR(1) NOT NULL,
-    IS_UPDATE_DATA VARCHAR(1) NOT NULL,
-    REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
-    JOB_DATA BLOB NULL,
-    PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
-);
-
-CREATE TABLE QRTZ_TRIGGERS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    TRIGGER_NAME VARCHAR(200) NOT NULL,
-    TRIGGER_GROUP VARCHAR(200) NOT NULL,
-    JOB_NAME  VARCHAR(200) NOT NULL,
-    JOB_GROUP VARCHAR(200) NOT NULL,
-    DESCRIPTION VARCHAR(250) NULL,
-    NEXT_FIRE_TIME BIGINT(13) NULL,
-    PREV_FIRE_TIME BIGINT(13) NULL,
-    PRIORITY INTEGER NULL,
-    TRIGGER_STATE VARCHAR(16) NOT NULL,
-    TRIGGER_TYPE VARCHAR(8) NOT NULL,
-    START_TIME BIGINT(13) NOT NULL,
-    END_TIME BIGINT(13) NULL,
-    CALENDAR_NAME VARCHAR(200) NULL,
-    MISFIRE_INSTR SMALLINT(2) NULL,
-    JOB_DATA BLOB NULL,
-    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
-    FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
-        REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
-);
-
-CREATE TABLE QRTZ_SIMPLE_TRIGGERS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    TRIGGER_NAME VARCHAR(200) NOT NULL,
-    TRIGGER_GROUP VARCHAR(200) NOT NULL,
-    REPEAT_COUNT BIGINT(7) NOT NULL,
-    REPEAT_INTERVAL BIGINT(12) NOT NULL,
-    TIMES_TRIGGERED BIGINT(10) NOT NULL,
-    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
-    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_CRON_TRIGGERS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    TRIGGER_NAME VARCHAR(200) NOT NULL,
-    TRIGGER_GROUP VARCHAR(200) NOT NULL,
-    CRON_EXPRESSION VARCHAR(200) NOT NULL,
-    TIME_ZONE_ID VARCHAR(80),
-    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
-    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_SIMPROP_TRIGGERS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    TRIGGER_NAME VARCHAR(200) NOT NULL,
-    TRIGGER_GROUP VARCHAR(200) NOT NULL,
-    STR_PROP_1 VARCHAR(512) NULL,
-    STR_PROP_2 VARCHAR(512) NULL,
-    STR_PROP_3 VARCHAR(512) NULL,
-    INT_PROP_1 INT NULL,
-    INT_PROP_2 INT NULL,
-    LONG_PROP_1 BIGINT NULL,
-    LONG_PROP_2 BIGINT NULL,
-    DEC_PROP_1 NUMERIC(13,4) NULL,
-    DEC_PROP_2 NUMERIC(13,4) NULL,
-    BOOL_PROP_1 VARCHAR(1) NULL,
-    BOOL_PROP_2 VARCHAR(1) NULL,
-    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
-    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_BLOB_TRIGGERS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    TRIGGER_NAME VARCHAR(200) NOT NULL,
-    TRIGGER_GROUP VARCHAR(200) NOT NULL,
-    BLOB_DATA BLOB NULL,
-    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
-    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-        REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_CALENDARS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    CALENDAR_NAME  VARCHAR(200) NOT NULL,
-    CALENDAR BLOB NOT NULL,
-    PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
-);
-
-CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    TRIGGER_GROUP  VARCHAR(200) NOT NULL,
-    PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_FIRED_TRIGGERS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    ENTRY_ID VARCHAR(95) NOT NULL,
-    TRIGGER_NAME VARCHAR(200) NOT NULL,
-    TRIGGER_GROUP VARCHAR(200) NOT NULL,
-    INSTANCE_NAME VARCHAR(200) NOT NULL,
-    FIRED_TIME BIGINT(13) NOT NULL,
-    SCHED_TIME BIGINT(13) NOT NULL,
-    PRIORITY INTEGER NOT NULL,
-    STATE VARCHAR(16) NOT NULL,
-    JOB_NAME VARCHAR(200) NULL,
-    JOB_GROUP VARCHAR(200) NULL,
-    IS_NONCONCURRENT VARCHAR(1) NULL,
-    REQUESTS_RECOVERY VARCHAR(1) NULL,
-    PRIMARY KEY (SCHED_NAME,ENTRY_ID)
-);
-
-CREATE TABLE QRTZ_SCHEDULER_STATE
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    INSTANCE_NAME VARCHAR(200) NOT NULL,
-    LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
-    CHECKIN_INTERVAL BIGINT(13) NOT NULL,
-    PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
-);
-
-CREATE TABLE QRTZ_LOCKS
-  (
-    SCHED_NAME VARCHAR(120) NOT NULL,
-    LOCK_NAME  VARCHAR(40) NOT NULL,
-    PRIMARY KEY (SCHED_NAME,LOCK_NAME)
-);
-
-create index idx_qrtz_j_req_recovery on QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
-create index idx_qrtz_j_grp on QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
-
-create index idx_qrtz_t_j on QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
-create index idx_qrtz_t_jg on QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
-create index idx_qrtz_t_c on QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
-create index idx_qrtz_t_g on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
-create index idx_qrtz_t_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
-create index idx_qrtz_t_n_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
-create index idx_qrtz_t_n_g_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
-create index idx_qrtz_t_next_fire_time on QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
-create index idx_qrtz_t_nft_st on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
-create index idx_qrtz_t_nft_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
-create index idx_qrtz_t_nft_st_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
-create index idx_qrtz_t_nft_st_misfire_grp on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
-
-create index idx_qrtz_ft_trig_inst_name on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
-create index idx_qrtz_ft_inst_job_req_rcvry on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
-create index idx_qrtz_ft_j_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
-create index idx_qrtz_ft_jg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
-create index idx_qrtz_ft_t_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
-create index idx_qrtz_ft_tg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
-
-commit;
-