You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/01/07 21:08:39 UTC
[1/2] git commit: AMBARI-4150. Provide ability to batch requests based on schedule. (swagle)
Updated Branches:
refs/heads/trunk 9457a5715 -> 1c7e66dad
AMBARI-4150. Provide ability to batch requests based on schedule. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5da00398
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5da00398
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5da00398
Branch: refs/heads/trunk
Commit: 5da0039823cf108f7fe5361d56ce558477f9619b
Parents: 9457a57
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Jan 7 10:17:33 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Jan 7 10:17:33 2014 -0800
----------------------------------------------------------------------
.../api/services/RequestScheduleService.java | 33 +--
.../controller/AmbariManagementController.java | 6 +
.../AmbariManagementControllerImpl.java | 8 +
.../RequestScheduleResourceProvider.java | 70 ++++-
.../scheduler/AbstractLinearExecutionJob.java | 12 +-
.../scheduler/ExecutionScheduleManager.java | 294 ++++++++++++++++++-
.../server/scheduler/ExecutionScheduler.java | 28 +-
.../scheduler/ExecutionSchedulerImpl.java | 28 +-
.../server/state/scheduler/BatchRequest.java | 9 +
.../server/state/scheduler/BatchRequestJob.java | 29 +-
.../state/scheduler/BatchRequestResponse.java | 26 ++
.../state/scheduler/RequestExecutionImpl.java | 5 +-
.../ambari/server/state/scheduler/Schedule.java | 27 +-
.../apache/ambari/server/utils/DateUtils.java | 24 ++
.../main/resources/Ambari-DDL-MySQL-CREATE.sql | 15 +-
.../resources/Ambari-DDL-Postgres-CREATE.sql | 200 ++++++-------
.../Ambari-DDL-Postgres-REMOTE-CREATE.sql | 160 +++++-----
.../parsers/JsonRequestBodyParserTest.java | 38 +++
.../RequestScheduleResourceProviderTest.java | 37 ++-
.../server/orm/InMemoryDefaultTestModule.java | 3 +
.../scheduler/ExecutionScheduleManagerTest.java | 268 +++++++++++++++++
.../scheduler/ExecutionSchedulerTest.java | 3 +-
.../ambari/server/state/ConfigGroupTest.java | 2 +-
.../server/state/RequestExecutionTest.java | 12 +-
24 files changed, 1057 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
index c59a8e3..9c98e09 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/RequestScheduleService.java
@@ -63,7 +63,7 @@ public class RequestScheduleService extends BaseService {
@GET
@Produces("text/plain")
public Response getRequestSchedules(@Context HttpHeaders headers,
- @Context UriInfo ui) {
+ @Context UriInfo ui) {
return handleRequest(headers, null, ui, Request.Type.GET,
createRequestSchedule(m_clusterName, null));
}
@@ -95,35 +95,14 @@ public class RequestScheduleService extends BaseService {
*/
@POST
@Produces("text/plain")
- public Response createRequestSchedule(String body, @Context HttpHeaders
- headers,
- @Context UriInfo ui) {
+ public Response createRequestSchedule(String body,
+ @Context HttpHeaders headers,
+ @Context UriInfo ui) {
return handleRequest(headers, body, ui, Request.Type.POST,
createRequestSchedule(m_clusterName, null));
}
/**
- * Handles PUT /clusters/{clusterId}/request_schedules/{requestScheduleId}
- * Update a request schedule
- *
- * @param body
- * @param headers
- * @param ui
- * @param requestScheduleId
- * @return
- */
- @PUT
- @Path("{requestScheduleId}")
- @Produces("text/plain")
- public Response updateRequestSchedule(String body,
- @Context HttpHeaders headers,
- @Context UriInfo ui,
- @PathParam("requestScheduleId") String requestScheduleId) {
- return handleRequest(headers, body, ui, Request.Type.PUT,
- createRequestSchedule(m_clusterName, requestScheduleId));
- }
-
- /**
* Handles DELETE /clusters/{clusterId}/request_schedules/{requestScheduleId}
* Delete a request schedule
*
@@ -136,8 +115,8 @@ public class RequestScheduleService extends BaseService {
@Path("{requestScheduleId}")
@Produces("text/plain")
public Response deleteRequestSchedule(@Context HttpHeaders headers,
- @Context UriInfo ui,
- @PathParam("requestScheduleId") String requestScheduleId) {
+ @Context UriInfo ui,
+ @PathParam("requestScheduleId") String requestScheduleId) {
return handleRequest(headers, null, ui, Request.Type.DELETE,
createRequestSchedule(m_clusterName, requestScheduleId));
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index c031034..26a02a4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -21,6 +21,7 @@ package org.apache.ambari.server.controller;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Service;
@@ -512,5 +513,10 @@ public interface AmbariManagementController {
* @return
*/
public RequestExecutionFactory getRequestExecutionFactory();
+
+ /**
+ * Get Execution Schedule Manager
+ */
+ public ExecutionScheduleManager getExecutionScheduleManager();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 007a023..ca14f14 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -54,6 +54,7 @@ import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.URLStreamProvider;
import org.apache.ambari.server.metadata.ActionMetadata;
import org.apache.ambari.server.metadata.RoleCommandOrder;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
import org.apache.ambari.server.security.authorization.User;
import org.apache.ambari.server.security.authorization.Users;
@@ -133,6 +134,8 @@ public class AmbariManagementControllerImpl implements
private ConfigHelper configHelper;
@Inject
private RequestExecutionFactory requestExecutionFactory;
+ @Inject
+ private ExecutionScheduleManager executionScheduleManager;
final private String masterHostname;
final private Integer masterPort;
@@ -1035,6 +1038,11 @@ public class AmbariManagementControllerImpl implements
return requestExecutionFactory;
}
+ @Override
+ public ExecutionScheduleManager getExecutionScheduleManager() {
+ return executionScheduleManager;
+ }
+
private List<Stage> doStageCreation(Cluster cluster,
Map<State, List<Service>> changedServices,
Map<State, List<ServiceComponent>> changedComps,
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
index 5119d25..2d3826e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java
@@ -217,6 +217,20 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
return resources;
}
+ /**
+ * Currently unsupported operation, since strong guarantees are required
+ * that no jobs are currently running.
+ * @param request the request object which defines the set of properties
+ * for the resources to be updated
+ * @param predicate the predicate object which can be used to filter which
+ * resources are updated
+ *
+ * @return
+ * @throws SystemException
+ * @throws UnsupportedPropertyException
+ * @throws NoSuchResourceException
+ * @throws NoSuchParentResourceException
+ */
@Override
public RequestStatus updateResources(Request request, Predicate predicate)
throws SystemException, UnsupportedPropertyException,
@@ -286,11 +300,29 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
+ "exist", e);
}
- LOG.info("Deleting Request Schedule "
+ RequestExecution requestExecution =
+ cluster.getAllRequestExecutions().get(request.getId());
+
+ if (requestExecution == null) {
+ throw new AmbariException("Request Schedule not found "
+ + ", clusterName = " + request.getClusterName()
+ + ", description = " + request.getDescription()
+ + ", id = " + request.getId());
+ }
+
+ String username = getManagementController().getAuthName();
+
+ LOG.info("Disabling Request Schedule "
+ ", clusterName = " + request.getClusterName()
- + ", id = " + request.getId());
+ + ", id = " + request.getId()
+ + ", user = " + username);
- cluster.deleteRequestExecution(request.getId());
+ // Delete all jobs and triggers
+ getManagementController().getExecutionScheduleManager()
+ .deleteAllJobs(requestExecution);
+
+ requestExecution.setStatus(RequestExecution.Status.DISABLED);
+ requestExecution.persist();
}
private synchronized void updateRequestSchedule
@@ -347,6 +379,10 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
+ ", user = " + username);
requestExecution.persist();
+
+ // Update schedule for the batch
+ getManagementController().getExecutionScheduleManager()
+ .updateBatchSchedule(requestExecution);
}
}
@@ -385,6 +421,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
requestExecution.setCreateUser(username);
requestExecution.setUpdateUser(username);
+ requestExecution.setStatus(RequestExecution.Status.SCHEDULED);
LOG.info("Persisting new Request Schedule "
+ ", clusterName = " + request.getClusterName()
@@ -394,6 +431,10 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
requestExecution.persist();
cluster.addRequestExecution(requestExecution);
+ // Setup batch schedule
+ getManagementController().getExecutionScheduleManager()
+ .scheduleBatch(requestExecution);
+
RequestScheduleResponse response = new RequestScheduleResponse
(requestExecution.getId(), requestExecution.getClusterName(),
requestExecution.getDescription(), requestExecution.getStatus(),
@@ -407,10 +448,31 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
return responses;
}
- private void validateRequest(RequestScheduleRequest request) {
+ private void validateRequest(RequestScheduleRequest request) throws AmbariException {
if (request.getClusterName() == null) {
throw new IllegalArgumentException("Cluster name is required.");
}
+ Schedule schedule = request.getSchedule();
+ if (schedule != null) {
+ getManagementController().getExecutionScheduleManager()
+ .validateSchedule(schedule);
+ }
+ Batch batch = request.getBatch();
+ if (batch != null && !batch.getBatchRequests().isEmpty()) {
+ // Verify requests can be ordered
+ HashSet<Long> orderIdSet = new HashSet<Long>();
+ for (BatchRequest batchRequest : batch.getBatchRequests()) {
+ if (batchRequest.getOrderId() == null) {
+ throw new AmbariException("No order id provided for batch request. " +
+ "" + batchRequest);
+ }
+ if (orderIdSet.contains(batchRequest.getOrderId())) {
+ throw new AmbariException("Duplicate order id provided for batch " +
+ "request. " + batchRequest);
+ }
+ orderIdSet.add(batchRequest.getOrderId());
+ }
+ }
}
private synchronized Set<RequestScheduleResponse> getRequestSchedules
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
index 37d6752..c779fde 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -28,6 +28,9 @@ import org.quartz.PersistJobDataAfterExecution;
import org.quartz.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
import static org.quartz.DateBuilder.futureDate;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
@@ -42,8 +45,8 @@ import static org.quartz.TriggerBuilder.newTrigger;
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public abstract class AbstractLinearExecutionJob implements ExecutionJob {
- private ExecutionScheduleManager executionScheduleManager;
private static Logger LOG = LoggerFactory.getLogger(AbstractLinearExecutionJob.class);
+ protected ExecutionScheduleManager executionScheduleManager;
public AbstractLinearExecutionJob(ExecutionScheduleManager executionScheduleManager) {
this.executionScheduleManager = executionScheduleManager;
@@ -52,8 +55,10 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
/**
* Do the actual work of the fired job.
* @throws AmbariException
+ * @param properties
*/
- protected abstract void doWork() throws AmbariException;
+ protected abstract void doWork(Map<String, Object> properties) throws
+ AmbariException;
/**
* Get the next job id from context and create a trigger to fire the next
@@ -74,7 +79,7 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
// Perform work and exit if failure reported
try {
- doWork();
+ doWork(context.getMergedJobDataMap().getWrappedMap());
} catch (AmbariException e) {
LOG.error("Exception caught on job execution. Exiting linear chain...", e);
throw new JobExecutionException(e);
@@ -100,4 +105,5 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
executionScheduleManager.scheduleJob(trigger);
}
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
index 443a7e2..a3199e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -23,15 +23,35 @@ import com.google.inject.Injector;
import com.google.inject.Singleton;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.orm.dao.RequestScheduleBatchRequestDAO;
+import org.apache.ambari.server.orm.entities.RequestScheduleBatchRequestEntity;
+import org.apache.ambari.server.orm.entities.RequestScheduleBatchRequestEntityPK;
+import org.apache.ambari.server.state.scheduler.Batch;
+import org.apache.ambari.server.state.scheduler.BatchRequest;
+import org.apache.ambari.server.state.scheduler.BatchRequestJob;
+import org.apache.ambari.server.state.scheduler.BatchRequestResponse;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.scheduler.Schedule;
import org.apache.ambari.server.utils.DateUtils;
-import org.quartz.Job;
+import org.quartz.CronExpression;
+import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.ParseException;
+import java.util.Collections;
import java.util.Date;
+import java.util.List;
+import java.util.ListIterator;
+
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+import static org.quartz.TriggerBuilder.newTrigger;
/**
* This class handles scheduling request execution for managed clusters
@@ -44,14 +64,22 @@ public class ExecutionScheduleManager {
private ExecutionScheduler executionScheduler;
@Inject
private Configuration configuration;
+ @Inject
+ private RequestScheduleBatchRequestDAO batchRequestDAO;
private volatile boolean schedulerAvailable = false;
+ protected static final String BATCH_REQUEST_JOB_PREFIX = "BatchRequestJob";
+ protected static final String REQUEST_EXECUTION_TRIGGER_PREFIX =
+ "RequestExecution";
@Inject
public ExecutionScheduleManager(Injector injector) {
injector.injectMembers(this);
}
+ /**
+ * Start Execution scheduler
+ */
public void start() {
LOG.info("Starting scheduler");
try {
@@ -63,6 +91,9 @@ public class ExecutionScheduleManager {
}
}
+ /**
+ * Stop execution scheduler
+ */
public void stop() {
LOG.info("Stopping scheduler");
schedulerAvailable = false;
@@ -74,10 +105,18 @@ public class ExecutionScheduleManager {
}
}
+ /**
+ * Is Execution scheduler available for accepting jobs?
+ * @return
+ */
public boolean isSchedulerAvailable() {
return schedulerAvailable;
}
+ /**
+ * Add trigger for a job to the scheduler
+ * @param trigger
+ */
public void scheduleJob(Trigger trigger) {
LOG.debug("Scheduling job: " + trigger.getJobKey());
if (isSchedulerAvailable()) {
@@ -92,6 +131,12 @@ public class ExecutionScheduleManager {
}
}
+ /**
+ * Find out by how much a schedule misfired and decide whether to continue
+ * based on configuration
+ * @param jobExecutionContext
+ * @return
+ */
public boolean continueOnMisfire(JobExecutionContext jobExecutionContext) {
if (jobExecutionContext != null) {
Date scheduledTime = jobExecutionContext.getScheduledFireTime();
@@ -100,4 +145,251 @@ public class ExecutionScheduleManager {
}
return true;
}
+
+ /**
+ * Persist jobs based on the request batch and create trigger for the first
+ * job
+ * @param requestExecution
+ * @throws AmbariException
+ */
+ public void scheduleBatch(RequestExecution requestExecution)
+ throws AmbariException {
+
+ if (!isSchedulerAvailable()) {
+ throw new AmbariException("Scheduler unavailable.");
+ }
+
+ // Create and persist jobs based on batches
+ JobDetail firstJobDetail = persistBatch(requestExecution);
+
+ if (firstJobDetail == null) {
+ throw new AmbariException("Unable to schedule jobs. firstJobDetail = "
+ + firstJobDetail);
+ }
+
+ // Create a cron trigger for the first batch job
+ // If no schedule is specified create simple trigger to fire right away
+ Schedule schedule = requestExecution.getSchedule();
+
+ if (schedule != null) {
+ String triggerExpression = schedule.getScheduleExpression();
+
+ Date startDate = null;
+ Date endDate = null;
+ try {
+ String startTime = schedule.getStartTime();
+ String endTime = schedule.getEndTime();
+ startDate = startTime != null ? DateUtils.convertToDate(startTime) : new Date();
+ endDate = endTime != null ? DateUtils.convertToDate(endTime) : null;
+ } catch (ParseException e) {
+ LOG.error("Unable to parse startTime / endTime.", e);
+ }
+
+ Trigger trigger = newTrigger()
+ .withIdentity(REQUEST_EXECUTION_TRIGGER_PREFIX + "-" +
+ requestExecution.getId(), ExecutionJob.LINEAR_EXECUTION_TRIGGER_GROUP)
+ .withSchedule(cronSchedule(triggerExpression)
+ .withMisfireHandlingInstructionFireAndProceed())
+ .forJob(firstJobDetail)
+ .startAt(startDate)
+ .endAt(endDate)
+ .build();
+
+ try {
+ executionScheduler.scheduleJob(trigger);
+ } catch (SchedulerException e) {
+ LOG.error("Unable to schedule request execution.", e);
+ throw new AmbariException(e.getMessage());
+ }
+
+ } else {
+ // Create trigger for immediate job execution
+ Trigger trigger = newTrigger()
+ .forJob(firstJobDetail)
+ .withIdentity(REQUEST_EXECUTION_TRIGGER_PREFIX + "-" +
+ requestExecution.getId(), ExecutionJob.LINEAR_EXECUTION_TRIGGER_GROUP)
+ .withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow())
+ .startNow()
+ .build();
+
+ try {
+ executionScheduler.scheduleJob(trigger);
+ } catch (SchedulerException e) {
+ LOG.error("Unable to schedule request execution.", e);
+ throw new AmbariException(e.getMessage());
+ }
+ }
+ }
+
+ private JobDetail persistBatch(RequestExecution requestExecution)
+ throws AmbariException {
+
+ Batch batch = requestExecution.getBatch();
+ JobDetail jobDetail = null;
+
+ if (batch != null) {
+ List<BatchRequest> batchRequests = batch.getBatchRequests();
+ if (batchRequests != null) {
+ Collections.sort(batchRequests);
+ ListIterator<BatchRequest> iterator = batchRequests.listIterator
+ (batchRequests.size());
+ String nextJobName = null;
+ while (iterator.hasPrevious()) {
+ BatchRequest batchRequest = iterator.previous();
+
+ String jobName = getJobName(requestExecution.getId(),
+ batchRequest.getOrderId());
+
+ // Create Job and store properties to get next batch request details
+ jobDetail = newJob(BatchRequestJob.class)
+ .withIdentity(jobName, ExecutionJob.LINEAR_EXECUTION_JOB_GROUP)
+ .usingJobData(ExecutionJob.NEXT_EXECUTION_JOB_NAME_KEY, nextJobName)
+ .usingJobData(ExecutionJob.NEXT_EXECUTION_JOB_GROUP_KEY,
+ ExecutionJob.LINEAR_EXECUTION_JOB_GROUP)
+ .usingJobData(BatchRequestJob.BATCH_REQUEST_EXECUTION_ID_KEY,
+ requestExecution.getId())
+ .usingJobData(BatchRequestJob.BATCH_REQUEST_BATCH_ID_KEY,
+ batchRequest.getOrderId())
+ .storeDurably()
+ .build();
+
+ try {
+ executionScheduler.addJob(jobDetail);
+ } catch (SchedulerException e) {
+ LOG.error("Failed to add job detail. " + batchRequest, e);
+ }
+
+ nextJobName = jobName;
+ }
+ }
+ }
+ return jobDetail;
+ }
+
+ protected String getJobName(Long executionId, Long orderId) {
+ return BATCH_REQUEST_JOB_PREFIX + "-" + executionId.toString() + "-" +
+ orderId.toString();
+ }
+
+ /**
+ * Update schedule for a batch. Intended to delete and re-create all jobs
+ * and triggers; not yet implemented (see TODO in the method body).
+ * @param requestExecution
+ */
+ public void updateBatchSchedule(RequestExecution requestExecution)
+ throws AmbariException {
+
+ // TODO: Support delete and update if no jobs are running
+ }
+
+ /**
+ * Validate the start time, end time and cron expression of a schedule
+ * @param schedule
+ * @throws AmbariException if a non-empty value is in an invalid format
+ */
+ public void validateSchedule(Schedule schedule) throws AmbariException {
+ if (!schedule.isEmpty()) {
+ if (schedule.getStartTime() != null && !schedule.getStartTime().isEmpty()) {
+ try {
+ DateUtils.convertToDate(schedule.getStartTime());
+ } catch (ParseException pe) {
+ throw new AmbariException("Start time in invalid format. startTime "
+ + "= " + schedule.getStartTime() + ", Allowed format = "
+ + DateUtils.ALLOWED_DATE_FORMAT);
+ }
+ }
+ if (schedule.getEndTime() != null && !schedule.getEndTime().isEmpty()) {
+ try {
+ DateUtils.convertToDate(schedule.getEndTime());
+ } catch (ParseException pe) {
+ throw new AmbariException("End time in invalid format. endTime "
+ + "= " + schedule.getEndTime() + ", Allowed format = "
+ + DateUtils.ALLOWED_DATE_FORMAT);
+ }
+ }
+ String cronExpression = schedule.getScheduleExpression();
+ if (cronExpression != null && !cronExpression.trim().isEmpty()) {
+ if (!CronExpression.isValidExpression(cronExpression)) {
+ throw new AmbariException("Invalid non-empty cron expression " +
+ "provided. " + cronExpression);
+ }
+ }
+ }
+ }
+
+ /**
+ * Delete all jobs and triggers if possible.
+ * @throws AmbariException
+ */
+ public void deleteAllJobs(RequestExecution requestExecution) throws AmbariException {
+ if (!isSchedulerAvailable()) {
+ throw new AmbariException("Scheduler unavailable.");
+ }
+
+ // Delete all jobs for this request execution
+ Batch batch = requestExecution.getBatch();
+ if (batch != null) {
+ List<BatchRequest> batchRequests = batch.getBatchRequests();
+ if (batchRequests != null) {
+ for (BatchRequest batchRequest : batchRequests) {
+ String jobName = getJobName(requestExecution.getId(),
+ batchRequest.getOrderId());
+
+ LOG.debug("Deleting Job, jobName = " + jobName);
+
+ try {
+ executionScheduler.deleteJob(JobKey.jobKey(jobName,
+ ExecutionJob.LINEAR_EXECUTION_JOB_GROUP));
+ } catch (SchedulerException e) {
+ LOG.warn("Unable to delete job, " + jobName, e);
+ throw new AmbariException(e.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Execute a Batch request and return request id if the server responds with
+ * a request id for long running operations.
+ * @return request id
+ * @throws AmbariException
+ */
+ public synchronized Long executeBatchRequest(Long executionId,
+ Long batchId) throws AmbariException {
+
+ String type = null;
+ String uri = null;
+ String body = null;
+
+ try {
+ RequestScheduleBatchRequestEntityPK batchRequestEntityPK = new
+ RequestScheduleBatchRequestEntityPK();
+ batchRequestEntityPK.setScheduleId(executionId);
+ batchRequestEntityPK.setBatchId(batchId);
+ RequestScheduleBatchRequestEntity batchRequestEntity =
+ batchRequestDAO.findByPk(batchRequestEntityPK);
+
+ type = batchRequestEntity.getRequestType();
+ uri = batchRequestEntity.getRequestUri();
+ body = batchRequestEntity.getRequestBody();
+
+ } catch (Exception e) {
+
+ }
+
+ return -1L;
+ }
+
+ /**
+ * Get status of a long running operation
+ * @return
+ * @throws AmbariException
+ */
+ public BatchRequestResponse getBatchRequestResponse(Long requestId)
+ throws AmbariException {
+
+ BatchRequestResponse batchRequestResponse = new BatchRequestResponse();
+ return batchRequestResponse;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
index a18c91b..c51bd6b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
@@ -21,7 +21,9 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.state.scheduler.RequestExecution;
import org.apache.ambari.server.state.scheduler.Schedule;
import org.quartz.Job;
+import org.quartz.JobDetail;
import org.quartz.JobKey;
+import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
@@ -39,14 +41,24 @@ public interface ExecutionScheduler {
public void stopScheduler() throws AmbariException;
/**
- * Create a job based on the @RequestExecution and add a trigger for the
- * created job based on the @Schedule. Schedule the job with the scheduler.
- * @param requestExecution
- * @param schedule
- * @throws AmbariException
+ * Add a trigger to the execution scheduler
+ * @param trigger
+ * @throws SchedulerException
*/
- public void scheduleJob(RequestExecution requestExecution,
- Schedule schedule) throws AmbariException;
-
public void scheduleJob(Trigger trigger) throws SchedulerException;
+
+ /**
+ * Persist job data
+ * @param job
+ * @throws SchedulerException
+ */
+ public void addJob(JobDetail job) throws SchedulerException;
+
+
+ /**
+ * Delete the identified Job from the Scheduler - and any associated Triggers.
+ * @param jobKey
+ * @throws SchedulerException
+ */
+ public void deleteJob(JobKey jobKey) throws SchedulerException;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
index 2edfce7..84abf0e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
@@ -18,12 +18,14 @@
package org.apache.ambari.server.scheduler;
import com.google.inject.Inject;
+import com.google.inject.Injector;
import com.google.inject.Singleton;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.state.scheduler.RequestExecution;
import org.apache.ambari.server.state.scheduler.Schedule;
import org.quartz.Job;
+import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
@@ -38,13 +40,21 @@ import java.util.Properties;
public class ExecutionSchedulerImpl implements ExecutionScheduler {
@Inject
private Configuration configuration;
- private Scheduler scheduler;
private static final Logger LOG = LoggerFactory.getLogger(ExecutionSchedulerImpl.class);
protected static final String DEFAULT_SCHEDULER_NAME = "ExecutionScheduler";
- private static volatile boolean isInitialized = false;
+ protected Scheduler scheduler;
+ protected static volatile boolean isInitialized = false;
@Inject
- public ExecutionSchedulerImpl(Configuration configuration) {
+ public ExecutionSchedulerImpl(Injector injector) {
+ injector.injectMembers(this);
+ }
+
+ /**
+ * Constructor used for unit testing
+ * @param configuration
+ */
+ protected ExecutionSchedulerImpl(Configuration configuration) {
this.configuration = configuration;
}
@@ -157,14 +167,18 @@ public class ExecutionSchedulerImpl implements ExecutionScheduler {
}
@Override
- public void scheduleJob(RequestExecution requestExecution, Schedule schedule)
- throws AmbariException {
+ public void scheduleJob(Trigger trigger) throws SchedulerException {
+ scheduler.scheduleJob(trigger);
+ }
+ @Override
+ public void addJob(JobDetail jobDetail) throws SchedulerException {
+ scheduler.addJob(jobDetail, true);
}
@Override
- public void scheduleJob(Trigger trigger) throws SchedulerException {
- scheduler.scheduleJob(trigger);
+ public void deleteJob(JobKey jobKey) throws SchedulerException {
+ scheduler.deleteJob(jobKey);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
index 75c9f24..890fbba 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
@@ -106,4 +106,13 @@ public class BatchRequest implements Comparable<BatchRequest> {
POST,
DELETE
}
+
+ @Override
+ public String toString() {
+ return "BatchRequest {" +
+ "orderId=" + orderId +
+ ", type=" + type +
+ ", uri='" + uri + '\'' +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
index 6e17389..7405706 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -21,14 +21,41 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.scheduler.AbstractLinearExecutionJob;
import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
+import java.util.Map;
+
public class BatchRequestJob extends AbstractLinearExecutionJob {
+ public static final String BATCH_REQUEST_EXECUTION_ID_KEY =
+ "BatchRequestJob.ExecutionId";
+ public static final String BATCH_REQUEST_BATCH_ID_KEY =
+ "BatchRequestJob.BatchId";
public BatchRequestJob(ExecutionScheduleManager executionScheduleManager) {
super(executionScheduleManager);
}
@Override
- protected void doWork() throws AmbariException {
+ protected void doWork(Map<String, Object> properties) throws AmbariException {
+
+ String executionId = properties.get(BATCH_REQUEST_EXECUTION_ID_KEY) != null ?
+ (String) properties.get(BATCH_REQUEST_EXECUTION_ID_KEY) : null;
+ String batchId = properties.get(BATCH_REQUEST_BATCH_ID_KEY) != null ?
+ (String) properties.get(BATCH_REQUEST_BATCH_ID_KEY) : null;
+
+
+ if (executionId == null || batchId == null) {
+ throw new AmbariException("Unable to retrieve persisted batch request"
+ + ", execution_id = " + executionId
+ + ", batch_id = " + batchId);
+ }
+
+ Long requestId = executionScheduleManager.executeBatchRequest
+ (Long.parseLong(executionId), Long.parseLong(batchId));
+
+ if (requestId != null) {
+ // Wait on request completion
+ BatchRequestResponse batchRequestResponse =
+ executionScheduleManager.getBatchRequestResponse(requestId);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
new file mode 100644
index 0000000..2710ffa
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestResponse.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+/**
+ * Captures the response of a batch request. Provides functionality to
+ * capture the @Request status and @Task status in order to allow
+ * tolerance calculations
+ */
+public class BatchRequestResponse {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
index 20c651f..014e6bd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java
@@ -264,13 +264,11 @@ public class RequestExecutionImpl implements RequestExecution {
if (batch != null) {
List<BatchRequest> batchRequests = batch.getBatchRequests();
if (batchRequests != null) {
- // Sort by orderId and assign increasing batch id
Collections.sort(batchRequests);
- Long batchId = 1L;
for (BatchRequest batchRequest : batchRequests) {
RequestScheduleBatchRequestEntity batchRequestEntity = new
RequestScheduleBatchRequestEntity();
- batchRequestEntity.setBatchId(batchId);
+ batchRequestEntity.setBatchId(batchRequest.getOrderId());
batchRequestEntity.setScheduleId(requestScheduleEntity.getScheduleId());
batchRequestEntity.setRequestScheduleEntity(requestScheduleEntity);
batchRequestEntity.setRequestType(batchRequest.getType());
@@ -283,7 +281,6 @@ public class RequestExecutionImpl implements RequestExecution {
requestScheduleEntity.getRequestScheduleBatchRequestEntities().add
(batchRequestEntity);
requestScheduleDAO.merge(requestScheduleEntity);
- batchId++;
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
index ff5fb76..38fbeac 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Schedule.java
@@ -148,6 +148,31 @@ public class Schedule {
&& (endTime == null || endTime.isEmpty());
}
+ /**
+ * Return a cron expression from the schedule fields.
+ * Example: "0 0 12 * * ?"
+ * @return the cron expression assembled from the schedule fields
+ */
+ public String getScheduleExpression() {
+ StringBuilder expression = new StringBuilder();
+ expression.append("0"); // seconds
+ expression.append(" ");
+ expression.append(minutes);
+ expression.append(" ");
+ expression.append(hours);
+ expression.append(" ");
+ expression.append(daysOfMonth);
+ expression.append(" ");
+ expression.append(month);
+ expression.append(" ");
+ expression.append(dayOfWeek);
+ if (year != null && !year.isEmpty()) {
+ expression.append(" ");
+ expression.append(year);
+ }
+ return expression.toString();
+ }
+
@Override
public int hashCode() {
int result = minutes != null ? minutes.hashCode() : 0;
@@ -163,7 +188,7 @@ public class Schedule {
@Override
public String toString() {
- return "Schedule{" +
+ return "Schedule {" +
"minutes='" + minutes + '\'' +
", hours='" + hours + '\'' +
", days_of_month='" + daysOfMonth + '\'' +
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
index e2cc65f..11875ed 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/DateUtils.java
@@ -26,6 +26,8 @@ import java.util.Date;
*/
public class DateUtils {
+ public static final String ALLOWED_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+
/**
* Milliseconds to readable format in current server timezone
* @param timestamp
@@ -52,6 +54,28 @@ public class DateUtils {
}
/**
+ * Convert from supported format to Date
+ * @param date
+ * @return
+ * @throws ParseException
+ */
+ public static Date convertToDate(String date) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat(ALLOWED_DATE_FORMAT);
+ return sdf.parse(date);
+ }
+
+ /**
+ * Convert Date to allowed format
+ * @param date
+ * @return
+ * @throws ParseException declared in the signature; formatting a Date does not itself raise it
+ */
+ public static String convertDateToString(Date date) throws ParseException {
+ SimpleDateFormat sdf = new SimpleDateFormat(ALLOWED_DATE_FORMAT);
+ return sdf.format(date);
+ }
+
+ /**
* Get difference in minutes between old date and now
* @param oldTime
* @return
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index 622e0ca..11b14c8 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -183,20 +183,7 @@ CREATE TABLE clusterEvent (
host TEXT, rack TEXT
);
--- org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
-
-DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
-DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
-DROP TABLE IF EXISTS QRTZ_LOCKS;
-DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_TRIGGERS;
-DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
-DROP TABLE IF EXISTS QRTZ_CALENDARS;
-
+-- Quartz tables
CREATE TABLE QRTZ_JOB_DETAILS
(
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
index b6912d9..047b6ab 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql
@@ -174,115 +174,8 @@ BEGIN;
SELECT 'version', '${ambariVersion}';
COMMIT;
-
--- ambari log4j DDL
-
---------------------------------------------------
-----------initialisation of mapred db-------------
---------------------------------------------------
-CREATE DATABASE ambarirca;
-\connect ambarirca;
-
---CREATE ROLE "mapred" LOGIN ENCRYPTED PASSWORD 'mapred';
-CREATE USER "mapred" WITH PASSWORD 'mapred';
-GRANT ALL PRIVILEGES ON DATABASE ambarirca TO "mapred";
-
-------create tables ang grant privileges to db user---------
-CREATE TABLE workflow (
- workflowId TEXT, workflowName TEXT,
- parentWorkflowId TEXT,
- workflowContext TEXT, userName TEXT,
- startTime BIGINT, lastUpdateTime BIGINT,
- numJobsTotal INTEGER, numJobsCompleted INTEGER,
- inputBytes BIGINT, outputBytes BIGINT,
- duration BIGINT,
- PRIMARY KEY (workflowId),
- FOREIGN KEY (parentWorkflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE workflow TO "mapred";
-
-CREATE TABLE job (
- jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
- userName TEXT, queue TEXT, acls TEXT, confPath TEXT,
- submitTime BIGINT, launchTime BIGINT, finishTime BIGINT,
- maps INTEGER, reduces INTEGER, status TEXT, priority TEXT,
- finishedMaps INTEGER, finishedReduces INTEGER,
- failedMaps INTEGER, failedReduces INTEGER,
- mapsRuntime BIGINT, reducesRuntime BIGINT,
- mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT,
- inputBytes BIGINT, outputBytes BIGINT,
- PRIMARY KEY (jobId),
- FOREIGN KEY (workflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE job TO "mapred";
-
-CREATE TABLE task (
- taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT,
- startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
- failedAttempt TEXT,
- PRIMARY KEY (taskId),
- FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE task TO "mapred";
-
-CREATE TABLE taskAttempt (
- taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
- startTime BIGINT, finishTime BIGINT,
- mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
- locality TEXT, avataar TEXT,
- status TEXT, error TEXT, counters TEXT,
- inputBytes BIGINT, outputBytes BIGINT,
- PRIMARY KEY (taskAttemptId),
- FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE,
- FOREIGN KEY (taskId) REFERENCES task (taskId) ON DELETE CASCADE
-);
-GRANT ALL PRIVILEGES ON TABLE taskAttempt TO "mapred";
-
-CREATE TABLE hdfsEvent (
- timestamp BIGINT,
- userName TEXT,
- clientIP TEXT,
- operation TEXT,
- srcPath TEXT,
- dstPath TEXT,
- permissions TEXT
-);
-GRANT ALL PRIVILEGES ON TABLE hdfsEvent TO "mapred";
-
-CREATE TABLE mapreduceEvent (
- timestamp BIGINT,
- userName TEXT,
- clientIP TEXT,
- operation TEXT,
- target TEXT,
- result TEXT,
- description TEXT,
- permissions TEXT
-);
-GRANT ALL PRIVILEGES ON TABLE mapreduceEvent TO "mapred";
-
-CREATE TABLE clusterEvent (
- timestamp BIGINT,
- service TEXT, status TEXT,
- error TEXT, data TEXT,
- host TEXT, rack TEXT
-);
-GRANT ALL PRIVILEGES ON TABLE clusterEvent TO "mapred";
-
-- Quartz tables
-drop table qrtz_fired_triggers;
-DROP TABLE QRTZ_PAUSED_TRIGGER_GRPS;
-DROP TABLE QRTZ_SCHEDULER_STATE;
-DROP TABLE QRTZ_LOCKS;
-drop table qrtz_simple_triggers;
-drop table qrtz_cron_triggers;
-drop table qrtz_simprop_triggers;
-DROP TABLE QRTZ_BLOB_TRIGGERS;
-drop table qrtz_triggers;
-drop table qrtz_job_details;
-drop table qrtz_calendars;
-
CREATE TABLE qrtz_job_details
(
SCHED_NAME VARCHAR(120) NOT NULL,
@@ -453,3 +346,96 @@ create index idx_qrtz_ft_tg on qrtz_fired_triggers(SCHED_NAME,TRIGGER_GROUP);
commit;
+-- ambari log4j DDL
+
+--------------------------------------------------
+----------initialisation of mapred db-------------
+--------------------------------------------------
+CREATE DATABASE ambarirca;
+\connect ambarirca;
+
+--CREATE ROLE "mapred" LOGIN ENCRYPTED PASSWORD 'mapred';
+CREATE USER "mapred" WITH PASSWORD 'mapred';
+GRANT ALL PRIVILEGES ON DATABASE ambarirca TO "mapred";
+
+------create tables and grant privileges to db user---------
+CREATE TABLE workflow (
+ workflowId TEXT, workflowName TEXT,
+ parentWorkflowId TEXT,
+ workflowContext TEXT, userName TEXT,
+ startTime BIGINT, lastUpdateTime BIGINT,
+ numJobsTotal INTEGER, numJobsCompleted INTEGER,
+ inputBytes BIGINT, outputBytes BIGINT,
+ duration BIGINT,
+ PRIMARY KEY (workflowId),
+ FOREIGN KEY (parentWorkflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE workflow TO "mapred";
+
+CREATE TABLE job (
+ jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
+ userName TEXT, queue TEXT, acls TEXT, confPath TEXT,
+ submitTime BIGINT, launchTime BIGINT, finishTime BIGINT,
+ maps INTEGER, reduces INTEGER, status TEXT, priority TEXT,
+ finishedMaps INTEGER, finishedReduces INTEGER,
+ failedMaps INTEGER, failedReduces INTEGER,
+ mapsRuntime BIGINT, reducesRuntime BIGINT,
+ mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT,
+ inputBytes BIGINT, outputBytes BIGINT,
+ PRIMARY KEY (jobId),
+ FOREIGN KEY (workflowId) REFERENCES workflow (workflowId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE job TO "mapred";
+
+CREATE TABLE task (
+ taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT,
+ startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
+ failedAttempt TEXT,
+ PRIMARY KEY (taskId),
+ FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE task TO "mapred";
+
+CREATE TABLE taskAttempt (
+ taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
+ startTime BIGINT, finishTime BIGINT,
+ mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
+ locality TEXT, avataar TEXT,
+ status TEXT, error TEXT, counters TEXT,
+ inputBytes BIGINT, outputBytes BIGINT,
+ PRIMARY KEY (taskAttemptId),
+ FOREIGN KEY (jobId) REFERENCES job (jobId) ON DELETE CASCADE,
+ FOREIGN KEY (taskId) REFERENCES task (taskId) ON DELETE CASCADE
+);
+GRANT ALL PRIVILEGES ON TABLE taskAttempt TO "mapred";
+
+CREATE TABLE hdfsEvent (
+ timestamp BIGINT,
+ userName TEXT,
+ clientIP TEXT,
+ operation TEXT,
+ srcPath TEXT,
+ dstPath TEXT,
+ permissions TEXT
+);
+GRANT ALL PRIVILEGES ON TABLE hdfsEvent TO "mapred";
+
+CREATE TABLE mapreduceEvent (
+ timestamp BIGINT,
+ userName TEXT,
+ clientIP TEXT,
+ operation TEXT,
+ target TEXT,
+ result TEXT,
+ description TEXT,
+ permissions TEXT
+);
+GRANT ALL PRIVILEGES ON TABLE mapreduceEvent TO "mapred";
+
+CREATE TABLE clusterEvent (
+ timestamp BIGINT,
+ service TEXT, status TEXT,
+ error TEXT, data TEXT,
+ host TEXT, rack TEXT
+);
+GRANT ALL PRIVILEGES ON TABLE clusterEvent TO "mapred";
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
index e89f72f..ff251c8 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-REMOTE-CREATE.sql
@@ -104,94 +104,8 @@ select 'version','${ambariVersion}';
COMMIT;
-CREATE TABLE workflow (
- workflowId TEXT, workflowName TEXT,
- parentWorkflowId TEXT,
- workflowContext TEXT, userName TEXT,
- startTime BIGINT, lastUpdateTime BIGINT,
- numJobsTotal INTEGER, numJobsCompleted INTEGER,
- inputBytes BIGINT, outputBytes BIGINT,
- duration BIGINT,
- PRIMARY KEY (workflowId),
- FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
-);
-
-CREATE TABLE job (
- jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
- userName TEXT, queue TEXT, acls TEXT, confPath TEXT,
- submitTime BIGINT, launchTime BIGINT, finishTime BIGINT,
- maps INTEGER, reduces INTEGER, status TEXT, priority TEXT,
- finishedMaps INTEGER, finishedReduces INTEGER,
- failedMaps INTEGER, failedReduces INTEGER,
- mapsRuntime BIGINT, reducesRuntime BIGINT,
- mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT,
- inputBytes BIGINT, outputBytes BIGINT,
- PRIMARY KEY(jobId),
- FOREIGN KEY(workflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
-);
-
-CREATE TABLE task (
- taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT,
- startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
- failedAttempt TEXT,
- PRIMARY KEY(taskId),
- FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE
-);
-
-CREATE TABLE taskAttempt (
- taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
- startTime BIGINT, finishTime BIGINT,
- mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
- locality TEXT, avataar TEXT,
- status TEXT, error TEXT, counters TEXT,
- inputBytes BIGINT, outputBytes BIGINT,
- PRIMARY KEY(taskAttemptId),
- FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE,
- FOREIGN KEY(taskId) REFERENCES task(taskId) ON DELETE CASCADE
-);
-
-CREATE TABLE hdfsEvent (
- timestamp BIGINT,
- userName TEXT,
- clientIP TEXT,
- operation TEXT,
- srcPath TEXT,
- dstPath TEXT,
- permissions TEXT
-);
-
-CREATE TABLE mapreduceEvent (
- timestamp BIGINT,
- userName TEXT,
- clientIP TEXT,
- operation TEXT,
- target TEXT,
- result TEXT,
- description TEXT,
- permissions TEXT
-);
-
-CREATE TABLE clusterEvent (
- timestamp BIGINT,
- service TEXT, status TEXT,
- error TEXT, data TEXT ,
- host TEXT, rack TEXT
-);
-
-- Quartz tables
-drop table qrtz_fired_triggers;
-DROP TABLE QRTZ_PAUSED_TRIGGER_GRPS;
-DROP TABLE QRTZ_SCHEDULER_STATE;
-DROP TABLE QRTZ_LOCKS;
-drop table qrtz_simple_triggers;
-drop table qrtz_cron_triggers;
-drop table qrtz_simprop_triggers;
-DROP TABLE QRTZ_BLOB_TRIGGERS;
-drop table qrtz_triggers;
-drop table qrtz_job_details;
-drop table qrtz_calendars;
-
CREATE TABLE qrtz_job_details
(
SCHED_NAME VARCHAR(120) NOT NULL,
@@ -361,3 +275,77 @@ create index idx_qrtz_ft_t_g on qrtz_fired_triggers(SCHED_NAME,TRIGGER_NAME,TRIG
create index idx_qrtz_ft_tg on qrtz_fired_triggers(SCHED_NAME,TRIGGER_GROUP);
commit;
+
+CREATE TABLE workflow (
+ workflowId TEXT, workflowName TEXT,
+ parentWorkflowId TEXT,
+ workflowContext TEXT, userName TEXT,
+ startTime BIGINT, lastUpdateTime BIGINT,
+ numJobsTotal INTEGER, numJobsCompleted INTEGER,
+ inputBytes BIGINT, outputBytes BIGINT,
+ duration BIGINT,
+ PRIMARY KEY (workflowId),
+ FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
+);
+
+CREATE TABLE job (
+ jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
+ userName TEXT, queue TEXT, acls TEXT, confPath TEXT,
+ submitTime BIGINT, launchTime BIGINT, finishTime BIGINT,
+ maps INTEGER, reduces INTEGER, status TEXT, priority TEXT,
+ finishedMaps INTEGER, finishedReduces INTEGER,
+ failedMaps INTEGER, failedReduces INTEGER,
+ mapsRuntime BIGINT, reducesRuntime BIGINT,
+ mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT,
+ inputBytes BIGINT, outputBytes BIGINT,
+ PRIMARY KEY(jobId),
+ FOREIGN KEY(workflowId) REFERENCES workflow(workflowId) ON DELETE CASCADE
+);
+
+CREATE TABLE task (
+ taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT,
+ startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
+ failedAttempt TEXT,
+ PRIMARY KEY(taskId),
+ FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE
+);
+
+CREATE TABLE taskAttempt (
+ taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
+ startTime BIGINT, finishTime BIGINT,
+ mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
+ locality TEXT, avataar TEXT,
+ status TEXT, error TEXT, counters TEXT,
+ inputBytes BIGINT, outputBytes BIGINT,
+ PRIMARY KEY(taskAttemptId),
+ FOREIGN KEY(jobId) REFERENCES job(jobId) ON DELETE CASCADE,
+ FOREIGN KEY(taskId) REFERENCES task(taskId) ON DELETE CASCADE
+);
+
+CREATE TABLE hdfsEvent (
+ timestamp BIGINT,
+ userName TEXT,
+ clientIP TEXT,
+ operation TEXT,
+ srcPath TEXT,
+ dstPath TEXT,
+ permissions TEXT
+);
+
+CREATE TABLE mapreduceEvent (
+ timestamp BIGINT,
+ userName TEXT,
+ clientIP TEXT,
+ operation TEXT,
+ target TEXT,
+ result TEXT,
+ description TEXT,
+ permissions TEXT
+);
+
+CREATE TABLE clusterEvent (
+ timestamp BIGINT,
+ service TEXT, status TEXT,
+ error TEXT, data TEXT ,
+ host TEXT, rack TEXT
+);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
index 8f065d5..1c28452 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/parsers/JsonRequestBodyParserTest.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.api.services.parsers;
+import junit.framework.Assert;
import org.apache.ambari.server.api.services.NamedPropertySet;
import org.apache.ambari.server.api.services.RequestBody;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
@@ -147,6 +148,7 @@ public class JsonRequestBodyParserTest {
String bodyWithRequestInfoProperties = "{ \"RequestInfo\" : { \"query\" : \"foo=bar\", \"prop1\" : \"val1\", \"prop2\" : \"val2\" }, \"Body\":" + serviceJson + "}";
+ String bodyWithRequestBlobProperties = "{ \"RequestBodyInfo\" : { " +"\"RequestInfo\" : { \"query\" : \"foo=bar\", \"prop1\" : \"val1\", \"prop2\" : \"val2\" }, \"Body\":" + serviceJson + "} }";
@Test
public void testParse() throws BodyParseException {
@@ -561,8 +563,44 @@ public class JsonRequestBodyParserTest {
Set<NamedPropertySet> setProps2 = body.getNamedPropertySets();
assertEquals(mapExpected, setProps2.iterator().next().getProperties());
+ }
+
+ @Test
+ public void testRequestBlobProperties() throws Exception {
+ RequestBodyParser parser = new JsonRequestBodyParser();
+ RequestBody body = parser.parse(bodyWithRequestBlobProperties).iterator().next();
+
+ Set<NamedPropertySet> setProps = body.getNamedPropertySets();
+ assertEquals(1, setProps.size());
+
+ String requestBlob = null;
+
+ for (NamedPropertySet ps : setProps) {
+ assertEquals("", ps.getName());
+ Map<String, Object> mapProps = ps.getProperties();
+
+ for (Map.Entry<String, Object> entry : mapProps.entrySet()) {
+ if (entry.getKey().equals(JsonRequestBodyParser.REQUEST_BLOB_TITLE)) {
+ requestBlob = (String) entry.getValue();
+ }
+ }
+ }
+ Assert.assertNotNull(requestBlob);
+ body = parser.parse(requestBlob).iterator().next();
+
+ Map<String, Object> mapExpected = new HashMap<String, Object>();
+ mapExpected.put(PropertyHelper.getPropertyId("Services", "service_name"), "HDFS");
+ mapExpected.put(PropertyHelper.getPropertyId("Services", "display_name"), "HDFS");
+ mapExpected.put(PropertyHelper.getPropertyId("ServiceInfo", "cluster_name"), "tbmetrictest");
+ mapExpected.put(PropertyHelper.getPropertyId("Services", "description"), "Apache Hadoop Distributed File System");
+ mapExpected.put(PropertyHelper.getPropertyId("ServiceInfo", "state"), "STARTED");
+ mapExpected.put(PropertyHelper.getPropertyId("OuterCategory", "propName"), "100");
+ mapExpected.put(PropertyHelper.getPropertyId("OuterCategory/nested1/nested2", "innerPropName"), "innerPropValue");
+ mapExpected.put(PropertyHelper.getPropertyId(null, "topLevelProp"), "value");
+ Set<NamedPropertySet> setProps2 = body.getNamedPropertySets();
+ assertEquals(mapExpected, setProps2.iterator().next().getProperties());
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
index 85a97f6..e391a3b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProviderTest.java
@@ -26,6 +26,7 @@ import org.apache.ambari.server.controller.spi.Request;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.utilities.PredicateBuilder;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.scheduler.Batch;
@@ -77,9 +78,13 @@ public class RequestScheduleResourceProviderTest {
RequestExecutionFactory executionFactory = createNiceMock
(RequestExecutionFactory.class);
RequestExecution requestExecution = createNiceMock(RequestExecution.class);
+ ExecutionScheduleManager executionScheduleManager = createNiceMock
+ (ExecutionScheduleManager.class);
- expect(managementController.getClusters()).andReturn(clusters);
expect(clusters.getCluster("Cluster100")).andReturn(cluster).anyTimes();
+ expect(managementController.getClusters()).andReturn(clusters);
+ expect(managementController.getExecutionScheduleManager()).andReturn
+ (executionScheduleManager).anyTimes();
expect(managementController.getRequestExecutionFactory()).andReturn
(executionFactory);
expect(managementController.getAuthName()).andReturn("admin").anyTimes();
@@ -92,7 +97,7 @@ public class RequestScheduleResourceProviderTest {
capture(batchCapture), capture(scheduleCapture))).andReturn(requestExecution);
replay(managementController, clusters, cluster, executionFactory,
- requestExecution, response);
+ requestExecution, response, executionScheduleManager);
RequestScheduleResourceProvider resourceProvider = getResourceProvider
(managementController);
@@ -157,7 +162,7 @@ public class RequestScheduleResourceProviderTest {
resourceProvider.createResources(request);
verify(managementController, clusters, cluster, executionFactory,
- requestExecution, response);
+ requestExecution, response, executionScheduleManager);
List<BatchRequest> testRequests = batchCapture.getValue().getBatchRequests();
Assert.assertNotNull(testRequests);
@@ -185,10 +190,14 @@ public class RequestScheduleResourceProviderTest {
final RequestExecution requestExecution = createNiceMock(RequestExecution.class);
RequestScheduleResponse requestScheduleResponse = createNiceMock
(RequestScheduleResponse.class);
+ ExecutionScheduleManager executionScheduleManager = createNiceMock
+ (ExecutionScheduleManager.class);
expect(managementController.getClusters()).andReturn(clusters).anyTimes();
expect(clusters.getCluster("Cluster100")).andReturn(cluster).anyTimes();
expect(managementController.getAuthName()).andReturn("admin").anyTimes();
+ expect(managementController.getExecutionScheduleManager()).andReturn
+ (executionScheduleManager).anyTimes();
expect(requestExecution.getId()).andReturn(25L).anyTimes();
expect(requestExecution.convertToResponse()).andReturn
@@ -208,7 +217,7 @@ public class RequestScheduleResourceProviderTest {
});
replay(managementController, clusters, cluster, requestExecution,
- response, requestScheduleResponse);
+ response, requestScheduleResponse, executionScheduleManager);
RequestScheduleResourceProvider resourceProvider = getResourceProvider
(managementController);
@@ -279,7 +288,7 @@ public class RequestScheduleResourceProviderTest {
resourceProvider.updateResources(request, predicate);
verify(managementController, clusters, cluster, requestExecution,
- response, requestScheduleResponse);
+ response, requestScheduleResponse, executionScheduleManager);
}
@Test
@@ -369,14 +378,23 @@ public class RequestScheduleResourceProviderTest {
AmbariManagementController managementController = createMock(AmbariManagementController.class);
Clusters clusters = createNiceMock(Clusters.class);
Cluster cluster = createNiceMock(Cluster.class);
+ RequestExecution requestExecution = createNiceMock(RequestExecution.class);
+ ExecutionScheduleManager executionScheduleManager = createNiceMock
+ (ExecutionScheduleManager.class);
+
+ Map<Long, RequestExecution> requestExecutionMap = new HashMap<Long,
+ RequestExecution>();
+ requestExecutionMap.put(1L, requestExecution);
expect(managementController.getAuthName()).andReturn("admin").anyTimes();
expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+ expect(managementController.getExecutionScheduleManager()).andReturn
+ (executionScheduleManager).anyTimes();
expect(clusters.getCluster("Cluster100")).andReturn(cluster).anyTimes();
+ expect(cluster.getAllRequestExecutions()).andReturn(requestExecutionMap);
- cluster.deleteRequestExecution(1L);
-
- replay(managementController, clusters, cluster);
+ replay(managementController, clusters, cluster, executionScheduleManager,
+ requestExecution );
RequestScheduleResourceProvider resourceProvider = getResourceProvider
(managementController);
@@ -399,6 +417,7 @@ public class RequestScheduleResourceProviderTest {
Assert.assertEquals(predicate, lastEvent.getPredicate());
Assert.assertNull(lastEvent.getRequest());
- verify(managementController, clusters, cluster);
+ verify(managementController, clusters, cluster, executionScheduleManager,
+ requestExecution);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
index 06d553d..c9a392d 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/InMemoryDefaultTestModule.java
@@ -21,6 +21,9 @@ package org.apache.ambari.server.orm;
import com.google.inject.AbstractModule;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.ControllerModule;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManagerTest;
+import org.apache.ambari.server.scheduler.ExecutionScheduler;
+
import java.util.Properties;
public class InMemoryDefaultTestModule extends AbstractModule {
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
new file mode 100644
index 0000000..926cb7a
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.scheduler;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.persist.PersistService;
+import com.google.inject.persist.Transactional;
+import com.google.inject.util.Modules;
+import junit.framework.Assert;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.scheduler.Batch;
+import org.apache.ambari.server.state.scheduler.BatchRequest;
+import org.apache.ambari.server.state.scheduler.BatchSettings;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
+import org.apache.ambari.server.state.scheduler.Schedule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public class ExecutionScheduleManagerTest {
+ private Clusters clusters;
+ private Cluster cluster;
+ private String clusterName;
+ private Injector injector;
+ private AmbariMetaInfo metaInfo;
+ private ExecutionScheduleManager executionScheduleManager;
+ private RequestExecutionFactory requestExecutionFactory;
+ private ExecutionScheduler executionScheduler;
+ private Scheduler scheduler;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ExecutionScheduleManagerTest.class);
+
+ @Before
+ public void setup() throws Exception {
+ injector = Guice.createInjector(Modules.override(
+ new InMemoryDefaultTestModule()).with(new ExecutionSchedulerTestModule()));
+ injector.getInstance(GuiceJpaInitializer.class);
+ clusters = injector.getInstance(Clusters.class);
+ metaInfo = injector.getInstance(AmbariMetaInfo.class);
+ executionScheduleManager = injector.getInstance(ExecutionScheduleManager.class);
+ executionScheduler = injector.getInstance(ExecutionScheduler.class);
+ requestExecutionFactory = injector.getInstance(RequestExecutionFactory.class);
+
+ metaInfo.init();
+ clusterName = "c1";
+ clusters.addCluster(clusterName);
+ cluster = clusters.getCluster(clusterName);
+ cluster.setDesiredStackVersion(new StackId("HDP-0.1"));
+ Assert.assertNotNull(cluster);
+ assertThat(executionScheduler, instanceOf(TestExecutionScheduler.class));
+
+ TestExecutionScheduler testExecutionScheduler = (TestExecutionScheduler)
+ executionScheduler;
+ scheduler = testExecutionScheduler.getScheduler();
+ Assert.assertNotNull(scheduler);
+
+ executionScheduleManager.start();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ executionScheduleManager.stop();
+ injector.getInstance(PersistService.class).stop();
+ }
+
+ public static class TestExecutionScheduler extends ExecutionSchedulerImpl {
+ @Inject
+ public TestExecutionScheduler(Injector injector) {
+ super(injector);
+ try {
+ StdSchedulerFactory factory = new StdSchedulerFactory();
+ scheduler = factory.getScheduler();
+ isInitialized = true;
+ } catch (SchedulerException e) {
+ e.printStackTrace();
+ throw new ExceptionInInitializerError("Unable to instantiate " +
+ "scheduler");
+ }
+ }
+
+ public Scheduler getScheduler() {
+ return scheduler;
+ }
+ }
+
+ public class ExecutionSchedulerTestModule implements Module {
+ @Override
+ public void configure(Binder binder) {
+ binder.bind(ExecutionScheduler.class).to(TestExecutionScheduler.class);
+ }
+ }
+
+ @Transactional
+ private RequestExecution createRequestExecution() throws Exception {
+ Batch batches = new Batch();
+ Schedule schedule = new Schedule();
+
+ BatchSettings batchSettings = new BatchSettings();
+ batchSettings.setTaskFailureToleranceLimit(10);
+ batches.setBatchSettings(batchSettings);
+
+ List<BatchRequest> batchRequests = new ArrayList<BatchRequest>();
+ BatchRequest batchRequest1 = new BatchRequest();
+ batchRequest1.setOrderId(10L);
+ batchRequest1.setType(BatchRequest.Type.DELETE);
+ batchRequest1.setUri("testUri1");
+
+ BatchRequest batchRequest2 = new BatchRequest();
+ batchRequest2.setOrderId(12L);
+ batchRequest2.setType(BatchRequest.Type.POST);
+ batchRequest2.setUri("testUri2");
+ batchRequest2.setBody("testBody");
+
+ batchRequests.add(batchRequest1);
+ batchRequests.add(batchRequest2);
+
+ batches.getBatchRequests().addAll(batchRequests);
+
+ schedule.setMinutes("10");
+ schedule.setHours("2");
+ schedule.setMonth("*");
+ schedule.setDaysOfMonth("*");
+ schedule.setDayOfWeek("?");
+
+ RequestExecution requestExecution = requestExecutionFactory.createNew
+ (cluster, batches, schedule);
+ requestExecution.setDescription("Test Schedule");
+
+ requestExecution.persist();
+
+ return requestExecution;
+ }
+
+ @Test
+ public void testScheduleBatch() throws Exception {
+ RequestExecution requestExecution = createRequestExecution();
+ Assert.assertNotNull(requestExecution);
+
+ executionScheduleManager.scheduleBatch(requestExecution);
+
+ String jobName1 = executionScheduleManager.getJobName(requestExecution
+ .getId(), 10L);
+ String jobName2 = executionScheduleManager.getJobName(requestExecution
+ .getId(), 12L);
+ JobDetail jobDetail1 = null;
+ JobDetail jobDetail2 = null;
+ Trigger trigger1 = null;
+ Trigger trigger2 = null;
+
+ // enumerate each job group
+ for(String group: scheduler.getJobGroupNames()) {
+ // enumerate each job in group
+ for(JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals
+ (ExecutionJob.LINEAR_EXECUTION_JOB_GROUP))) {
+ LOG.info("Found job identified by: " + jobKey);
+
+ String jobName = jobKey.getName();
+ String jobGroup = jobKey.getGroup();
+
+ List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
+ Trigger trigger = triggers != null && !triggers.isEmpty() ?
+ triggers.get(0) : null;
+ Date nextFireTime = trigger != null ? trigger.getNextFireTime() : null;
+
+ LOG.info("[jobName] : " + jobName + " [groupName] : "
+ + jobGroup + " - " + nextFireTime);
+
+ if (jobName.equals(jobName1)) {
+ jobDetail1 = scheduler.getJobDetail(jobKey);
+ trigger1 = trigger;
+ } else if (jobName.equals(jobName2)) {
+ jobDetail2 = scheduler.getJobDetail(jobKey);
+ trigger2 = trigger;
+ }
+ }
+ }
+
+ Assert.assertNotNull(jobDetail1);
+ Assert.assertNotNull(trigger1);
+ Assert.assertNotNull(jobDetail2);
+ Assert.assertNull(trigger2);
+
+ CronTrigger cronTrigger = (CronTrigger) trigger1;
+ Schedule schedule = new Schedule();
+ schedule.setMinutes("10");
+ schedule.setHours("2");
+ schedule.setMonth("*");
+ schedule.setDaysOfMonth("*");
+ schedule.setDayOfWeek("?");
+
+ Assert.assertEquals(schedule.getScheduleExpression(),
+ cronTrigger.getCronExpression());
+
+ Assert.assertEquals(jobName1, jobDetail1.getKey().getName());
+ Assert.assertEquals(jobName2, jobDetail2.getKey().getName());
+ }
+
+ @Test
+ public void testDeleteAllJobs() throws Exception {
+ RequestExecution requestExecution = createRequestExecution();
+ Assert.assertNotNull(requestExecution);
+
+ executionScheduleManager.scheduleBatch(requestExecution);
+
+ String jobName1 = executionScheduleManager.getJobName(requestExecution
+ .getId(), 10L);
+ String jobName2 = executionScheduleManager.getJobName(requestExecution
+ .getId(), 12L);
+
+ JobDetail jobDetail1 = scheduler.getJobDetail(JobKey.jobKey(jobName1,
+ ExecutionJob.LINEAR_EXECUTION_JOB_GROUP));
+ JobDetail jobDetail2 = scheduler.getJobDetail(JobKey.jobKey(jobName2,
+ ExecutionJob.LINEAR_EXECUTION_JOB_GROUP));
+
+ Assert.assertNotNull(jobDetail1);
+ Assert.assertNotNull(jobDetail2);
+ Assert.assertTrue(!scheduler.getTriggersOfJob(JobKey.jobKey(jobName1,
+ ExecutionJob.LINEAR_EXECUTION_JOB_GROUP)).isEmpty());
+
+ executionScheduleManager.deleteAllJobs(requestExecution);
+
+ Assert.assertTrue(scheduler.getTriggersOfJob(JobKey.jobKey(jobName1,
+ ExecutionJob.LINEAR_EXECUTION_JOB_GROUP)).isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
index 822f583..34a5d38 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionSchedulerTest.java
@@ -68,7 +68,8 @@ public class ExecutionSchedulerTest {
@Test
public void testSchedulerInitialize() throws Exception {
- ExecutionSchedulerImpl executionScheduler = spy(new ExecutionSchedulerImpl(configuration));
+ ExecutionSchedulerImpl executionScheduler =
+ spy(new ExecutionSchedulerImpl(configuration));
Properties actualProperties = executionScheduler
.getQuartzSchedulerProperties();
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
index ac63b22..71ed464 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/ConfigGroupTest.java
@@ -58,7 +58,7 @@ public class ConfigGroupTest {
@Before
public void setup() throws Exception {
- injector = Guice.createInjector(new InMemoryDefaultTestModule());
+ injector = Guice.createInjector(new InMemoryDefaultTestModule());
injector.getInstance(GuiceJpaInitializer.class);
clusters = injector.getInstance(Clusters.class);
metaInfo = injector.getInstance(AmbariMetaInfo.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/5da00398/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
index aba7e70..c2e3243 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/RequestExecutionTest.java
@@ -146,8 +146,8 @@ public class RequestExecutionTest {
}
Assert.assertNotNull(reqEntity1);
Assert.assertNotNull(reqEntity2);
- Assert.assertEquals(Long.valueOf(1L), reqEntity1.getBatchId());
- Assert.assertEquals(Long.valueOf(2L), reqEntity2.getBatchId());
+ Assert.assertEquals(Long.valueOf(10L), reqEntity1.getBatchId());
+ Assert.assertEquals(Long.valueOf(12L), reqEntity2.getBatchId());
Assert.assertEquals(BatchRequest.Type.DELETE.name(), reqEntity1.getRequestType());
Assert.assertEquals(BatchRequest.Type.POST.name(), reqEntity2.getRequestType());
Assert.assertEquals(requestExecution.getSchedule().getMinutes(),
@@ -213,8 +213,8 @@ public class RequestExecutionTest {
}
Assert.assertNotNull(reqEntity1);
Assert.assertNotNull(reqEntity2);
- Assert.assertEquals(Long.valueOf(1L), reqEntity1.getBatchId());
- Assert.assertEquals(Long.valueOf(2L), reqEntity2.getBatchId());
+ Assert.assertEquals(Long.valueOf(10L), reqEntity1.getBatchId());
+ Assert.assertEquals(Long.valueOf(12L), reqEntity2.getBatchId());
Assert.assertEquals(BatchRequest.Type.PUT.name(), reqEntity1.getRequestType());
Assert.assertEquals(BatchRequest.Type.POST.name(), reqEntity2.getRequestType());
Assert.assertEquals("11", scheduleEntity.getHours());
@@ -250,8 +250,8 @@ public class RequestExecutionTest {
}
Assert.assertNotNull(reqEntity1);
Assert.assertNotNull(reqEntity2);
- Assert.assertEquals(Long.valueOf(1L), reqEntity1.getBatchId());
- Assert.assertEquals(Long.valueOf(2L), reqEntity2.getBatchId());
+ Assert.assertEquals(Long.valueOf(10L), reqEntity1.getBatchId());
+ Assert.assertEquals(Long.valueOf(12L), reqEntity2.getBatchId());
Assert.assertEquals(BatchRequest.Type.DELETE.name(), reqEntity1.getRequestType());
Assert.assertEquals(BatchRequest.Type.POST.name(), reqEntity2.getRequestType());
Assert.assertEquals(requestExecution.getSchedule().getMinutes(),
[2/2] git commit: AMBARI-4150. Provide ability to batch requests based on schedule. Review patch. (swagle)
Posted by sw...@apache.org.
AMBARI-4150. Provide ability to batch requests based on schedule. Review patch. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1c7e66da
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1c7e66da
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1c7e66da
Branch: refs/heads/trunk
Commit: 1c7e66dadf2630b165c98d92f7c37613ed712775
Parents: 5da0039
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Jan 7 10:56:31 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Jan 7 10:56:31 2014 -0800
----------------------------------------------------------------------
.../main/resources/Ambari-DDL-MySQL-CREATE.sql | 341 +++++++++----------
1 file changed, 170 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1c7e66da/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
index 11b14c8..97a17ea 100644
--- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
+++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql
@@ -107,6 +107,176 @@ insert into user_roles(role_name, user_id)
insert into metainfo(`metainfo_key`, `metainfo_value`)
select 'version','${ambariVersion}';
+-- Quartz tables
+
+CREATE TABLE QRTZ_JOB_DETAILS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ JOB_NAME VARCHAR(200) NOT NULL,
+ JOB_GROUP VARCHAR(200) NOT NULL,
+ DESCRIPTION VARCHAR(250) NULL,
+ JOB_CLASS_NAME VARCHAR(250) NOT NULL,
+ IS_DURABLE VARCHAR(1) NOT NULL,
+ IS_NONCONCURRENT VARCHAR(1) NOT NULL,
+ IS_UPDATE_DATA VARCHAR(1) NOT NULL,
+ REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
+ JOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_TRIGGERS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ JOB_NAME VARCHAR(200) NOT NULL,
+ JOB_GROUP VARCHAR(200) NOT NULL,
+ DESCRIPTION VARCHAR(250) NULL,
+ NEXT_FIRE_TIME BIGINT(13) NULL,
+ PREV_FIRE_TIME BIGINT(13) NULL,
+ PRIORITY INTEGER NULL,
+ TRIGGER_STATE VARCHAR(16) NOT NULL,
+ TRIGGER_TYPE VARCHAR(8) NOT NULL,
+ START_TIME BIGINT(13) NOT NULL,
+ END_TIME BIGINT(13) NULL,
+ CALENDAR_NAME VARCHAR(200) NULL,
+ MISFIRE_INSTR SMALLINT(2) NULL,
+ JOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
+ REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPLE_TRIGGERS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ REPEAT_COUNT BIGINT(7) NOT NULL,
+ REPEAT_INTERVAL BIGINT(12) NOT NULL,
+ TIMES_TRIGGERED BIGINT(10) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CRON_TRIGGERS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ CRON_EXPRESSION VARCHAR(200) NOT NULL,
+ TIME_ZONE_ID VARCHAR(80),
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_SIMPROP_TRIGGERS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ STR_PROP_1 VARCHAR(512) NULL,
+ STR_PROP_2 VARCHAR(512) NULL,
+ STR_PROP_3 VARCHAR(512) NULL,
+ INT_PROP_1 INT NULL,
+ INT_PROP_2 INT NULL,
+ LONG_PROP_1 BIGINT NULL,
+ LONG_PROP_2 BIGINT NULL,
+ DEC_PROP_1 NUMERIC(13,4) NULL,
+ DEC_PROP_2 NUMERIC(13,4) NULL,
+ BOOL_PROP_1 VARCHAR(1) NULL,
+ BOOL_PROP_2 VARCHAR(1) NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_BLOB_TRIGGERS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ BLOB_DATA BLOB NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
+ FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+ REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_CALENDARS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ CALENDAR_NAME VARCHAR(200) NOT NULL,
+ CALENDAR BLOB NOT NULL,
+ PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
+);
+
+CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
+);
+
+CREATE TABLE QRTZ_FIRED_TRIGGERS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ ENTRY_ID VARCHAR(95) NOT NULL,
+ TRIGGER_NAME VARCHAR(200) NOT NULL,
+ TRIGGER_GROUP VARCHAR(200) NOT NULL,
+ INSTANCE_NAME VARCHAR(200) NOT NULL,
+ FIRED_TIME BIGINT(13) NOT NULL,
+ SCHED_TIME BIGINT(13) NOT NULL,
+ PRIORITY INTEGER NOT NULL,
+ STATE VARCHAR(16) NOT NULL,
+ JOB_NAME VARCHAR(200) NULL,
+ JOB_GROUP VARCHAR(200) NULL,
+ IS_NONCONCURRENT VARCHAR(1) NULL,
+ REQUESTS_RECOVERY VARCHAR(1) NULL,
+ PRIMARY KEY (SCHED_NAME,ENTRY_ID)
+);
+
+CREATE TABLE QRTZ_SCHEDULER_STATE
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ INSTANCE_NAME VARCHAR(200) NOT NULL,
+ LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
+ CHECKIN_INTERVAL BIGINT(13) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
+);
+
+CREATE TABLE QRTZ_LOCKS
+(
+ SCHED_NAME VARCHAR(120) NOT NULL,
+ LOCK_NAME VARCHAR(40) NOT NULL,
+ PRIMARY KEY (SCHED_NAME,LOCK_NAME)
+);
+
+create index idx_qrtz_j_req_recovery on QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
+create index idx_qrtz_j_grp on QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
+
+create index idx_qrtz_t_j on QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
+create index idx_qrtz_t_jg on QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
+create index idx_qrtz_t_c on QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
+create index idx_qrtz_t_g on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
+create index idx_qrtz_t_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
+create index idx_qrtz_t_n_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
+create index idx_qrtz_t_n_g_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
+create index idx_qrtz_t_next_fire_time on QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
+create index idx_qrtz_t_nft_st on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
+create index idx_qrtz_t_nft_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
+create index idx_qrtz_t_nft_st_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
+create index idx_qrtz_t_nft_st_misfire_grp on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
+
+create index idx_qrtz_ft_trig_inst_name on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
+create index idx_qrtz_ft_inst_job_req_rcvry on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
+create index idx_qrtz_ft_j_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
+create index idx_qrtz_ft_jg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
+create index idx_qrtz_ft_t_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
+create index idx_qrtz_ft_tg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
+
+commit;
CREATE TABLE workflow (
@@ -183,177 +353,6 @@ CREATE TABLE clusterEvent (
host TEXT, rack TEXT
);
--- Quartz tables
-
-CREATE TABLE QRTZ_JOB_DETAILS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- JOB_NAME VARCHAR(200) NOT NULL,
- JOB_GROUP VARCHAR(200) NOT NULL,
- DESCRIPTION VARCHAR(250) NULL,
- JOB_CLASS_NAME VARCHAR(250) NOT NULL,
- IS_DURABLE VARCHAR(1) NOT NULL,
- IS_NONCONCURRENT VARCHAR(1) NOT NULL,
- IS_UPDATE_DATA VARCHAR(1) NOT NULL,
- REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
- JOB_DATA BLOB NULL,
- PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
-);
-
-CREATE TABLE QRTZ_TRIGGERS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- TRIGGER_NAME VARCHAR(200) NOT NULL,
- TRIGGER_GROUP VARCHAR(200) NOT NULL,
- JOB_NAME VARCHAR(200) NOT NULL,
- JOB_GROUP VARCHAR(200) NOT NULL,
- DESCRIPTION VARCHAR(250) NULL,
- NEXT_FIRE_TIME BIGINT(13) NULL,
- PREV_FIRE_TIME BIGINT(13) NULL,
- PRIORITY INTEGER NULL,
- TRIGGER_STATE VARCHAR(16) NOT NULL,
- TRIGGER_TYPE VARCHAR(8) NOT NULL,
- START_TIME BIGINT(13) NOT NULL,
- END_TIME BIGINT(13) NULL,
- CALENDAR_NAME VARCHAR(200) NULL,
- MISFIRE_INSTR SMALLINT(2) NULL,
- JOB_DATA BLOB NULL,
- PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
- FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
- REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
-);
-
-CREATE TABLE QRTZ_SIMPLE_TRIGGERS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- TRIGGER_NAME VARCHAR(200) NOT NULL,
- TRIGGER_GROUP VARCHAR(200) NOT NULL,
- REPEAT_COUNT BIGINT(7) NOT NULL,
- REPEAT_INTERVAL BIGINT(12) NOT NULL,
- TIMES_TRIGGERED BIGINT(10) NOT NULL,
- PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
- FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
- REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_CRON_TRIGGERS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- TRIGGER_NAME VARCHAR(200) NOT NULL,
- TRIGGER_GROUP VARCHAR(200) NOT NULL,
- CRON_EXPRESSION VARCHAR(200) NOT NULL,
- TIME_ZONE_ID VARCHAR(80),
- PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
- FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
- REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_SIMPROP_TRIGGERS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- TRIGGER_NAME VARCHAR(200) NOT NULL,
- TRIGGER_GROUP VARCHAR(200) NOT NULL,
- STR_PROP_1 VARCHAR(512) NULL,
- STR_PROP_2 VARCHAR(512) NULL,
- STR_PROP_3 VARCHAR(512) NULL,
- INT_PROP_1 INT NULL,
- INT_PROP_2 INT NULL,
- LONG_PROP_1 BIGINT NULL,
- LONG_PROP_2 BIGINT NULL,
- DEC_PROP_1 NUMERIC(13,4) NULL,
- DEC_PROP_2 NUMERIC(13,4) NULL,
- BOOL_PROP_1 VARCHAR(1) NULL,
- BOOL_PROP_2 VARCHAR(1) NULL,
- PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
- FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
- REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_BLOB_TRIGGERS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- TRIGGER_NAME VARCHAR(200) NOT NULL,
- TRIGGER_GROUP VARCHAR(200) NOT NULL,
- BLOB_DATA BLOB NULL,
- PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
- FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
- REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_CALENDARS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- CALENDAR_NAME VARCHAR(200) NOT NULL,
- CALENDAR BLOB NOT NULL,
- PRIMARY KEY (SCHED_NAME,CALENDAR_NAME)
-);
-
-CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- TRIGGER_GROUP VARCHAR(200) NOT NULL,
- PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
-);
-
-CREATE TABLE QRTZ_FIRED_TRIGGERS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- ENTRY_ID VARCHAR(95) NOT NULL,
- TRIGGER_NAME VARCHAR(200) NOT NULL,
- TRIGGER_GROUP VARCHAR(200) NOT NULL,
- INSTANCE_NAME VARCHAR(200) NOT NULL,
- FIRED_TIME BIGINT(13) NOT NULL,
- SCHED_TIME BIGINT(13) NOT NULL,
- PRIORITY INTEGER NOT NULL,
- STATE VARCHAR(16) NOT NULL,
- JOB_NAME VARCHAR(200) NULL,
- JOB_GROUP VARCHAR(200) NULL,
- IS_NONCONCURRENT VARCHAR(1) NULL,
- REQUESTS_RECOVERY VARCHAR(1) NULL,
- PRIMARY KEY (SCHED_NAME,ENTRY_ID)
-);
-
-CREATE TABLE QRTZ_SCHEDULER_STATE
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- INSTANCE_NAME VARCHAR(200) NOT NULL,
- LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
- CHECKIN_INTERVAL BIGINT(13) NOT NULL,
- PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
-);
-
-CREATE TABLE QRTZ_LOCKS
- (
- SCHED_NAME VARCHAR(120) NOT NULL,
- LOCK_NAME VARCHAR(40) NOT NULL,
- PRIMARY KEY (SCHED_NAME,LOCK_NAME)
-);
-
-create index idx_qrtz_j_req_recovery on QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
-create index idx_qrtz_j_grp on QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);
-
-create index idx_qrtz_t_j on QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
-create index idx_qrtz_t_jg on QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
-create index idx_qrtz_t_c on QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
-create index idx_qrtz_t_g on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
-create index idx_qrtz_t_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
-create index idx_qrtz_t_n_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
-create index idx_qrtz_t_n_g_state on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
-create index idx_qrtz_t_next_fire_time on QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
-create index idx_qrtz_t_nft_st on QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
-create index idx_qrtz_t_nft_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
-create index idx_qrtz_t_nft_st_misfire on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
-create index idx_qrtz_t_nft_st_misfire_grp on QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);
-
-create index idx_qrtz_ft_trig_inst_name on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
-create index idx_qrtz_ft_inst_job_req_rcvry on QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
-create index idx_qrtz_ft_j_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
-create index idx_qrtz_ft_jg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
-create index idx_qrtz_ft_t_g on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
-create index idx_qrtz_ft_tg on QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
-
-commit;
-