You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2013/12/23 23:13:17 UTC

[2/3] AMBARI-4034. Create the RequestSchedule resource provider. Patch 1. (swagle)

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java
new file mode 100644
index 0000000..48a4db9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntity.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.orm.entities;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.IdClass;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinColumns;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+
+/**
+ * JPA entity mapped to the {@code requestschedulebatchrequest} table: one
+ * batch request that belongs to a request schedule.
+ * <p>
+ * The composite primary key is ({@code schedule_id}, {@code batch_id}) and is
+ * represented by {@link RequestScheduleBatchRequestEntityPK}. Equality and
+ * hashing use only those two key fields.
+ */
+@IdClass(RequestScheduleBatchRequestEntityPK.class)
+@Entity
+@Table(name = "requestschedulebatchrequest")
+@NamedQueries({
+  @NamedQuery(name = "findByScheduleId", query = "SELECT batchreqs FROM " +
+    "RequestScheduleBatchRequestEntity  batchreqs WHERE batchreqs.scheduleId=:id")
+})
+public class RequestScheduleBatchRequestEntity {
+  /** First half of the composite key; also the join column to the schedule. */
+  @Id
+  @Column(name = "schedule_id", nullable = false, insertable = true, updatable = true)
+  private Long scheduleId;
+
+  /** Second half of the composite key. */
+  @Id
+  @Column(name = "batch_id", nullable = false, insertable = true, updatable = true)
+  private Long batchId;
+
+  @Column(name = "request_id")
+  private Long requestId;
+
+  @Column(name = "request_type")
+  private String requestType;
+
+  @Column(name = "request_uri")
+  private String requestUri;
+
+  @Column(name = "request_body")
+  private String requestBody;
+
+  @Column(name = "request_status")
+  private String requestStatus;
+
+  @Column(name = "return_code")
+  private Integer returnCode;
+
+  @Column(name = "return_message")
+  private String returnMessage;
+
+  /**
+   * Owning schedule. The join column is read-only here because the
+   * {@code schedule_id} value is written through {@link #scheduleId}.
+   */
+  @ManyToOne
+  @JoinColumns({
+    @JoinColumn(name = "schedule_id", referencedColumnName = "schedule_id", nullable = false, insertable = false, updatable = false) })
+  private RequestScheduleEntity requestScheduleEntity;
+
+  public Long getScheduleId() {
+    return scheduleId;
+  }
+
+  public void setScheduleId(Long scheduleId) {
+    this.scheduleId = scheduleId;
+  }
+
+  public Long getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(Long batchId) {
+    this.batchId = batchId;
+  }
+
+  public Long getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(Long requestId) {
+    this.requestId = requestId;
+  }
+
+  public String getRequestType() {
+    return requestType;
+  }
+
+  public void setRequestType(String requestType) {
+    this.requestType = requestType;
+  }
+
+  public String getRequestUri() {
+    return requestUri;
+  }
+
+  public void setRequestUri(String requestUri) {
+    this.requestUri = requestUri;
+  }
+
+  public String getRequestBody() {
+    return requestBody;
+  }
+
+  public void setRequestBody(String requestBody) {
+    this.requestBody = requestBody;
+  }
+
+  public String getRequestStatus() {
+    return requestStatus;
+  }
+
+  public void setRequestStatus(String requestStatus) {
+    this.requestStatus = requestStatus;
+  }
+
+  public Integer getReturnCode() {
+    return returnCode;
+  }
+
+  public void setReturnCode(Integer returnCode) {
+    this.returnCode = returnCode;
+  }
+
+  public String getReturnMessage() {
+    return returnMessage;
+  }
+
+  public void setReturnMessage(String returnMessage) {
+    this.returnMessage = returnMessage;
+  }
+
+  public RequestScheduleEntity getRequestScheduleEntity() {
+    return requestScheduleEntity;
+  }
+
+  public void setRequestScheduleEntity(RequestScheduleEntity requestScheduleEntity) {
+    this.requestScheduleEntity = requestScheduleEntity;
+  }
+
+  /**
+   * Two entities are equal when they are of the same class and both key
+   * fields match. Unset (null) key fields are compared null-safely.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RequestScheduleBatchRequestEntity that = (RequestScheduleBatchRequestEntity) o;
+
+    // Key fields stay null until a setter is called, so never dereference
+    // them unconditionally.
+    if (batchId != null ? !batchId.equals(that.batchId) : that.batchId != null) return false;
+    if (scheduleId != null ? !scheduleId.equals(that.scheduleId) : that.scheduleId != null) return false;
+
+    return true;
+  }
+
+  /**
+   * Hash over the two key fields; a null key field contributes 0. For
+   * non-null keys the value is 31 * scheduleId.hashCode() + batchId.hashCode().
+   */
+  @Override
+  public int hashCode() {
+    int result = scheduleId != null ? scheduleId.hashCode() : 0;
+    result = 31 * result + (batchId != null ? batchId.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntityPK.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntityPK.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntityPK.java
new file mode 100644
index 0000000..43ee542
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleBatchRequestEntityPK.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.orm.entities;
+
+import javax.persistence.Column;
+import javax.persistence.Id;
+import java.io.Serializable;
+
+/**
+ * Composite primary key class ({@code schedule_id}, {@code batch_id}) used as
+ * the {@code @IdClass} of {@link RequestScheduleBatchRequestEntity}.
+ * Equality and hashing cover both key parts and tolerate unset (null) values.
+ */
+public class RequestScheduleBatchRequestEntityPK implements Serializable {
+  private Long scheduleId;
+  private Long batchId;
+
+  @Id
+  @Column(name = "schedule_id", nullable = false, insertable = true, updatable = true)
+  public Long getScheduleId() {
+    return scheduleId;
+  }
+
+  public void setScheduleId(Long scheduleId) {
+    this.scheduleId = scheduleId;
+  }
+
+  @Id
+  @Column(name = "batch_id", nullable = false, insertable = true, updatable = true)
+  public Long getBatchId() {
+    return batchId;
+  }
+
+  public void setBatchId(Long batchId) {
+    this.batchId = batchId;
+  }
+
+  /**
+   * Two keys are equal when they are of the same class and both parts match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RequestScheduleBatchRequestEntityPK that = (RequestScheduleBatchRequestEntityPK) o;
+
+    // Both parts are null on a freshly constructed key; compare null-safely
+    // instead of dereferencing them.
+    if (batchId != null ? !batchId.equals(that.batchId) : that.batchId != null) return false;
+    if (scheduleId != null ? !scheduleId.equals(that.scheduleId) : that.scheduleId != null) return false;
+
+    return true;
+  }
+
+  /**
+   * Hash over both key parts; a null part contributes 0. For non-null parts
+   * the value is 31 * scheduleId.hashCode() + batchId.hashCode().
+   */
+  @Override
+  public int hashCode() {
+    int result = scheduleId != null ? scheduleId.hashCode() : 0;
+    result = 31 * result + (batchId != null ? batchId.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
index a54c22c..34bbe2b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java
@@ -50,30 +50,12 @@ public class RequestScheduleEntity {
   @Column(name = "cluster_id", insertable = false, updatable = false, nullable = false)
   private Long clusterId;
 
-  @Column(name = "request_context")
-  private String requestContext;
+  @Column(name = "description")
+  private String description;
 
   @Column(name = "status")
   private String status;
 
-  @Column(name = "target_type")
-  private String targetType;
-
-  @Column(name = "target_name")
-  private String targetName;
-
-  @Column(name = "target_service")
-  private String targetService;
-
-  @Column(name = "target_component")
-  private String targetComponent;
-
-  @Column(name = "batch_requests_by_host")
-  private boolean batchRequestByHost;
-
-  @Column(name = "batch_host_count")
-  private Integer batchHostCount;
-
   @Column(name = "batch_separation_minutes")
   private Integer batchSeparationInMinutes;
 
@@ -111,10 +93,10 @@ public class RequestScheduleEntity {
   private String year;
 
   @Column(name = "starttime")
-  private Long startTime;
+  private String startTime;
 
   @Column(name = "endtime")
-  private Long endTime;
+  private String endTime;
 
   @Column(name = "last_execution_status")
   private String lastExecutionStatus;
@@ -124,7 +106,8 @@ public class RequestScheduleEntity {
   private ClusterEntity clusterEntity;
 
   @OneToMany(mappedBy = "requestScheduleEntity", cascade = CascadeType.ALL)
-  private Collection<RequestScheduleBatchHostEntity> requestScheduleBatchHostEntities;
+  private Collection<RequestScheduleBatchRequestEntity>
+    requestScheduleBatchRequestEntities;
 
   public long getScheduleId() {
     return scheduleId;
@@ -142,12 +125,12 @@ public class RequestScheduleEntity {
     this.clusterId = clusterId;
   }
 
-  public String getRequestContext() {
-    return requestContext;
+  public String getDescription() {
+    return description;
   }
 
-  public void setRequestContext(String request_context) {
-    this.requestContext = request_context;
+  public void setDescription(String description) {
+    this.description = description;
   }
 
   public String getStatus() {
@@ -158,54 +141,6 @@ public class RequestScheduleEntity {
     this.status = status;
   }
 
-  public String getTargetType() {
-    return targetType;
-  }
-
-  public void setTargetType(String targetType) {
-    this.targetType = targetType;
-  }
-
-  public String getTargetName() {
-    return targetName;
-  }
-
-  public void setTargetName(String targetName) {
-    this.targetName = targetName;
-  }
-
-  public String getTargetService() {
-    return targetService;
-  }
-
-  public void setTargetService(String targetService) {
-    this.targetService = targetService;
-  }
-
-  public String getTargetComponent() {
-    return targetComponent;
-  }
-
-  public void setTargetComponent(String targetComponent) {
-    this.targetComponent = targetComponent;
-  }
-
-  public boolean getIsBatchRequestByHost() {
-    return batchRequestByHost;
-  }
-
-  public void setBatchRequestByHost(boolean batchRequestByHost) {
-    this.batchRequestByHost = batchRequestByHost;
-  }
-
-  public Integer getBatchHostCount() {
-    return batchHostCount;
-  }
-
-  public void setBatchHostCount(Integer batchHostCount) {
-    this.batchHostCount = batchHostCount;
-  }
-
   public Integer getBatchSeparationInMinutes() {
     return batchSeparationInMinutes;
   }
@@ -302,19 +237,19 @@ public class RequestScheduleEntity {
     this.year = year;
   }
 
-  public Long getStartTime() {
+  public String getStartTime() {
     return startTime;
   }
 
-  public void setStartTime(Long startTime) {
+  public void setStartTime(String startTime) {
     this.startTime = startTime;
   }
 
-  public Long getEndTime() {
+  public String getEndTime() {
     return endTime;
   }
 
-  public void setEndTime(Long endTime) {
+  public void setEndTime(String endTime) {
     this.endTime = endTime;
   }
 
@@ -334,12 +269,13 @@ public class RequestScheduleEntity {
     this.clusterEntity = clusterEntity;
   }
 
-  public Collection<RequestScheduleBatchHostEntity> getRequestScheduleBatchHostEntities() {
-    return requestScheduleBatchHostEntities;
+  public Collection<RequestScheduleBatchRequestEntity> getRequestScheduleBatchRequestEntities() {
+    return requestScheduleBatchRequestEntities;
   }
 
-  public void setRequestScheduleBatchHostEntities(Collection<RequestScheduleBatchHostEntity> requestScheduleBatchHostEntities) {
-    this.requestScheduleBatchHostEntities = requestScheduleBatchHostEntities;
+  public void setRequestScheduleBatchRequestEntities(
+    Collection<RequestScheduleBatchRequestEntity> requestScheduleBatchRequestEntities) {
+    this.requestScheduleBatchRequestEntities = requestScheduleBatchRequestEntities;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
new file mode 100644
index 0000000..37d6752
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.scheduler;
+
+import org.apache.ambari.server.AmbariException;
+import org.quartz.DateBuilder;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.PersistJobDataAfterExecution;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.quartz.DateBuilder.futureDate;
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+/**
+ * Job that knows how to get the job name and group out of the JobDataMap using
+ * pre-defined keys (constants) and contains code to schedule the identified job.
+ * This abstract Job's implementation of execute() delegates to an abstract
+ * template method "doWork()" (where the extending Job class's real work goes)
+ * and then it schedules the follow-up job.
+ */
+@PersistJobDataAfterExecution
+@DisallowConcurrentExecution
+public abstract class AbstractLinearExecutionJob implements ExecutionJob {
+  private ExecutionScheduleManager executionScheduleManager;
+  private static Logger LOG = LoggerFactory.getLogger(AbstractLinearExecutionJob.class);
+
+  public AbstractLinearExecutionJob(ExecutionScheduleManager executionScheduleManager) {
+    this.executionScheduleManager = executionScheduleManager;
+  }
+
+  /**
+   * Do the actual work of the fired job.
+   * @throws AmbariException
+   */
+  protected abstract void doWork() throws AmbariException;
+
+  /**
+   * Get the next job id from context and create a trigger to fire the next
+   * job.
+   * @param context
+   * @throws JobExecutionException
+   */
+  @Override
+  public void execute(JobExecutionContext context) throws JobExecutionException {
+    JobKey jobKey = context.getJobDetail().getKey();
+    LOG.debug("Executing linear job: " + jobKey);
+
+    if (!executionScheduleManager.continueOnMisfire(context)) {
+      throw new JobExecutionException("Canceled execution based on misfire"
+        + " toleration threshold, job: " + jobKey
+        + ", scheduleTime = " + context.getScheduledFireTime());
+    }
+
+    // Perform work and exit if failure reported
+    try {
+      doWork();
+    } catch (AmbariException e) {
+      LOG.error("Exception caught on job execution. Exiting linear chain...", e);
+      throw new JobExecutionException(e);
+    }
+
+    JobDataMap jobDataMap = context.getMergedJobDataMap();
+    String nextJobName = jobDataMap.getString(NEXT_EXECUTION_JOB_NAME_KEY);
+    String nextJobGroup = jobDataMap.getString(NEXT_EXECUTION_JOB_GROUP_KEY);
+    Integer separationMinutes = jobDataMap.getIntegerFromString(
+      (NEXT_EXECUTION_SEPARATION_MINUTES));
+
+    if (separationMinutes == null) {
+      separationMinutes = 0;
+    }
+
+    // Create trigger for next job execution
+    Trigger trigger = newTrigger()
+      .forJob(nextJobName, nextJobGroup)
+      .withIdentity("TriggerForJob-" + nextJobName, LINEAR_EXECUTION_TRIGGER_GROUP)
+      .withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow())
+      .startAt(futureDate(separationMinutes, DateBuilder.IntervalUnit.MINUTE))
+      .build();
+
+    executionScheduleManager.scheduleJob(trigger);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionJob.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionJob.java
new file mode 100644
index 0000000..264cda4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionJob.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.scheduler;
+
+import org.quartz.Job;
+
+/**
+ * Quartz {@link Job} type handled by the ExecutionScheduleManager. Besides
+ * marking such jobs, it holds the job-data keys and group names shared by
+ * linearly chained execution jobs.
+ */
+public interface ExecutionJob extends Job {
+  /** Job-data key under which the name of the next job is stored. */
+  String NEXT_EXECUTION_JOB_NAME_KEY = "ExecutionJob.Name";
+
+  /** Job-data key under which the group of the next job is stored. */
+  String NEXT_EXECUTION_JOB_GROUP_KEY = "ExecutionJob.Group";
+
+  /** Job-data key for the delay, in minutes, before the next job fires. */
+  String NEXT_EXECUTION_SEPARATION_MINUTES = "ExecutionJob.SeparationMinutes";
+
+  /** Group name for linear execution jobs. */
+  String LINEAR_EXECUTION_JOB_GROUP = "LinearExecutionJobs";
+
+  /** Group name for the triggers that fire linear execution jobs. */
+  String LINEAR_EXECUTION_TRIGGER_GROUP = "LinearExecutionTriggers";
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
new file mode 100644
index 0000000..443a7e2
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.scheduler;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Singleton;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.utils.DateUtils;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+
+/**
+ * This class handles scheduling request execution for managed clusters.
+ * It wraps an {@link ExecutionScheduler}, tracks whether that scheduler was
+ * started successfully, and refuses to schedule triggers while it is not
+ * available.
+ */
+@Singleton
+public class ExecutionScheduleManager {
+  private static final Logger LOG = LoggerFactory.getLogger
+    (ExecutionScheduleManager.class);
+  @Inject
+  private ExecutionScheduler executionScheduler;
+  @Inject
+  private Configuration configuration;
+
+  // Set to true only after the scheduler started without error.
+  private volatile boolean schedulerAvailable = false;
+
+  /**
+   * @param injector injector used to populate the {@code @Inject} fields of
+   *                 this instance
+   */
+  @Inject
+  public ExecutionScheduleManager(Injector injector) {
+    injector.injectMembers(this);
+  }
+
+  /**
+   * Start the underlying scheduler. A failure is logged, not rethrown; the
+   * manager then stays unavailable and {@link #scheduleJob(Trigger)} will
+   * reject triggers.
+   */
+  public void start() {
+    LOG.info("Starting scheduler");
+    try {
+      executionScheduler.startScheduler();
+      schedulerAvailable = true;
+    } catch (AmbariException e) {
+      // Keep the cause in the log; without it the failure is undiagnosable.
+      LOG.warn("Unable to start scheduler. No recurring tasks will be " +
+        "scheduled.", e);
+    }
+  }
+
+  /**
+   * Mark the scheduler unavailable and stop it. A failure to stop is logged,
+   * not rethrown.
+   */
+  public void stop() {
+    LOG.info("Stopping scheduler");
+    schedulerAvailable = false;
+    try {
+      executionScheduler.stopScheduler();
+    } catch (AmbariException e) {
+      LOG.warn("Unable to stop scheduler. No new recurring tasks will be " +
+        "scheduled.", e);
+    }
+  }
+
+  /**
+   * @return true if {@link #start()} succeeded and {@link #stop()} has not
+   *         been called since
+   */
+  public boolean isSchedulerAvailable() {
+    return schedulerAvailable;
+  }
+
+  /**
+   * Hand the trigger to the underlying scheduler if it is available.
+   * Scheduling errors are logged and not propagated to the caller.
+   * @param trigger trigger to schedule
+   */
+  public void scheduleJob(Trigger trigger) {
+    LOG.debug("Scheduling job: {}", trigger.getJobKey());
+    if (isSchedulerAvailable()) {
+      try {
+        executionScheduler.scheduleJob(trigger);
+      } catch (SchedulerException e) {
+        LOG.error("Unable to add trigger for execution job: " + trigger
+          .getJobKey(), e);
+      }
+    } else {
+      LOG.error("Scheduler unavailable, cannot schedule jobs.");
+    }
+  }
+
+  /**
+   * Decide whether a fired job should still run, by comparing the value that
+   * DateUtils.getDateDifferenceInMinutes returns for the scheduled fire time
+   * with the configured misfire toleration.
+   * @param jobExecutionContext context of the fired job, may be null
+   * @return true if the context is null or the difference is below the
+   *         configured toleration, false otherwise
+   */
+  public boolean continueOnMisfire(JobExecutionContext jobExecutionContext) {
+    if (jobExecutionContext != null) {
+      Date scheduledTime = jobExecutionContext.getScheduledFireTime();
+      Long diff = DateUtils.getDateDifferenceInMinutes(scheduledTime);
+      return (diff < configuration.getExecutionSchedulerMisfireToleration());
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
new file mode 100644
index 0000000..a18c91b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.scheduler;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.scheduler.Schedule;
+import org.quartz.Job;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+
+/**
+ * Contract for the scheduler that runs request executions.
+ */
+public interface ExecutionScheduler {
+  /**
+   * Initialize and start the scheduler to accept jobs.
+   * @throws AmbariException
+   */
+  void startScheduler() throws AmbariException;
+
+  /**
+   * Shutdown the scheduler threads and do not accept any more jobs.
+   * @throws AmbariException
+   */
+  void stopScheduler() throws AmbariException;
+
+  /**
+   * Create a job based on the {@link RequestExecution} and add a trigger for
+   * the created job based on the {@link Schedule}. Schedule the job with the
+   * scheduler.
+   * @param requestExecution request execution the job is created from
+   * @param schedule schedule the trigger is created from
+   * @throws AmbariException
+   */
+  void scheduleJob(RequestExecution requestExecution,
+                   Schedule schedule) throws AmbariException;
+
+  /**
+   * Schedule the given trigger with the scheduler.
+   * @param trigger trigger to schedule
+   * @throws SchedulerException
+   */
+  void scheduleJob(Trigger trigger) throws SchedulerException;
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
new file mode 100644
index 0000000..2edfce7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionSchedulerImpl.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.scheduler;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.scheduler.Schedule;
+import org.quartz.Job;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * {@link ExecutionScheduler} backed by a Quartz {@link Scheduler} that is
+ * configured from the server {@link Configuration} and uses a JDBC job store.
+ */
+@Singleton
+public class ExecutionSchedulerImpl implements ExecutionScheduler {
+  @Inject
+  private Configuration configuration;
+  private Scheduler scheduler;
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionSchedulerImpl.class);
+  protected static final String DEFAULT_SCHEDULER_NAME = "ExecutionScheduler";
+  // Property holding the database password; must never be written to a log.
+  private static final String DATA_SOURCE_PASSWORD_KEY =
+    "org.quartz.dataSource.myDS.password";
+  private static volatile boolean isInitialized = false;
+
+  @Inject
+  public ExecutionSchedulerImpl(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   * Build the Quartz scheduler from {@link #getQuartzSchedulerProperties()}.
+   * Failures are logged and leave the scheduler unset; callers must check
+   * the outcome rather than expect an exception.
+   */
+  protected synchronized void initializeScheduler() {
+    StdSchedulerFactory sf = new StdSchedulerFactory();
+    Properties properties = getQuartzSchedulerProperties();
+    try {
+      sf.initialize(properties);
+    } catch (SchedulerException e) {
+      LOG.warn("Failed to initialize Request Execution Scheduler properties !", e);
+      if (LOG.isDebugEnabled()) {
+        // Log a copy with the password masked so credentials do not leak.
+        Properties masked = new Properties();
+        masked.putAll(properties);
+        if (masked.containsKey(DATA_SOURCE_PASSWORD_KEY)) {
+          masked.setProperty(DATA_SOURCE_PASSWORD_KEY, "*****");
+        }
+        LOG.debug("Scheduler properties: \n" + masked);
+      }
+      return;
+    }
+    try {
+      scheduler = sf.getScheduler();
+      isInitialized = true;
+    } catch (SchedulerException e) {
+      LOG.warn("Failed to create Request Execution scheduler !", e);
+    }
+  }
+
+  /**
+   * Assemble the Quartz configuration: scheduler identity, thread pool, JDBC
+   * job store with a database-specific delegate, and the data source.
+   * @return properties to pass to the scheduler factory
+   */
+  protected Properties getQuartzSchedulerProperties() {
+    Properties properties = new Properties();
+    properties.setProperty("org.quartz.scheduler.instanceName", DEFAULT_SCHEDULER_NAME);
+    properties.setProperty("org.quartz.scheduler.instanceId", "AUTO");
+    properties.setProperty("org.quartz.threadPool.class",
+      "org.quartz.simpl.SimpleThreadPool");
+    properties.setProperty("org.quartz.threadPool.threadCount",
+      configuration.getExecutionSchedulerThreads());
+
+    // Job Store Configuration
+    properties.setProperty("org.quartz.jobStore.class",
+      "org.quartz.impl.jdbcjobstore.JobStoreTX");
+    properties.setProperty("org.quartz.jobStore.isClustered",
+      configuration.isExecutionSchedulerClusterd());
+
+    String dbType = configuration.getServerDBName();
+    String dbDelegate = "org.quartz.impl.jdbcjobstore.StdJDBCDelegate";
+    String dbValidate = "select 0";
+
+    // Constant-first comparison: a null database name falls through to the
+    // standard delegate instead of throwing.
+    if (Configuration.SERVER_DB_NAME_DEFAULT.equals(dbType)) {
+      dbDelegate = "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate";
+    } else if (Configuration.ORACLE_DB_NAME.equals(dbType)) {
+      dbDelegate = "org.quartz.impl.jdbcjobstore.oracle.OracleDelegate";
+      dbValidate = "select 0 from dual";
+    }
+    properties.setProperty("org.quartz.jobStore.driverDelegateClass", dbDelegate);
+    // NOTE(review): the original comment here said "Allow only strings in
+    // the jobDataMap which is serialized", yet the value is "false". In
+    // Quartz, string-only job data is what "true" requests - confirm which
+    // one is intended. The value is left unchanged.
+    properties.setProperty("org.quartz.jobStore.useProperties", "false");
+
+    // Data store configuration
+    properties.setProperty("org.quartz.jobStore.dataSource", "myDS");
+    properties.setProperty("org.quartz.dataSource.myDS.driver",
+      configuration.getDatabaseDriver());
+    properties.setProperty("org.quartz.dataSource.myDS.URL",
+      configuration.getDatabaseUrl());
+    properties.setProperty("org.quartz.dataSource.myDS.user",
+      configuration.getDatabaseUser());
+    properties.setProperty(DATA_SOURCE_PASSWORD_KEY,
+      configuration.getDatabasePassword());
+    properties.setProperty("org.quartz.dataSource.myDS.maxConnections",
+      configuration.getExecutionSchedulerConnections());
+    properties.setProperty("org.quartz.dataSource.myDS.validationQuery",
+      dbValidate);
+
+    // Skip update check
+    properties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
+
+    return properties;
+  }
+
+  /**
+   * @return true once a scheduler has been created successfully
+   */
+  protected synchronized boolean isInitialized() {
+    return isInitialized;
+  }
+
+  /**
+   * Initialize the scheduler if necessary and start it.
+   * @throws AmbariException if the scheduler could not be initialized or
+   *         started
+   */
+  @Override
+  public synchronized void startScheduler() throws AmbariException {
+    String msg = "Unable to initialize Request Execution scheduler !";
+    try {
+      // The flag is static while the scheduler is per instance, so also
+      // check the instance field before skipping initialization.
+      if (!isInitialized || scheduler == null) {
+        initializeScheduler();
+      }
+    } catch (Exception e) {
+      LOG.warn(msg, e);
+      throw new AmbariException(msg);
+    }
+    // initializeScheduler() reports failure by logging only; without this
+    // check a failed initialization would surface as a NullPointerException.
+    if (scheduler == null) {
+      LOG.warn(msg);
+      throw new AmbariException(msg);
+    }
+    try {
+      scheduler.start();
+    } catch (SchedulerException e) {
+      LOG.error("Failed to start scheduler", e);
+      throw new AmbariException(e.getMessage());
+    }
+  }
+
+  /**
+   * Shut the scheduler down.
+   * @throws AmbariException if no scheduler was created or shutdown fails
+   */
+  @Override
+  public synchronized void stopScheduler() throws AmbariException {
+    if (scheduler == null) {
+      throw new AmbariException("Scheduler not instantiated !");
+    }
+    try {
+      scheduler.shutdown();
+    } catch (SchedulerException e) {
+      LOG.error("Failed to stop scheduler", e);
+      throw new AmbariException(e.getMessage());
+    }
+  }
+
+  /**
+   * Not implemented in this version: the method body is empty and nothing
+   * is scheduled.
+   */
+  @Override
+  public void scheduleJob(RequestExecution requestExecution, Schedule schedule)
+      throws AmbariException {
+
+  }
+
+  /**
+   * Schedule the given trigger with the Quartz scheduler.
+   * @param trigger trigger to schedule
+   * @throws SchedulerException if no scheduler was created or Quartz rejects
+   *         the trigger
+   */
+  @Override
+  public void scheduleJob(Trigger trigger) throws SchedulerException {
+    if (scheduler == null) {
+      throw new SchedulerException("Scheduler not instantiated !");
+    }
+    scheduler.scheduleJob(trigger);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 9a88ae3..8955347 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.ClusterResponse;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
 
 public interface Cluster {
 
@@ -255,4 +256,24 @@ public interface Cluster {
    * @return Map of config group id to config group
    */
   public Map<Long, ConfigGroup> getConfigGroupsByHostname(String hostname) throws AmbariException;
+
+  /**
+   * Add a {@link RequestExecution} to the cluster.
+   * @param requestExecution the request execution to associate with this cluster
+   * @throws AmbariException if the implementation fails to add the request execution
+   */
+  public void addRequestExecution(RequestExecution requestExecution) throws AmbariException;
+
+  /**
+   * Get all {@link RequestExecution} objects associated with the cluster.
+   * @return Map of request execution id to request execution
+   */
+  public Map<Long, RequestExecution> getAllRequestExecutions();
+
+  /**
+   * Delete a {@link RequestExecution} associated with the cluster.
+   * @param id id of the request execution to delete
+   * @throws AmbariException if the implementation fails to delete the request execution
+   */
+  public void deleteRequestExecution(Long id) throws AmbariException;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 03c3c95..0526a3d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -39,6 +39,7 @@ import org.apache.ambari.server.orm.entities.ClusterStateEntity;
 import org.apache.ambari.server.orm.entities.ConfigGroupEntity;
 import org.apache.ambari.server.orm.entities.ConfigGroupHostMappingEntity;
 import org.apache.ambari.server.orm.entities.HostConfigMappingEntity;
+import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
@@ -51,8 +52,11 @@ import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
+import org.apache.ambari.server.state.scheduler.RequestExecution;
+import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import javax.persistence.RollbackException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -65,15 +69,15 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 public class ClusterImpl implements Cluster {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(ClusterImpl.class);
+    LoggerFactory.getLogger(ClusterImpl.class);
 
   @Inject
   private Clusters clusters;
@@ -86,24 +90,29 @@ public class ClusterImpl implements Cluster {
    * [ Config Type -> [ Config Version Tag -> Config ] ]
    */
   private Map<String, Map<String, Config>> allConfigs;
-  
+
   /**
    * [ ServiceName -> [ ServiceComponentName -> [ HostName -> [ ... ] ] ] ]
    */
   private Map<String, Map<String, Map<String, ServiceComponentHost>>>
-      serviceComponentHosts;
+    serviceComponentHosts;
 
   /**
    * [ HostName -> [ ... ] ]
    */
   private Map<String, List<ServiceComponentHost>>
-      serviceComponentHostsByHost;
+    serviceComponentHostsByHost;
 
   /**
    * Map of existing config groups
    */
   private Map<Long, ConfigGroup> clusterConfigGroups;
 
+  /**
+   * Map of Request schedules for this cluster
+   */
+  private Map<Long, RequestExecution> requestExecutions;
+
   private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
   private Lock readLock = readWriteLock.readLock();
   private Lock writeLock = readWriteLock.writeLock();
@@ -128,6 +137,8 @@ public class ClusterImpl implements Cluster {
   private ConfigGroupFactory configGroupFactory;
   @Inject
   private ConfigGroupHostMappingDAO configGroupHostMappingDAO;
+  @Inject
+  private RequestExecutionFactory requestExecutionFactory;
 
   private volatile boolean svcHostsLoaded = false;
 
@@ -138,11 +149,11 @@ public class ClusterImpl implements Cluster {
     this.clusterEntity = clusterEntity;
 
     this.serviceComponentHosts = new HashMap<String,
-        Map<String, Map<String, ServiceComponentHost>>>();
+      Map<String, Map<String, ServiceComponentHost>>>();
     this.serviceComponentHostsByHost = new HashMap<String,
-        List<ServiceComponentHost>>();
+      List<ServiceComponentHost>>();
     this.desiredStackVersion = gson.fromJson(
-        clusterEntity.getDesiredStackVersion(), StackId.class);
+      clusterEntity.getDesiredStackVersion(), StackId.class);
     allConfigs = new HashMap<String, Map<String, Config>>();
     if (!clusterEntity.getClusterConfigEntities().isEmpty()) {
       for (ClusterConfigEntity entity : clusterEntity.getClusterConfigEntities()) {
@@ -183,32 +194,32 @@ public class ClusterImpl implements Cluster {
             Service service = serviceKV.getValue();
             if (!serviceComponentHosts.containsKey(service.getName())) {
               serviceComponentHosts.put(service.getName(), new HashMap<String,
-                  Map<String, ServiceComponentHost>>());
+                Map<String, ServiceComponentHost>>());
             }
             for (Entry<String, ServiceComponent> svcComponent :
-                service.getServiceComponents().entrySet()) {
+              service.getServiceComponents().entrySet()) {
               ServiceComponent comp = svcComponent.getValue();
               String componentName = svcComponent.getKey();
               if (!serviceComponentHosts.get(service.getName()).containsKey(componentName)) {
                 serviceComponentHosts.get(service.getName()).put(componentName,
-                    new HashMap<String, ServiceComponentHost>());
+                  new HashMap<String, ServiceComponentHost>());
               }
               /** Get Service Host Components **/
               for (Entry<String, ServiceComponentHost> svchost :
-                  comp.getServiceComponentHosts().entrySet()) {
+                comp.getServiceComponentHosts().entrySet()) {
                 String hostname = svchost.getKey();
                 ServiceComponentHost svcHostComponent = svchost.getValue();
                 if (!serviceComponentHostsByHost.containsKey(hostname)) {
                   serviceComponentHostsByHost.put(hostname,
-                      new ArrayList<ServiceComponentHost>());
+                    new ArrayList<ServiceComponentHost>());
                 }
                 List<ServiceComponentHost> compList = serviceComponentHostsByHost.get(hostname);
                 compList.add(svcHostComponent);
 
                 if (!serviceComponentHosts.get(service.getName()).get(componentName)
-                    .containsKey(hostname)) {
+                  .containsKey(hostname)) {
                   serviceComponentHosts.get(service.getName()).get(componentName)
-                      .put(hostname, svcHostComponent);
+                    .put(hostname, svcHostComponent);
                 }
               }
             }
@@ -275,6 +286,31 @@ public class ClusterImpl implements Cluster {
     }
   }
 
+  /**
+   * Populates the request execution cache from the cluster entity the
+   * first time it is needed; once the cache exists, calls return at once.
+   * <p>
+   * NOTE(review): the {@code requestExecutions} field is not declared
+   * volatile, so the unlocked null check below may not be a safe fast
+   * path under the Java memory model - TODO confirm.
+   */
+  private void loadRequestExecutions() {
+    if (requestExecutions != null) {
+      return;
+    }
+    clusterGlobalLock.writeLock().lock();
+    try {
+      writeLock.lock();
+      try {
+        // Re-check under the locks: another thread may have loaded the
+        // cache while this one was waiting.
+        if (requestExecutions != null) {
+          return;
+        }
+        requestExecutions = new HashMap<Long, RequestExecution>();
+        // Iterating an empty collection is a no-op, so no emptiness check
+        // is needed before the loop.
+        for (RequestScheduleEntity entity : clusterEntity.getRequestScheduleEntities()) {
+          requestExecutions.put(entity.getScheduleId(),
+            requestExecutionFactory.createExisting(this, entity));
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
+    }
+  }
+
   @Override
   public void addConfigGroup(ConfigGroup configGroup) throws AmbariException {
     loadConfigGroups();
@@ -354,6 +390,77 @@ public class ClusterImpl implements Cluster {
   }
 
   @Override
+  public void addRequestExecution(RequestExecution requestExecution) throws AmbariException {
+    // Registers the given request execution in this cluster's cache, keyed
+    // by its id. An id that is already cached is left untouched.
+    loadRequestExecutions();
+    clusterGlobalLock.writeLock().lock();
+    try {
+      writeLock.lock();
+      try {
+        final Long scheduleId = requestExecution.getId();
+        // Both log messages end with the same identifying details.
+        final String details = ", clusterName = " + getClusterName()
+          + ", id = " + scheduleId
+          + ", description = " + requestExecution.getDescription();
+
+        LOG.info("Adding a new request schedule" + details);
+
+        if (!requestExecutions.containsKey(scheduleId)) {
+          requestExecutions.put(scheduleId, requestExecution);
+        } else {
+          LOG.debug("Request schedule already exists" + details);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the request executions of this cluster, keyed by id.
+   * <p>
+   * The result is an unmodifiable snapshot taken while holding the read
+   * locks, so callers can iterate it after the locks are released without
+   * racing against concurrent add/delete calls on this cluster.
+   *
+   * @return unmodifiable copy of the request execution map
+   */
+  @Override
+  public Map<Long, RequestExecution> getAllRequestExecutions() {
+    loadRequestExecutions();
+    clusterGlobalLock.readLock().lock();
+    try {
+      readLock.lock();
+      try {
+        // Copy instead of wrapping: an unmodifiable view would still expose
+        // the live HashMap once the locks are released.
+        return Collections.unmodifiableMap(
+          new HashMap<Long, RequestExecution>(requestExecutions));
+      } finally {
+        readLock.unlock();
+      }
+    } finally {
+      clusterGlobalLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Deletes the request execution with the given id and drops it from
+   * this cluster's cache.
+   *
+   * @param id id of the request execution to delete
+   * @throws AmbariException if no request execution is cached under the
+   *         given id
+   */
+  @Override
+  public void deleteRequestExecution(Long id) throws AmbariException {
+    loadRequestExecutions();
+    clusterGlobalLock.writeLock().lock();
+    try {
+      // Same lock object as readWriteLock.writeLock().
+      writeLock.lock();
+      try {
+        final RequestExecution target = requestExecutions.get(id);
+        if (null == target) {
+          throw new AmbariException("Request schedule does not exists, id = " + id);
+        }
+
+        final String details = ", clusterName = " + getClusterName()
+          + ", id = " + target.getId()
+          + ", description = " + target.getDescription();
+        LOG.info("Deleting request schedule" + details);
+
+        // Delete first; the cache entry is removed only after delete() returns.
+        target.delete();
+        requestExecutions.remove(id);
+      } finally {
+        writeLock.unlock();
+      }
+    } finally {
+      clusterGlobalLock.writeLock().unlock();
+    }
+  }
+
+  @Override
   public void deleteConfigGroup(Long id) throws AmbariException {
     loadConfigGroups();
     clusterGlobalLock.writeLock().lock();
@@ -382,22 +489,22 @@ public class ClusterImpl implements Cluster {
   }
 
   public ServiceComponentHost getServiceComponentHost(String serviceName,
-      String serviceComponentName, String hostname) throws AmbariException {
+                                                      String serviceComponentName, String hostname) throws AmbariException {
     loadServiceHostComponents();
     clusterGlobalLock.readLock().lock();
     try {
       readLock.lock();
       try {
         if (!serviceComponentHosts.containsKey(serviceName)
-            || !serviceComponentHosts.get(serviceName)
-            .containsKey(serviceComponentName)
-            || !serviceComponentHosts.get(serviceName).get(serviceComponentName)
-            .containsKey(hostname)) {
+          || !serviceComponentHosts.get(serviceName)
+          .containsKey(serviceComponentName)
+          || !serviceComponentHosts.get(serviceName).get(serviceComponentName)
+          .containsKey(hostname)) {
           throw new ServiceComponentHostNotFoundException(getClusterName(), serviceName,
-              serviceComponentName, hostname);
+            serviceComponentName, hostname);
         }
         return serviceComponentHosts.get(serviceName).get(serviceComponentName)
-            .get(hostname);
+          .get(hostname);
       } finally {
         readLock.unlock();
       }
@@ -443,7 +550,7 @@ public class ClusterImpl implements Cluster {
   }
 
   public void addServiceComponentHost(
-      ServiceComponentHost svcCompHost) throws AmbariException {
+    ServiceComponentHost svcCompHost) throws AmbariException {
     loadServiceHostComponents();
     clusterGlobalLock.writeLock().lock();
     try {
@@ -451,9 +558,9 @@ public class ClusterImpl implements Cluster {
       try {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to add ServiceComponentHost to ClusterHostMap cache"
-              + ", serviceName=" + svcCompHost.getServiceName()
-              + ", componentName=" + svcCompHost.getServiceComponentName()
-              + ", hostname=" + svcCompHost.getHostName());
+            + ", serviceName=" + svcCompHost.getServiceName()
+            + ", componentName=" + svcCompHost.getServiceComponentName()
+            + ", hostname=" + svcCompHost.getHostName());
         }
 
         final String hostname = svcCompHost.getHostName();
@@ -471,44 +578,44 @@ public class ClusterImpl implements Cluster {
         }
         if (!clusterFound) {
           throw new AmbariException("Host does not belong this cluster"
-              + ", hostname=" + hostname
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId());
+            + ", hostname=" + hostname
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId());
         }
 
         if (!serviceComponentHosts.containsKey(serviceName)) {
           serviceComponentHosts.put(serviceName,
-              new HashMap<String, Map<String, ServiceComponentHost>>());
+            new HashMap<String, Map<String, ServiceComponentHost>>());
         }
         if (!serviceComponentHosts.get(serviceName).containsKey(componentName)) {
           serviceComponentHosts.get(serviceName).put(componentName,
-              new HashMap<String, ServiceComponentHost>());
+            new HashMap<String, ServiceComponentHost>());
         }
 
         if (serviceComponentHosts.get(serviceName).get(componentName).
-            containsKey(hostname)) {
+          containsKey(hostname)) {
           throw new AmbariException("Duplicate entry for ServiceComponentHost"
-              + ", serviceName=" + serviceName
-              + ", serviceComponentName" + componentName
-              + ", hostname= " + hostname);
+            + ", serviceName=" + serviceName
+            + ", serviceComponentName" + componentName
+            + ", hostname= " + hostname);
         }
 
         if (!serviceComponentHostsByHost.containsKey(hostname)) {
           serviceComponentHostsByHost.put(hostname,
-              new ArrayList<ServiceComponentHost>());
+            new ArrayList<ServiceComponentHost>());
         }
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Adding a new ServiceComponentHost"
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId()
-              + ", serviceName=" + serviceName
-              + ", serviceComponentName" + componentName
-              + ", hostname= " + hostname);
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId()
+            + ", serviceName=" + serviceName
+            + ", serviceComponentName" + componentName
+            + ", hostname= " + hostname);
         }
 
         serviceComponentHosts.get(serviceName).get(componentName).put(hostname,
-            svcCompHost);
+          svcCompHost);
         serviceComponentHostsByHost.get(hostname).add(svcCompHost);
       } finally {
         writeLock.unlock();
@@ -521,7 +628,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public void removeServiceComponentHost(ServiceComponentHost svcCompHost)
-      throws AmbariException {
+    throws AmbariException {
     loadServiceHostComponents();
     clusterGlobalLock.writeLock().lock();
     try {
@@ -529,9 +636,9 @@ public class ClusterImpl implements Cluster {
       try {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to remove ServiceComponentHost to ClusterHostMap cache"
-              + ", serviceName=" + svcCompHost.getServiceName()
-              + ", componentName=" + svcCompHost.getServiceComponentName()
-              + ", hostname=" + svcCompHost.getHostName());
+            + ", serviceName=" + svcCompHost.getServiceName()
+            + ", componentName=" + svcCompHost.getServiceComponentName()
+            + ", hostname=" + svcCompHost.getHostName());
         }
 
         final String hostname = svcCompHost.getHostName();
@@ -549,32 +656,32 @@ public class ClusterImpl implements Cluster {
         }
         if (!clusterFound) {
           throw new AmbariException("Host does not belong this cluster"
-              + ", hostname=" + hostname
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId());
+            + ", hostname=" + hostname
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId());
         }
 
         if (!serviceComponentHosts.containsKey(serviceName)
-            || !serviceComponentHosts.get(serviceName).containsKey(componentName)
-            || !serviceComponentHosts.get(serviceName).get(componentName).
-            containsKey(hostname)) {
+          || !serviceComponentHosts.get(serviceName).containsKey(componentName)
+          || !serviceComponentHosts.get(serviceName).get(componentName).
+          containsKey(hostname)) {
           throw new AmbariException("Invalid entry for ServiceComponentHost"
-              + ", serviceName=" + serviceName
-              + ", serviceComponentName" + componentName
-              + ", hostname= " + hostname);
+            + ", serviceName=" + serviceName
+            + ", serviceComponentName" + componentName
+            + ", hostname= " + hostname);
         }
         if (!serviceComponentHostsByHost.containsKey(hostname)) {
           throw new AmbariException("Invalid host entry for ServiceComponentHost"
-              + ", serviceName=" + serviceName
-              + ", serviceComponentName" + componentName
-              + ", hostname= " + hostname);
+            + ", serviceName=" + serviceName
+            + ", serviceComponentName" + componentName
+            + ", hostname= " + hostname);
         }
 
         ServiceComponentHost schToRemove = null;
         for (ServiceComponentHost sch : serviceComponentHostsByHost.get(hostname)) {
           if (sch.getServiceName().equals(serviceName)
-              && sch.getServiceComponentName().equals(componentName)
-              && sch.getHostName().equals(hostname)) {
+            && sch.getServiceComponentName().equals(componentName)
+            && sch.getHostName().equals(hostname)) {
             schToRemove = sch;
             break;
           }
@@ -582,18 +689,18 @@ public class ClusterImpl implements Cluster {
 
         if (schToRemove == null) {
           LOG.warn("Unavailable in per host cache. ServiceComponentHost"
-              + ", serviceName=" + serviceName
-              + ", serviceComponentName" + componentName
-              + ", hostname= " + hostname);
+            + ", serviceName=" + serviceName
+            + ", serviceComponentName" + componentName
+            + ", hostname= " + hostname);
         }
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Removing a ServiceComponentHost"
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId()
-              + ", serviceName=" + serviceName
-              + ", serviceComponentName" + componentName
-              + ", hostname= " + hostname);
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId()
+            + ", serviceName=" + serviceName
+            + ", serviceComponentName" + componentName
+            + ", hostname= " + hostname);
         }
 
         serviceComponentHosts.get(serviceName).get(componentName).remove(hostname);
@@ -627,7 +734,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public List<ServiceComponentHost> getServiceComponentHosts(
-      String hostname) {
+    String hostname) {
     loadServiceHostComponents();
     clusterGlobalLock.readLock().lock();
     try {
@@ -648,7 +755,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public void addService(Service service)
-      throws AmbariException {
+    throws AmbariException {
     loadServices();
     clusterGlobalLock.writeLock().lock();
     try {
@@ -656,15 +763,15 @@ public class ClusterImpl implements Cluster {
       try {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Adding a new Service"
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId()
-              + ", serviceName=" + service.getName());
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId()
+            + ", serviceName=" + service.getName());
         }
         if (services.containsKey(service.getName())) {
           throw new AmbariException("Service already exists"
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId()
-              + ", serviceName=" + service.getName());
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId()
+            + ", serviceName=" + service.getName());
         }
         this.services.put(service.getName(), service);
       } finally {
@@ -677,7 +784,7 @@ public class ClusterImpl implements Cluster {
   }
 
   @Override
-  public Service addService(String serviceName) throws AmbariException{
+  public Service addService(String serviceName) throws AmbariException {
     loadServices();
     clusterGlobalLock.writeLock().lock();
     try {
@@ -685,15 +792,15 @@ public class ClusterImpl implements Cluster {
       try {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Adding a new Service"
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId()
-              + ", serviceName=" + serviceName);
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId()
+            + ", serviceName=" + serviceName);
         }
         if (services.containsKey(serviceName)) {
           throw new AmbariException("Service already exists"
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId()
-              + ", serviceName=" + serviceName);
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId()
+            + ", serviceName=" + serviceName);
         }
         Service s = serviceFactory.createNew(this, serviceName);
         this.services.put(s.getName(), s);
@@ -709,7 +816,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public Service getService(String serviceName)
-      throws AmbariException {
+    throws AmbariException {
     loadServices();
     clusterGlobalLock.readLock().lock();
     try {
@@ -769,10 +876,10 @@ public class ClusterImpl implements Cluster {
       try {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Changing DesiredStackVersion of Cluster"
-              + ", clusterName=" + getClusterName()
-              + ", clusterId=" + getClusterId()
-              + ", currentDesiredStackVersion=" + this.desiredStackVersion
-              + ", newDesiredStackVersion=" + stackVersion);
+            + ", clusterName=" + getClusterName()
+            + ", clusterId=" + getClusterId()
+            + ", currentDesiredStackVersion=" + this.desiredStackVersion
+            + ", newDesiredStackVersion=" + stackVersion);
         }
         this.desiredStackVersion = stackVersion;
         clusterEntity.setDesiredStackVersion(gson.toJson(stackVersion));
@@ -812,7 +919,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public void setCurrentStackVersion(StackId stackVersion)
-  throws AmbariException {
+    throws AmbariException {
     clusterGlobalLock.readLock().lock();
     try {
       writeLock.lock();
@@ -835,8 +942,8 @@ public class ClusterImpl implements Cluster {
       } catch (RollbackException e) {
         LOG.warn("Unable to set version " + stackVersion + " for cluster " + getClusterName());
         throw new AmbariException("Unable to set"
-            + " version=" + stackVersion
-            + " for cluster " + getClusterName(), e);
+          + " version=" + stackVersion
+          + " for cluster " + getClusterName(), e);
       } finally {
         writeLock.unlock();
       }
@@ -872,7 +979,7 @@ public class ClusterImpl implements Cluster {
       readWriteLock.readLock().lock();
       try {
         if (!allConfigs.containsKey(configType)
-            || !allConfigs.get(configType).containsKey(versionTag)) {
+          || !allConfigs.get(configType).containsKey(versionTag)) {
           return null;
         }
         return allConfigs.get(configType).get(versionTag);
@@ -892,9 +999,9 @@ public class ClusterImpl implements Cluster {
       readWriteLock.writeLock().lock();
       try {
         if (config.getType() == null
-            || config.getType().isEmpty()
-            || config.getVersionTag() == null
-            || config.getVersionTag().isEmpty()) {
+          || config.getType().isEmpty()
+          || config.getVersionTag() == null
+          || config.getVersionTag().isEmpty()) {
           // TODO throw error
         }
         if (!allConfigs.containsKey(config.getType())) {
@@ -935,14 +1042,14 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public ClusterResponse convertToResponse()
-      throws AmbariException {
+    throws AmbariException {
     clusterGlobalLock.readLock().lock();
     try {
       readWriteLock.readLock().lock();
       try {
         ClusterResponse r = new ClusterResponse(getClusterId(), getClusterName(),
-            clusters.getHostsForCluster(getClusterName()).keySet(),
-            getDesiredStackVersion().getStackId());
+          clusters.getHostsForCluster(getClusterName()).keySet(),
+          getDesiredStackVersion().getStackId());
 
         return r;
       } finally {
@@ -962,9 +1069,9 @@ public class ClusterImpl implements Cluster {
       readWriteLock.readLock().lock();
       try {
         sb.append("Cluster={ clusterName=" + getClusterName()
-            + ", clusterId=" + getClusterId()
-            + ", desiredStackVersion=" + desiredStackVersion.getStackId()
-            + ", services=[ ");
+          + ", clusterId=" + getClusterId()
+          + ", desiredStackVersion=" + desiredStackVersion.getStackId()
+          + ", services=[ ");
         boolean first = true;
         for (Service s : services.values()) {
           if (!first) {
@@ -1012,13 +1119,13 @@ public class ClusterImpl implements Cluster {
       readWriteLock.writeLock().lock();
       try {
         LOG.info("Deleting all services for cluster"
-            + ", clusterName=" + getClusterName());
+          + ", clusterName=" + getClusterName());
         for (Service service : services.values()) {
           if (!service.canBeRemoved()) {
             throw new AmbariException("Found non removable service when trying to"
-                + " all services from cluster"
-                + ", clusterName=" + getClusterName()
-                + ", serviceName=" + service.getName());
+              + " all services from cluster"
+              + ", clusterName=" + getClusterName()
+              + ", serviceName=" + service.getName());
           }
         }
 
@@ -1038,7 +1145,7 @@ public class ClusterImpl implements Cluster {
 
   @Override
   public void deleteService(String serviceName)
-      throws AmbariException {
+    throws AmbariException {
     loadServices();
     clusterGlobalLock.writeLock().lock();
     try {
@@ -1046,13 +1153,13 @@ public class ClusterImpl implements Cluster {
       try {
         Service service = getService(serviceName);
         LOG.info("Deleting service for cluster"
-            + ", clusterName=" + getClusterName()
-            + ", serviceName=" + service.getName());
+          + ", clusterName=" + getClusterName()
+          + ", serviceName=" + service.getName());
         // FIXME check dependencies from meta layer
         if (!service.canBeRemoved()) {
           throw new AmbariException("Could not delete service from cluster"
-              + ", clusterName=" + getClusterName()
-              + ", serviceName=" + service.getName());
+            + ", clusterName=" + getClusterName()
+            + ", serviceName=" + service.getName());
         }
         service.delete();
         services.remove(serviceName);
@@ -1077,8 +1184,8 @@ public class ClusterImpl implements Cluster {
           if (!service.canBeRemoved()) {
             safeToRemove = false;
             LOG.warn("Found non removable service"
-                + ", clusterName=" + getClusterName()
-                + ", serviceName=" + service.getName());
+              + ", clusterName=" + getClusterName()
+              + ", serviceName=" + service.getName());
           }
         }
         return safeToRemove;
@@ -1162,7 +1269,7 @@ public class ClusterImpl implements Cluster {
 
 
   }
-  
+
   @Override
   public Map<String, DesiredConfig> getDesiredConfigs() {
     clusterGlobalLock.readLock().lock();
@@ -1186,13 +1293,13 @@ public class ClusterImpl implements Cluster {
 
         if (!map.isEmpty()) {
           Map<String, List<HostConfigMappingEntity>> hostMappingsByType =
-              hostConfigMappingDAO.findSelectedHostsByTypes(clusterEntity.getClusterId(), types);
+            hostConfigMappingDAO.findSelectedHostsByTypes(clusterEntity.getClusterId(), types);
 
           for (Entry<String, DesiredConfig> entry : map.entrySet()) {
             List<DesiredConfig.HostOverride> hostOverrides = new ArrayList<DesiredConfig.HostOverride>();
             for (HostConfigMappingEntity mappingEntity : hostMappingsByType.get(entry.getKey())) {
               hostOverrides.add(new DesiredConfig.HostOverride(mappingEntity.getHostName(),
-                  mappingEntity.getVersion()));
+                mappingEntity.getVersion()));
             }
             entry.getValue().setHostOverrides(hostOverrides);
           }
@@ -1239,7 +1346,7 @@ public class ClusterImpl implements Cluster {
     }
 
     List<HostConfigMappingEntity> mappingEntities =
-        hostConfigMappingDAO.findSelectedByHosts(clusterEntity.getClusterId(), hostnames);
+      hostConfigMappingDAO.findSelectedByHosts(clusterEntity.getClusterId(), hostnames);
 
     Map<String, Map<String, DesiredConfig>> desiredConfigsByHost = new HashMap<String, Map<String, DesiredConfig>>();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java
index 0162b80..20db705 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/configgroup/ConfigGroupImpl.java
@@ -26,9 +26,7 @@ import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.DuplicateResourceException;
 import org.apache.ambari.server.controller.ConfigGroupResponse;
-import org.apache.ambari.server.controller.internal.ConfigGroupResourceProvider;
 import org.apache.ambari.server.controller.internal.ConfigurationResourceProvider;
-import org.apache.ambari.server.controller.utilities.PropertyHelper;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ConfigGroupConfigMappingDAO;
 import org.apache.ambari.server.orm.dao.ConfigGroupDAO;
@@ -47,14 +45,11 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigFactory;
 import org.apache.ambari.server.state.Host;
-import org.eclipse.persistence.sessions.UnitOfWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Batch.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Batch.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Batch.java
new file mode 100644
index 0000000..036ea40
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/Batch.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Groups a list of {@link BatchRequest}s with an optional
+ * {@link BatchSettings} object.
+ */
+public class Batch {
+  /** Requests in this batch; created empty and never reassigned. */
+  private final List<BatchRequest> batchRequests = new ArrayList<BatchRequest>();
+  /** Settings for this batch; stays {@code null} until explicitly set. */
+  private BatchSettings batchSettings;
+
+  /**
+   * Mapped to the JSON property "batch_requests".
+   *
+   * @return the internal list itself (not a copy), so changes made to the
+   *         returned list are changes to this batch
+   */
+  @JsonProperty("batch_requests")
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  public List<BatchRequest> getBatchRequests() {
+    return this.batchRequests;
+  }
+
+  /**
+   * Mapped to the JSON property "batch_settings".
+   *
+   * @return the batch settings, or {@code null} if none were set
+   */
+  @JsonProperty("batch_settings")
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  public BatchSettings getBatchSettings() {
+    return this.batchSettings;
+  }
+
+  /**
+   * @param batchSettings settings to associate with this batch
+   */
+  public void setBatchSettings(BatchSettings batchSettings) {
+    this.batchSettings = batchSettings;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
new file mode 100644
index 0000000..75c9f24
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * A single request of a batch: a request type, URI and optional body, plus
+ * status, return code and response message fields. Instances are ordered by
+ * their order id.
+ */
+public class BatchRequest implements Comparable<BatchRequest> {
+  private Long orderId;
+  private Type type;
+  private String uri;
+  private String body;
+  private String status;
+  private Integer returnCode;
+  private String responseMsg;
+
+  /**
+   * @return the order id used by {@link #compareTo(BatchRequest)}, or
+   *         {@code null} if it has not been set
+   */
+  @JsonProperty("order_id")
+  public Long getOrderId() {
+    return orderId;
+  }
+
+  public void setOrderId(Long orderId) {
+    this.orderId = orderId;
+  }
+
+  /**
+   * Returns the request type as the name of its {@link Type} constant.
+   * <p>
+   * Like the other getters of this class, an unset field yields
+   * {@code null} rather than a NullPointerException, so a request without a
+   * type can still be inspected and serialized.
+   *
+   * @return the type name, or {@code null} if no type has been set
+   */
+  @JsonProperty("request_type")
+  public String getType() {
+    return type == null ? null : type.name();
+  }
+
+  public void setType(Type type) {
+    this.type = type;
+  }
+
+  /**
+   * @return the request URI, or {@code null} if unset
+   */
+  @JsonProperty("request_uri")
+  public String getUri() {
+    return uri;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  /**
+   * @return the request body, or {@code null} if unset
+   */
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  @JsonProperty("request_body")
+  public String getBody() {
+    return body;
+  }
+
+  public void setBody(String body) {
+    this.body = body;
+  }
+
+  /**
+   * @return the request status, or {@code null} if unset
+   */
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  @JsonProperty("request_status")
+  public String getStatus() {
+    return status;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  /**
+   * @return the return code, or {@code null} if unset
+   */
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  @JsonProperty("return_code")
+  public Integer getReturnCode() {
+    return returnCode;
+  }
+
+  public void setReturnCode(Integer returnCode) {
+    this.returnCode = returnCode;
+  }
+
+  /**
+   * @return the response message, or {@code null} if unset
+   */
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  @JsonProperty("response_message")
+  public String getResponseMsg() {
+    return responseMsg;
+  }
+
+  public void setResponseMsg(String responseMsg) {
+    this.responseMsg = responseMsg;
+  }
+
+  /**
+   * Orders batch requests by ascending order id.
+   *
+   * @param batchRequest the request to compare against
+   * @return the result of comparing this order id with the other one
+   * @throws NullPointerException if {@code batchRequest} is {@code null} or
+   *         either order id is unset
+   */
+  @Override
+  public int compareTo(BatchRequest batchRequest) {
+    return this.orderId.compareTo(batchRequest.getOrderId());
+  }
+
+  /**
+   * Request types a batch request can carry.
+   */
+  public enum Type {
+    PUT,
+    POST,
+    DELETE
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
new file mode 100644
index 0000000..6e17389
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.scheduler.AbstractLinearExecutionJob;
+import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
+
+public class BatchRequestJob extends AbstractLinearExecutionJob {
+  // Forwards the schedule manager to AbstractLinearExecutionJob; no state is kept here.
+  public BatchRequestJob(ExecutionScheduleManager executionScheduleManager) {
+    super(executionScheduleManager);
+  }
+
+  @Override
+  protected void doWork() throws AmbariException {
+    // No-op: this class currently performs no work of its own.
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
new file mode 100644
index 0000000..452271f
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Settings of a batch: a batch separation in minutes and a task failure
+ * tolerance limit. Both are {@code null} until set.
+ */
+public class BatchSettings {
+  private Integer batchSeparationInMinutes;
+  private Integer taskFailureToleranceLimit;
+
+  /**
+   * @return the batch separation in minutes, or {@code null} if unset
+   */
+  @JsonProperty("batch_separation_in_minutes")
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  public Integer getBatchSeparationInMinutes() {
+    return this.batchSeparationInMinutes;
+  }
+
+  /**
+   * @param batchSeparationInMinutes the batch separation in minutes
+   */
+  public void setBatchSeparationInMinutes(Integer batchSeparationInMinutes) {
+    this.batchSeparationInMinutes = batchSeparationInMinutes;
+  }
+
+  /**
+   * @return the task failure tolerance limit, or {@code null} if unset
+   */
+  @JsonProperty("task_failure_tolerance_limit")
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  public Integer getTaskFailureToleranceLimit() {
+    return this.taskFailureToleranceLimit;
+  }
+
+  /**
+   * @param taskFailureTolerance the task failure tolerance limit
+   */
+  public void setTaskFailureToleranceLimit(Integer taskFailureTolerance) {
+    this.taskFailureToleranceLimit = taskFailureTolerance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
new file mode 100644
index 0000000..9a7570d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecution.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+import org.apache.ambari.server.controller.RequestScheduleResponse;
+
+/**
+ * Request Execution is a type of resource that supports scheduling a request
+ * or a group of requests for execution by the ActionManager.
+ */
+public interface RequestExecution {
+  /**
+   * Primary key of Request Execution
+   * @return the id of this request execution
+   */
+  public Long getId();
+
+  /**
+   * Cluster name to which request schedule belongs
+   * @return the cluster name
+   */
+  public String getClusterName();
+
+  /**
+   * Get the batch of requests along with batch settings
+   * @return the batch
+   */
+  public Batch getBatch();
+
+  /**
+   * Set batch of requests and batch settings
+   */
+  public void setBatch(Batch batch);
+
+  /**
+   * Get schedule for the execution
+   * @return the schedule
+   */
+  public Schedule getSchedule();
+
+  /**
+   * Set schedule for the execution
+   */
+  public void setSchedule(Schedule schedule);
+
+  /**
+   * Get {@link RequestScheduleResponse} for this Request Execution
+   * @return the response object
+   */
+  public RequestScheduleResponse convertToResponse();
+
+  /**
+   * Persist the Request Execution and schedule
+   */
+  public void persist();
+
+  /**
+   * Refresh entity from DB.
+   */
+  public void refresh();
+
+  /**
+   * Delete Request Schedule entity
+   */
+  public void delete();
+
+  /**
+   * Get status of schedule
+   */
+  public String getStatus();
+
+  /**
+   * Set request execution description
+   */
+  public void setDescription(String description);
+
+  /**
+   * Get description of the request execution
+   */
+  public String getDescription();
+
+  /**
+   * Set status of the schedule
+   */
+  public void setStatus(Status status);
+
+  /**
+   * Set datetime:status of last request that was executed
+   */
+  public void setLastExecutionStatus(String status);
+
+  /**
+   * Set create username
+   */
+  public void setCreateUser(String username);
+
+  /**
+   * Set update username
+   */
+  public void setUpdateUser(String username);
+
+  /**
+   * Get created time
+   */
+  public String getCreateTime();
+
+  /**
+   * Get updated time
+   */
+  public String getUpdateTime();
+
+  /**
+   * Get create user
+   */
+  public String getCreateUser();
+
+  /**
+   * Get update user
+   */
+  public String getUpdateUser();
+
+  /**
+   * Status of the Request execution
+   */
+  public enum Status {
+    SCHEDULED,
+    COMPLETED,
+    DISABLED
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dcea372/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionFactory.java
new file mode 100644
index 0000000..b715611
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.scheduler;
+
+import com.google.inject.assistedinject.Assisted;
+import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
+import org.apache.ambari.server.state.Cluster;
+
+public interface RequestExecutionFactory {
+  RequestExecution createNew(@Assisted("cluster") Cluster cluster,
+                             @Assisted("batch") Batch batch,
+                             @Assisted("schedule") Schedule schedule);
+  // Returns a RequestExecution for the given cluster and RequestScheduleEntity.
+  RequestExecution createExisting(Cluster cluster,
+                                  RequestScheduleEntity requestScheduleEntity);
+}