Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/20 14:09:59 UTC

[2/6] falcon git commit: FALCON-1213 Base framework of the native scheduler

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
new file mode 100644
index 0000000..ca2010b
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A DAG Engine that uses Oozie to execute the DAG.
+ */
+public class OozieDAGEngine implements DAGEngine {
+    private static final Logger LOG = LoggerFactory.getLogger(OozieDAGEngine.class);
+    private final OozieClient client;
+    private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100;
+
+    private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count";
+    private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList(
+            "pre-processing",
+            "recordsize",
+            "succeeded-post-processing",
+            "failed-post-processing"
+    );
+
+    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+    private final Cluster cluster;
+
+    public OozieDAGEngine(Cluster cluster) throws DAGEngineException {
+        try {
+            client = OozieClientFactory.get(cluster);
+            this.cluster = cluster;
+        } catch (Exception e) {
+            throw new DAGEngineException(e);
+        }
+    }
+
+    public OozieDAGEngine(String clusterName) throws DAGEngineException {
+        try {
+            this.cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+            client = OozieClientFactory.get(cluster);
+        } catch (Exception e) {
+            throw new DAGEngineException(e);
+        }
+    }
+
+    @Override
+    public String run(ExecutionInstance instance) throws DAGEngineException {
+        try {
+            Properties properties = getRunProperties(instance);
+            Path buildPath = EntityUtil.getLatestStagingPath(cluster, instance.getEntity());
+            switchUser();
+            properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
+            properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
+            return client.run(properties);
+        } catch (OozieClientException e) {
+            LOG.error("Ozie client exception:", e);
+            throw new DAGEngineException(e);
+        } catch (FalconException e1) {
+            LOG.error("Falcon Exception : ", e1);
+            throw new DAGEngineException(e1);
+        }
+    }
+
+    private void prepareEntityBuildPath(Entity entity) throws FalconException {
+        Path stagingPath = EntityUtil.getBaseStagingPath(cluster, entity);
+        Path logPath = EntityUtil.getLogPath(cluster, entity);
+
+        try {
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    ClusterHelper.getConfiguration(cluster));
+            HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath);
+            HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath);
+        } catch (IOException e) {
+            throw new FalconException("Error preparing base staging dirs: " + stagingPath, e);
+        }
+    }
+
+    private void dryRunInternal(Properties props) throws OozieClientException {
+        LOG.info("Dry run with properties {}", props);
+        client.dryrun(props);
+    }
+
+    private void switchUser() {
+        String user = System.getProperty("user.name");
+        CurrentUser.authenticate(user);
+    }
+
+    @Override
+    public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException {
+        try {
+            return statusEquals(client.getJobInfo(instance.getExternalID()).getStatus().name(),
+                    Job.Status.PREP, Job.Status.RUNNING);
+        } catch (OozieClientException e) {
+            throw new DAGEngineException(e);
+        }
+    }
+
+    // TODO : To be implemented. Currently hardcoded for process
+    private Properties getRunProperties(ExecutionInstance instance) {
+        Properties props = new Properties();
+        DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+        String nominalTime = fmt.print(instance.getInstanceTime());
+        props.put("nominalTime", nominalTime);
+        props.put("timeStamp", nominalTime);
+        props.put("feedNames", "NONE");
+        props.put("feedInstancePaths", "NONE");
+        props.put("falconInputFeeds", "NONE");
+        props.put("falconInPaths", "NONE");
+        props.put("feedNames", "NONE");
+        props.put("feedInstancePaths", "NONE");
+        props.put("userJMSNotificationEnabled", "true");
+        return props;
+    }
+
+    // TODO : To be implemented. Currently hardcoded for process
+    private Properties getDryRunProperties(Entity entity) {
+        Properties props = new Properties();
+        DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+        String nominalTime = fmt.print(DateTime.now());
+        props.put("nominalTime", nominalTime);
+        props.put("timeStamp", nominalTime);
+        props.put("feedNames", "NONE");
+        props.put("feedInstancePaths", "NONE");
+        props.put("falconInputFeeds", "NONE");
+        props.put("falconInPaths", "NONE");
+        props.put("feedNames", "NONE");
+        props.put("feedInstancePaths", "NONE");
+        props.put("userJMSNotificationEnabled", "true");
+        return props;
+    }
+
+    @Override
+    public void suspend(ExecutionInstance instance) throws DAGEngineException {
+        try {
+            client.suspend(instance.getExternalID());
+            assertStatus(instance.getExternalID(), Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED,
+                    Job.Status.FAILED, Job.Status.KILLED);
+            LOG.info("Suspended job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+        } catch (OozieClientException e) {
+            throw new DAGEngineException(e);
+        }
+    }
+
+    @Override
+    public void resume(ExecutionInstance instance) throws DAGEngineException {
+        try {
+            client.resume(instance.getExternalID());
+            assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED,
+                    Job.Status.FAILED, Job.Status.KILLED);
+            LOG.info("Resumed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+        } catch (OozieClientException e) {
+            throw new DAGEngineException(e);
+        }
+    }
+
+    @Override
+    public void kill(ExecutionInstance instance) throws DAGEngineException {
+        try {
+            client.kill(instance.getExternalID());
+            assertStatus(instance.getExternalID(), Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED);
+            LOG.info("Killed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+        } catch (OozieClientException e) {
+            throw new DAGEngineException(e);
+        }
+    }
+
+    @Override
+    public void reRun(ExecutionInstance instance) throws DAGEngineException {
+        // TODO : Implement this
+    }
+
+    @Override
+    public void submit(Entity entity) throws DAGEngineException {
+        try {
+            // TODO : remove hardcoded Tag value when feed support is added.
+            OozieOrchestrationWorkflowBuilder builder =
+                    OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT);
+            prepareEntityBuildPath(entity);
+            Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
+            Properties properties = builder.build(cluster, buildPath);
+            if (properties == null) {
+                LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster);
+                throw new DAGEngineException("Properties for entity " + entity.getName() + " is empty");
+            }
+
+            switchUser();
+            LOG.debug("Logged in user is " + CurrentUser.getUser());
+            properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
+            properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
+            properties.putAll(getDryRunProperties(entity));
+            // Dry-run (submit) before run, as run is asynchronous
+            dryRunInternal(properties);
+        } catch (OozieClientException e) {
+            LOG.error("Oozie client exception:", e);
+            throw new DAGEngineException(e);
+        } catch (FalconException e1) {
+            LOG.error("Falcon Exception : ", e1);
+            throw new DAGEngineException(e1);
+        }
+    }
+
+    @Override
+    public InstancesResult.Instance info(String externalID) throws DAGEngineException {
+        InstancesResult.Instance instance = new InstancesResult.Instance();
+        try {
+            LOG.debug("Retrieving details for job {} ", externalID);
+            WorkflowJob jobInfo = client.getJobInfo(externalID);
+            instance.startTime = jobInfo.getStartTime();
+            if (jobInfo.getStatus().name().equals(Job.Status.RUNNING.name())) {
+                instance.endTime = new Date();
+            } else {
+                instance.endTime = jobInfo.getEndTime();
+            }
+            instance.cluster = cluster.getName();
+            instance.runId = jobInfo.getRun();
+            instance.status = InstancesResult.WorkflowStatus.valueOf(jobInfo.getStatus().name());
+            instance.logFile = jobInfo.getConsoleUrl();
+            instance.wfParams = getWFParams(jobInfo);
+            return instance;
+        } catch (Exception e) {
+            LOG.error("Error when attempting to get info for " + externalID, e);
+            throw new DAGEngineException(e);
+        }
+    }
+
+    private InstancesResult.KeyValuePair[] getWFParams(WorkflowJob jobInfo) {
+        Configuration conf = new Configuration(false);
+        conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+        InstancesResult.KeyValuePair[] wfParams = new InstancesResult.KeyValuePair[conf.size()];
+        int i = 0;
+        for (Map.Entry<String, String> entry : conf) {
+            wfParams[i++] = new InstancesResult.KeyValuePair(entry.getKey(), entry.getValue());
+        }
+        return wfParams;
+    }
+
+    @Override
+    public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException {
+        List<InstancesResult.InstanceAction> instanceActions = new ArrayList<>();
+        try {
+            WorkflowJob wfJob = client.getJobInfo(externalID);
+            List<WorkflowAction> wfActions = wfJob.getActions();
+            // We want to capture job URLs for all user actions and non-succeeded actions of the main workflow
+            for (WorkflowAction action : wfActions) {
+                if (action.getType().equalsIgnoreCase("sub-workflow")
+                        && StringUtils.isNotEmpty(action.getExternalId())) {
+                    // If the action is a sub-workflow, get job URLs of all actions within the sub-workflow
+                    List<WorkflowAction> subWorkFlowActions = client
+                            .getJobInfo(action.getExternalId()).getActions();
+                    for (WorkflowAction subWfAction : subWorkFlowActions) {
+                        if (!subWfAction.getType().startsWith(":")) {
+                            InstancesResult.InstanceAction instanceAction =
+                                    new InstancesResult.InstanceAction(subWfAction.getName(),
+                                            subWfAction.getExternalStatus(), subWfAction.getConsoleUrl());
+                            instanceActions.add(instanceAction);
+                        }
+                    }
+                } else if (!action.getType().startsWith(":")) {
+                    // Transition nodes have types starting with ':'; we don't need their statuses
+                    if (PARENT_WF_ACTION_NAMES.contains(action.getName())
+                            && !Job.Status.SUCCEEDED.toString().equals(action.getExternalStatus())) {
+                        // Falcon actions in the main workflow are those listed in PARENT_WF_ACTION_NAMES;
+                        // capture job URLs for the non-succeeded ones
+                        InstancesResult.InstanceAction instanceAction =
+                                new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+                                        action.getConsoleUrl());
+                        instanceActions.add(instanceAction);
+                    } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName())
+                            && !StringUtils.equals(action.getExternalId(), "-")) {
+                        // Pig/Hive user actions have no sub-workflow; we want to capture their URLs as well
+                        InstancesResult.InstanceAction instanceAction =
+                                new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+                                        action.getConsoleUrl());
+                        instanceActions.add(instanceAction);
+                    }
+                }
+            }
+            return instanceActions;
+        } catch (OozieClientException oce) {
+            throw new DAGEngineException(oce);
+        }
+    }
+
+    @Override
+    public boolean isAlive() throws DAGEngineException {
+        try {
+            return client.getSystemMode() == OozieClient.SYSTEM_MODE.NORMAL;
+        } catch (OozieClientException e) {
+            throw new DAGEngineException("Unable to reach Oozie server.", e);
+        }
+    }
+
+    @Override
+    public Properties getConfiguration(String externalID) throws DAGEngineException {
+        Properties props = new Properties();
+        try {
+            WorkflowJob jobInfo = client.getJobInfo(externalID);
+            Configuration conf = new Configuration(false);
+            conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+
+            for (Map.Entry<String, String> entry : conf) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+        } catch (OozieClientException e) {
+            throw new DAGEngineException(e);
+        }
+
+        return props;
+    }
+
+    // Get status of a workflow (with retry) and ensure it is one of statuses requested.
+    private void assertStatus(String jobID, Job.Status... statuses) throws DAGEngineException {
+        String actualStatus = null;
+        int retryCount;
+        String retry = RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30");
+        try {
+            retryCount = Integer.valueOf(retry);
+        } catch (NumberFormatException nfe) {
+            throw new DAGEngineException("Invalid value provided for runtime property \""
+                    + WORKFLOW_STATUS_RETRY_COUNT + "\". Please provide an integer value.");
+        }
+        for (int counter = 0; counter < retryCount; counter++) {
+            try {
+                actualStatus = client.getJobInfo(jobID).getStatus().name();
+            } catch (OozieClientException e) {
+                LOG.error("Unable to get status of workflow: " + jobID, e);
+                throw new DAGEngineException(e);
+            }
+            if (!statusEquals(actualStatus, statuses)) {
+                try {
+                    Thread.sleep(WORKFLOW_STATUS_RETRY_DELAY_MS);
+                } catch (InterruptedException ignore) {
+                    //ignore
+                }
+            } else {
+                return;
+            }
+        }
+        throw new DAGEngineException("For Job" + jobID + ", actual statuses: " + actualStatus + ", expected statuses: "
+                + Arrays.toString(statuses));
+    }
+
+    private boolean statusEquals(String left, Job.Status... right) {
+        for (Job.Status rightElement : right) {
+            if (left.equals(rightElement.name())) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
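
For reference, a minimal usage sketch of the DAGEngine contract implemented above. The cluster and entity names are hypothetical, and the snippet is illustrative rather than part of the patch:

    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.workflow.engine.DAGEngine;
    import org.apache.falcon.workflow.engine.OozieDAGEngine;

    public final class DAGEngineUsageSketch {
        public static void main(String[] args) throws Exception {
            // Resolve the engine for a (hypothetical) cluster already registered in the config store.
            DAGEngine engine = new OozieDAGEngine("primaryCluster");
            // submit() builds the Oozie workflow and dry-runs it;
            // run(ExecutionInstance) launches a concrete instance later.
            Entity process = ConfigurationStore.get().get(EntityType.PROCESS, "sample-process");
            engine.submit(process);
            System.out.println("Oozie reachable: " + engine.isAlive());
        }
    }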

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
new file mode 100644
index 0000000..b2f9e59
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -0,0 +1,557 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+
+/**
+ * Tests the API of FalconExecutionService, obtained via FalconExecutionService.get().
+ */
+public class FalconExecutionServiceTest extends AbstractTestBase {
+
+    private InMemoryStateStore stateStore = null;
+    private AlarmService mockTimeService;
+    private DataAvailabilityService mockDataService;
+    private SchedulerService mockSchedulerService;
+    private JobCompletionService mockCompletionService;
+    private DAGEngine dagEngine;
+    private int instanceCount = 0;
+
+    @BeforeClass
+    public void init() throws Exception {
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        setupServices();
+        setupConfigStore();
+    }
+
+    @AfterClass
+    public void tearDown() {
+        this.dfsCluster.shutdown();
+    }
+
+    // State store is set up to sync with Config Store. That gets tested too.
+    public void setupConfigStore() throws Exception {
+        stateStore = (InMemoryStateStore) AbstractStateStore.get();
+        getStore().registerListener(stateStore);
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+    }
+
+    public void setupServices() throws FalconException, OozieClientException {
+        mockTimeService = Mockito.mock(AlarmService.class);
+        Mockito.when(mockTimeService.getName()).thenReturn("AlarmService");
+        Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+
+        mockDataService = Mockito.mock(DataAvailabilityService.class);
+        Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+        Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        mockSchedulerService = Mockito.mock(SchedulerService.class);
+        Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService");
+        StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+        StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName());
+        dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster"));
+        Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        mockCompletionService = Mockito.mock(JobCompletionService.class);
+        Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+        Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        Services.get().register(mockTimeService);
+        Services.get().register(mockDataService);
+        Services.get().register(mockSchedulerService);
+        Services.get().register(mockCompletionService);
+        Services.get().register(FalconExecutionService.get());
+    }
+
+    @BeforeMethod
+    private void setupStateStore() throws FalconException {
+        stateStore.clear();
+    }
+
+    @Test
+    public void testBasicFlow() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize1");
+        Process process = getStore().get(EntityType.PROCESS, "summarize1");
+        Assert.assertNotNull(process);
+        ID processKey = new ID(process);
+        String clusterName = dfsCluster.getCluster().getName();
+
+        // Schedule a process
+        Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SUBMITTED);
+        FalconExecutionService.get().schedule(process);
+        Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+        // Simulate a time notification
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+
+        // Ensure an instance is triggered and registers for data notification
+        Assert.assertEquals(stateStore.getAllExecutionInstances(process, clusterName).size(), 1);
+        InstanceState instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+        Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.WAITING);
+
+        // Simulate a data notification
+        event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        // Ensure the instance is ready for execution
+        Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY);
+
+        // Simulate a scheduled notification
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        // Ensure the instance is running
+        instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+        Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.RUNNING);
+
+        // Simulate a job complete notification
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        // Ensure the instance has succeeded and is in the state store
+        instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+        Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+    }
+
+    @Test
+    public void testSuspendResume() throws Exception {
+        Mockito.doNothing().when(dagEngine).resume(Mockito.any(ExecutionInstance.class));
+        storeEntity(EntityType.PROCESS, "summarize2");
+        Process process = getStore().get(EntityType.PROCESS, "summarize2");
+        Assert.assertNotNull(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        ID processID = new ID(process, clusterName);
+
+        // Schedule a process
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+        FalconExecutionService.get().schedule(process);
+
+        // Simulate two time notifications
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+
+        // Suspend and resume all waiting instances - check for notification deregistration
+
+        Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+        InstanceState instance1 = (InstanceState) i.next();
+        InstanceState instance2 = (InstanceState) i.next();
+
+        // Simulate a data notification
+        event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        // One in ready and one in waiting. Both should be suspended.
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
+        Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
+
+        FalconExecutionService.get().suspend(process);
+
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
+        Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+                instance1.getInstance().getId());
+        Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+                instance2.getInstance().getId());
+        Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+
+        FalconExecutionService.get().resume(process);
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
+
+        // Simulate a data notification and a job run notification
+        event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        // One in running and the other in ready. Both should be suspended
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+        Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+
+        FalconExecutionService.get().suspend(process);
+
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
+
+        FalconExecutionService.get().resume(process);
+
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+
+        // Running should finish after resume
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+    }
+
+    @Test
+    // Kill waiting, ready, running - check for notification deregistration
+    public void testDelete() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize4");
+        Process process = getStore().get(EntityType.PROCESS, "summarize4");
+        Assert.assertNotNull(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        ID processID = new ID(process, clusterName);
+
+        // Schedule a process
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+        FalconExecutionService.get().schedule(process);
+
+        // Simulate three time notifications
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+
+        // Retrieve the three instances created by the time notifications
+        Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+        InstanceState instance1 = (InstanceState) i.next();
+        InstanceState instance2 = (InstanceState) i.next();
+        InstanceState instance3 = (InstanceState) i.next();
+
+        // Simulate two data notifications and one job run
+        event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        // One in ready, one in waiting and one running.
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+        Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+        FalconExecutionService.get().delete(process);
+
+        // Deregister from notification services
+        Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+    }
+
+    @Test
+    public void testTimeOut() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize3");
+        Process process = getStore().get(EntityType.PROCESS, "summarize3");
+        Assert.assertNotNull(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        ID processID = new ID(process, clusterName);
+
+        // Schedule a process
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+        FalconExecutionService.get().schedule(process);
+
+        // Simulate time notification
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+
+        // Simulate data unavailable notification and timeout
+        InstanceState instanceState = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+        ((Process) instanceState.getInstance().getEntity()).setTimeout(new Frequency("minutes(0)"));
+        DataEvent dataEvent = (DataEvent) createEvent(NotificationServicesRegistry.SERVICE.DATA,
+                instanceState.getInstance());
+        dataEvent.setStatus(DataEvent.STATUS.UNAVAILABLE);
+
+        FalconExecutionService.get().onEvent(dataEvent);
+
+        Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT);
+    }
+
+    // Non-triggering event should not create an instance
+    @Test
+    public void testNonTriggeringEvents() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize6");
+        Process process = getStore().get(EntityType.PROCESS, "summarize6");
+        Assert.assertNotNull(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        ID processID = new ID(process, clusterName);
+
+        // Schedule a process
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+        FalconExecutionService.get().schedule(process);
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+        // Simulate data notification with a callback of the process.
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.DATA, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+
+        // No instances should get triggered
+        Assert.assertTrue(stateStore.getAllExecutionInstances(process, clusterName).isEmpty());
+    }
+
+    // Individual instance suspend, resume, kill
+    @Test
+    public void testInstanceOperations() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize5");
+        Process process = getStore().get(EntityType.PROCESS, "summarize5");
+        Assert.assertNotNull(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        ID processID = new ID(process, clusterName);
+
+        // Schedule a process
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+        FalconExecutionService.get().schedule(process);
+
+        // Simulate three time notifications
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+        FalconExecutionService.get().onEvent(event);
+
+        // Retrieve the three instances created by the time notifications
+        Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+        InstanceState instance1 = (InstanceState) i.next();
+        InstanceState instance2 = (InstanceState) i.next();
+        InstanceState instance3 = (InstanceState) i.next();
+
+        // Simulate two data notifications and one job run
+        event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+        FalconExecutionService.get().onEvent(event);
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+
+        // One in ready, one in waiting and one running.
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+        Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+        EntityExecutor entityExecutor = FalconExecutionService.get().getEntityExecutor(process, clusterName);
+        Assert.assertNotNull(entityExecutor);
+        Collection<ExecutionInstance> instances = new ArrayList<>();
+        for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+            instances.add(instanceState.getInstance());
+        }
+
+        for (ExecutionInstance instance : instances) {
+            entityExecutor.suspend(instance);
+        }
+
+        // Instances must be suspended, but the entity itself must not be.
+        for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+            Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.SUSPENDED);
+            Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+                    instanceState.getInstance().getId());
+        }
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+        for (ExecutionInstance instance : instances) {
+            entityExecutor.resume(instance);
+        }
+        // Back to one in ready, one in waiting and one running.
+        Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+        Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+        Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+        for (ExecutionInstance instance : instances) {
+            entityExecutor.kill(instance);
+        }
+
+        // Instances must be killed, but the entity itself must not be.
+        for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+            Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.KILLED);
+        }
+        Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+    }
+
+    @Test(priority = -1)
+    // Add some entities and instances to the store and ensure the executor picks up from where it left off
+    public void testSystemRestart() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize7");
+        Process process = getStore().get(EntityType.PROCESS, "summarize7");
+        Assert.assertNotNull(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        ID processID = new ID(process, clusterName);
+
+        // Store a couple of instances in the state store
+        stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+        ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
+                new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
+        InstanceState instanceState1 = new InstanceState(instance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+        stateStore.putExecutionInstance(instanceState1);
+        ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process,
+                new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName);
+        InstanceState instanceState2 = new InstanceState(instance2);
+        instanceState2.setCurrentState(InstanceState.STATE.READY);
+        stateStore.putExecutionInstance(instanceState2);
+
+        FalconExecutionService.get().init();
+
+        // Simulate a scheduled notification. This should cause a reload from the state store
+        Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance());
+        FalconExecutionService.get().onEvent(event);
+        Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING);
+
+        // Simulate a job completion notification and ensure the instance resumes from where it left off
+        event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance());
+        FalconExecutionService.get().onEvent(event);
+        Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+    }
+
+    @Test(dataProvider = "actions")
+    public void testPartialFailures(String name, String action, InstanceState.STATE state) throws Exception {
+        storeEntity(EntityType.PROCESS, name);
+        Process process = getStore().get(EntityType.PROCESS, name);
+        Assert.assertNotNull(process);
+        String clusterName = dfsCluster.getCluster().getName();
+        ID processID = new ID(process, clusterName);
+
+        // Schedule the process
+        FalconExecutionService.get().schedule(process);
+
+        // Store a couple of instances in the state store
+        stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+        ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
+                new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
+        instance1.setExternalID("123");
+        InstanceState instanceState1 = new InstanceState(instance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+        stateStore.putExecutionInstance(instanceState1);
+        ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process,
+                new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName);
+        InstanceState instanceState2 = new InstanceState(instance2);
+        instanceState2.setCurrentState(InstanceState.STATE.READY);
+        stateStore.putExecutionInstance(instanceState2);
+
+        // Mock failure
+        ((MockDAGEngine)dagEngine).addFailInstance(instance1);
+        Method m = FalconExecutionService.get().getClass().getMethod(action, Entity.class);
+        try {
+            m.invoke(FalconExecutionService.get(), process);
+            Assert.fail("Exception expected.");
+        } catch (Exception e) {
+            // One instance must fail and the other not
+            Assert.assertEquals(instanceState2.getCurrentState(), state);
+            Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING);
+        }
+
+        // Should throw no exception now that the failing instance is removed
+        ((MockDAGEngine)dagEngine).removeFailInstance(instance1);
+        m.invoke(FalconExecutionService.get(), process);
+
+        // Both instances must be in expected state.
+        Assert.assertEquals(instanceState2.getCurrentState(), state);
+        Assert.assertEquals(instanceState1.getCurrentState(), state);
+    }
+
+    @DataProvider(name = "actions")
+    public Object[][] testActions() {
+        return new Object[][] {
+            {"summarize8", "suspend", InstanceState.STATE.SUSPENDED},
+            {"summarize9", "delete", InstanceState.STATE.KILLED},
+        };
+    }
+
+    private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) {
+        ID id = new ID(process, cluster);
+        switch (type) {
+        case TIME:
+            Date start = process.getClusters().getClusters().get(0).getValidity().getStart();
+            long instanceOffset =
+                    SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getFrequency()) * instanceCount++;
+            return new TimeElapsedEvent(id, new DateTime(start),
+                    new DateTime(process.getClusters().getClusters().get(0).getValidity().getEnd()),
+                    new DateTime(start.getTime() + instanceOffset));
+        case DATA:
+            DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+                    DataEvent.STATUS.AVAILABLE);
+            return dataEvent;
+        default:
+            return null;
+        }
+    }
+
+    private Event createEvent(NotificationServicesRegistry.SERVICE type, ExecutionInstance instance) {
+        ID id = new ID(instance);
+        switch (type) {
+        case DATA:
+            DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+                    DataEvent.STATUS.AVAILABLE);
+            return dataEvent;
+        case JOB_SCHEDULE:
+            JobScheduledEvent scheduledEvent = new JobScheduledEvent(id, JobScheduledEvent.STATUS.SUCCESSFUL);
+            scheduledEvent.setExternalID("234");
+            return scheduledEvent;
+        case JOB_COMPLETION:
+            JobCompletedEvent jobEvent = new JobCompletedEvent(id, WorkflowJob.Status.SUCCEEDED, DateTime.now());
+            return jobEvent;
+        default:
+            return null;
+        }
+    }
+
+}
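
The tests above drive the instance lifecycle by hand-delivering events. As a compact restatement of the happy path (a sketch distilled from the assertions in testBasicFlow, not an API):

    import java.util.Arrays;
    import java.util.List;

    public final class LifecycleSketch {
        public static void main(String[] args) {
            // Event delivered to FalconExecutionService.onEvent() -> resulting instance state.
            List<String> events = Arrays.asList("TIME", "DATA", "JOB_SCHEDULE", "JOB_COMPLETION");
            List<String> states = Arrays.asList("WAITING", "READY", "RUNNING", "SUCCEEDED");
            for (int i = 0; i < events.size(); i++) {
                System.out.printf("after %-14s -> %s%n", events.get(i), states.get(i));
            }
        }
    }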

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
new file mode 100644
index 0000000..087114f
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.workflow.engine.DAGEngine;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Mock DAG Execution Engine used for tests.
+ */
+public class MockDAGEngine implements DAGEngine {
+    private List<ExecutionInstance> failInstances = new ArrayList<>();
+    private Map<ExecutionInstance, Integer> runInvocations =  new HashMap<>();
+
+    public MockDAGEngine(String cluster) {
+
+    }
+
+    @Override
+    public String run(ExecutionInstance instance) throws DAGEngineException {
+        if (failInstances.contains(instance)) {
+            throw new DAGEngineException("Mock failure.");
+        }
+        Integer count = 1;
+        if (runInvocations.containsKey(instance)) {
+            // Increment count
+            count = runInvocations.get(instance) + 1;
+        }
+
+        runInvocations.put(instance, count);
+        return "123";
+    }
+
+    @Override
+    public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException {
+        return true;
+    }
+
+    @Override
+    public void suspend(ExecutionInstance instance) throws DAGEngineException {
+        if (failInstances.contains(instance)) {
+            throw new DAGEngineException("mock failure.");
+        }
+    }
+
+    @Override
+    public void resume(ExecutionInstance instance) throws DAGEngineException {
+
+    }
+
+    @Override
+    public void kill(ExecutionInstance instance) throws DAGEngineException {
+        if (failInstances.contains(instance)) {
+            throw new DAGEngineException("mock failure.");
+        }
+    }
+
+    @Override
+    public void reRun(ExecutionInstance instance) throws DAGEngineException {
+
+    }
+
+    @Override
+    public void submit(Entity entity) throws DAGEngineException {
+
+    }
+
+    @Override
+    public InstancesResult.Instance info(String externalID) throws DAGEngineException {
+        return new InstancesResult.Instance();
+    }
+
+    @Override
+    public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException {
+        return null;
+    }
+
+    @Override
+    public boolean isAlive() throws DAGEngineException {
+        return false;
+    }
+
+    @Override
+    public Properties getConfiguration(String externalID) throws DAGEngineException {
+        return null;
+    }
+
+    public void addFailInstance(ExecutionInstance failInstance) {
+        this.failInstances.add(failInstance);
+    }
+
+    public void removeFailInstance(ExecutionInstance failInstance) {
+        this.failInstances.remove(failInstance);
+    }
+
+    public Integer getTotalRuns(ExecutionInstance instance) {
+        return runInvocations.get(instance);
+    }
+}
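
MockDAGEngine is wired in through the "dag.engine.impl" startup property (see setupServices() in FalconExecutionServiceTest above). A condensed sketch of that wiring, assuming the factory reads the property on first lookup and the caller sits in the same test package as MockDAGEngine:

    import org.apache.falcon.util.StartupProperties;
    import org.apache.falcon.workflow.engine.DAGEngine;
    import org.apache.falcon.workflow.engine.DAGEngineFactory;

    public final class MockEngineWiringSketch {
        public static DAGEngine mockEngineFor(String cluster) throws Exception {
            // Point the factory at the mock before the first engine lookup.
            StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
            return DAGEngineFactory.getDAGEngine(cluster);
        }
    }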

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
new file mode 100644
index 0000000..9457454
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for scheduler utility methods.
+ */
+public class SchedulerUtilTest {
+
+    @Test(dataProvider = "frequencies")
+    public void testGetFrequencyInMillis(DateTime referenceTime, Frequency frequency, long expectedValue) {
+        Assert.assertEquals(SchedulerUtil.getFrequencyInMillis(referenceTime, frequency), expectedValue);
+    }
+
+    @DataProvider(name = "frequencies")
+    public Object[][] getTestFrequencies() {
+        DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");
+        return new Object[][] {
+            {DateTime.now(), new Frequency("minutes(10)"), 10*60*1000L},
+            {DateTime.now(), new Frequency("hours(6)"), 6*60*60*1000L},
+            // Feb of leap year
+            {formatter.parseDateTime("04/02/2012 14:00:00"), new Frequency("months(1)"), 29*24*60*60*1000L},
+            // Months with 31 and 30 days
+            {formatter.parseDateTime("02/10/2015 03:30:00"), new Frequency("months(2)"), (31+30)*24*60*60*1000L},
+        };
+    }
+}
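
The month cases above are calendar-aware: the length of months(n) is measured forward from the reference time, which is why leap-year February yields 29 days. A Joda-Time sketch consistent with this test data (an assumption about SchedulerUtil's approach, not its source):

    import org.joda.time.DateTime;
    import org.joda.time.Duration;

    public final class FrequencyMillisSketch {
        // months(1) from 04/02/2012 spans 29 days, matching the data provider above.
        public static long monthsInMillis(DateTime reference, int months) {
            return new Duration(reference, reference.plusMonths(months)).getMillis();
        }
    }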

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
new file mode 100644
index 0000000..36f1fd1
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.TimeZone;
+
+/**
+ * Class to test the time notification service.
+ */
+public class AlarmServiceTest {
+
+    private static AlarmService timeService = Mockito.spy(new AlarmService());
+    private static NotificationHandler handler = Mockito.mock(NotificationHandler.class);
+
+    @BeforeClass
+    public void setup() throws FalconException {
+        timeService.init();
+    }
+
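+    /*
+     * Expected timeline for the test below, assuming the service fires one
+     * event per elapsed frequency interval from the start time:
+     *
+     *   startTime = now - 2 min, frequency = 1 min
+     *   => events due at now-2m, now-1m and, depending on where the minute
+     *      boundary falls, now; so at least 2 notifications on registration.
+     */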
+    // This test ensures notifications are generated for time events that occurred in the past.
+    @Test
+    public void testBackLogCatchup() throws Exception {
+        TimeZone tz = TimeZone.getTimeZone("UTC");
+        DateTime now = DateTime.now(DateTimeZone.forTimeZone(tz));
+        // Start time 2 mins ago
+        DateTime startTime = new DateTime(now.getMillis() - 2*60*1000, DateTimeZone.forTimeZone(tz));
+        // End time 2 mins from now
+        DateTime endTime = new DateTime(now.getMillis() + 2*60*1000, DateTimeZone.forTimeZone(tz));
+
+        Process mockProcess = new Process();
+        mockProcess.setName("test");
+        ID id = new ID(mockProcess);
+        id.setCluster("testCluster");
+
+        AlarmService.AlarmRequestBuilder request =
+                new AlarmService.AlarmRequestBuilder(handler, id);
+        request.setStartTime(startTime);
+        request.setEndTime(endTime);
+        request.setFrequency(new Frequency("minutes(1)"));
+        request.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        timeService.register(request.build());
+        // Events are delivered asynchronously, hence a short wait.
+        Thread.sleep(1000);
+        // Two one-minute intervals have already elapsed; depending on where
+        // the minute boundary falls a third event may fire, hence atLeast(2).
+        Mockito.verify(handler, Mockito.atLeast(2)).onEvent(Mockito.any(Event.class));
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
new file mode 100644
index 0000000..b4a0f35
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.MockDAGEngine;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+import static org.apache.falcon.state.InstanceState.STATE;
+
+/**
+ * Class to test the job scheduler service.
+ */
+public class SchedulerServiceTest extends AbstractTestBase {
+
+    private SchedulerService scheduler = Mockito.spy(new SchedulerService());
+    private NotificationHandler handler;
+    private static String cluster = "testCluster";
+    private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get();
+    private static DAGEngine mockDagEngine;
+    private static Process process;
+    private volatile boolean failed = false;
+
+    @BeforeMethod
+    public void setup() throws FalconException {
+        // Re-created before every test so Mockito invocation counts start fresh.
+        handler = Mockito.spy(new MockNotificationHandler());
+    }
+
+    @BeforeClass
+    public void init() throws Exception {
+        this.dfsCluster = EmbeddedCluster.newCluster(cluster);
+        this.conf = dfsCluster.getConf();
+        setupConfigStore();
+        DataAvailabilityService mockDataService = Mockito.mock(DataAvailabilityService.class);
+        Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+        Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        Services.get().register(mockDataService);
+        JobCompletionService mockCompletionService = Mockito.mock(JobCompletionService.class);
+        Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+        Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        Services.get().register(mockCompletionService);
+
+        Services.get().register(scheduler);
+        scheduler.init();
+        StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+        mockDagEngine =  DAGEngineFactory.getDAGEngine("testCluster");
+    }
+
+    @AfterClass
+    public void tearDown() {
+        this.dfsCluster.shutdown();
+    }
+
+    // The state store is registered as a config store listener, so the sync between the two is exercised as well.
+    public void setupConfigStore() throws Exception {
+        getStore().registerListener(stateStore);
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+        storeEntity(EntityType.PROCESS, "mockSummary");
+        process = getStore().get(EntityType.PROCESS, "mockSummary");
+    }
+
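+    /**
+     * With parallel=1 only one instance may run at a time; later instances
+     * should stay queued and get picked up one by one as running instances
+     * complete.
+     */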
+    @Test
+    public void testSchedulingWithParallelInstances() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize");
+        Process mockProcess = getStore().get(EntityType.PROCESS, "summarize");
+        mockProcess.setParallel(1);
+        Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+        ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+        SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance1.getId());
+        request.setInstance(instance1);
+        // No instances are running, so DAGEngine.run should be invoked.
+        scheduler.register(request.build());
+        Thread.sleep(100);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+        Mockito.verify(handler).onEvent(Mockito.any(JobScheduledEvent.class));
+        // Max. instances running, the new instance should not be run.
+        ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+                new DateTime(startTime.getTime() + 60000), cluster);
+        SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance2.getId());
+        request2.setInstance(instance2);
+        scheduler.register(request2.build());
+        Thread.sleep(100);
+        Assert.assertNull(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2));
+        // Still at max parallelism; instance3 should be queued as well.
+        ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess,
+                new DateTime(startTime.getTime() + 120000), cluster);
+        SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance3.getId());
+        request3.setInstance(instance3);
+        scheduler.register(request3.build());
+        Thread.sleep(100);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+        // Simulate the completion of the previous instance.
+        stateStore.getExecutionInstance(instance1.getId()).setCurrentState(STATE.SUCCEEDED);
+        scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+                DateTime.now()));
+        // Once an instance completes, instance2 should get scheduled in the next iteration.
+        Thread.sleep(100);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+        // Simulate another completion and ensure instance3 runs.
+        stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
+        scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+                DateTime.now()));
+        Thread.sleep(100);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
+    }
+
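+    /**
+     * An instance whose dependencies have not completed should be held back;
+     * once the dependency succeeds, the waiting instance should be scheduled.
+     */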
+    @Test
+    public void testSchedulingWithDependencies() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize1");
+        Process mockProcess = getStore().get(EntityType.PROCESS, "summarize1");
+        Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+        // Create two instances, one dependent on the other.
+        ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+        ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+                new DateTime(startTime.getTime() + 60000), cluster);
+        SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance1.getId());
+        ArrayList<ExecutionInstance> dependencies = new ArrayList<ExecutionInstance>();
+        dependencies.add(instance2);
+        request.setDependencies(dependencies);
+        request.setInstance(instance1);
+        // instance2 is not scheduled, dependency not satisfied
+        // So, instance1 should not be scheduled either.
+        scheduler.register(request.build());
+        Thread.sleep(100);
+        Assert.assertNull(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1));
+        Mockito.verify(handler, Mockito.times(0)).onEvent(Mockito.any(JobScheduledEvent.class));
+
+        // Now schedule instance2, the dependency.
+        SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance2.getId());
+        request2.setInstance(instance2);
+        scheduler.register(request2.build());
+
+        // instance2 has no dependencies and should run right away.
+        Thread.sleep(100);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+        Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(JobScheduledEvent.class));
+        // Simulate completion of instance2.
+        stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
+        scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster, instance2.getInstanceTime()),
+                WorkflowJob.Status.SUCCEEDED, DateTime.now()));
+        // Dependency satisfied; instance1 should now get scheduled after the retry delay.
+        Thread.sleep(100);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+    }
+
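+    /**
+     * Of two instances awaiting scheduling, the one whose process carries a
+     * higher MR job priority should be picked from the queue first, though
+     * both eventually run.
+     */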
+    @Test
+    public void testSchedulingWithPriorities() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize2");
+        Process mockProcess = getStore().get(EntityType.PROCESS, "summarize2");
+        storeEntity(EntityType.PROCESS, "summarize3");
+        Process mockProcess2 = getStore().get(EntityType.PROCESS, "summarize3");
+        // Set a higher priority on the second process.
+        Property priorityProp = new Property();
+        priorityProp.setName(EntityUtil.MR_JOB_PRIORITY);
+        priorityProp.setValue("HIGH");
+        mockProcess2.getProperties().getProperties().add(priorityProp);
+        Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+        // Create two instances with the same instance time but different priorities.
+        ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+        ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess2, new DateTime(startTime), cluster);
+        SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance1.getId());
+        request.setInstance(instance1);
+        SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance2.getId());
+        request2.setInstance(instance2);
+        scheduler.register(request.build());
+        scheduler.register(request2.build());
+        Thread.sleep(100);
+        // instance2, having the higher priority, should be scheduled first; both run once.
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+        Mockito.verify(handler, Mockito.times(2)).onEvent(Mockito.any(JobScheduledEvent.class));
+    }
+
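+    /**
+     * Unregistering an instance should remove it from the scheduling queue,
+     * so it never runs while the remaining instances do.
+     */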
+    @Test
+    public void testDeRegistration() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize4");
+        Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4");
+        mockProcess.setParallel(3);
+        Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+        ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+        // Schedule 3 instances.
+        SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance1.getId());
+        request.setInstance(instance1);
+        scheduler.register(request.build());
+        ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+                new DateTime(startTime.getTime() + 60000), cluster);
+        SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance2.getId());
+        request2.setInstance(instance2);
+        scheduler.register(request2.build());
+        ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess,
+                new DateTime(startTime.getTime() + 120000), cluster);
+        SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(handler, instance3.getId());
+        request3.setInstance(instance3);
+        scheduler.register(request3.build());
+
+        // Abort the second instance by unregistering it.
+        scheduler.unregister(handler, instance2.getId());
+
+        Thread.sleep(100);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
+        // Second instance should not run.
+        Assert.assertNull(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2));
+    }
+
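+    /**
+     * When the DAG engine fails to run an instance, the registered handler
+     * should receive a JobScheduledEvent with status FAILED.
+     */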
+    @Test
+    public void testScheduleFailure() throws Exception {
+        storeEntity(EntityType.PROCESS, "summarize5");
+        Process mockProcess = getStore().get(EntityType.PROCESS, "summarize5");
+        Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+        ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+        // The mock DAG engine is set to fail this instance; the handler below
+        // flags an error unless the resulting event reports status FAILED.
+        NotificationHandler failureHandler = new NotificationHandler() {
+            @Override
+            public void onEvent(Event event) throws FalconException {
+                JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event);
+                if (scheduledEvent.getStatus() != JobScheduledEvent.STATUS.FAILED) {
+                    failed = true;
+                }
+            }
+        };
+        SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+                scheduler.createRequestBuilder(failureHandler, instance1.getId());
+        request.setInstance(instance1);
+        ((MockDAGEngine)mockDagEngine).addFailInstance(instance1);
+        scheduler.register(request.build());
+        Thread.sleep(100);
+        Assert.assertFalse(failed);
+        ((MockDAGEngine)mockDagEngine).removeFailInstance(instance1);
+    }
+
+    /**
+     * A mock notification handler that records the scheduled instance as RUNNING in the state store.
+     */
+    public static class MockNotificationHandler implements NotificationHandler {
+        @Override
+        public void onEvent(Event event) throws FalconException {
+            JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event);
+            Process p = (Process) process.copy();
+            p.setName(scheduledEvent.getTarget().getEntityName());
+            ProcessExecutionInstance instance = new ProcessExecutionInstance(p,
+                    scheduledEvent.getTarget().getInstanceTime(), cluster);
+            InstanceState state = new InstanceState(instance).setCurrentState(STATE.RUNNING);
+            if (!stateStore.executionInstanceExists(instance.getId())) {
+                stateStore.putExecutionInstance(state);
+            } else {
+                stateStore.updateExecutionInstance(state);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
new file mode 100644
index 0000000..95dd5ae
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.predicate;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.apache.falcon.entity.v0.process.Process;
+
+/**
+ * Tests the predicate class.
+ */
+public class PredicateTest {
+
+    @Test
+    public void testPredicateFromEvent() throws FalconException {
+        Process process = new Process();
+        process.setName("test");
+        DateTime now = DateTime.now();
+        TimeElapsedEvent te = new TimeElapsedEvent(new ID(process), now, now, now);
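+        // Building a predicate from the event should succeed without throwing.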
+        Predicate.getPredicate(te);
+    }
+
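+    /**
+     * Time predicates should match only when their fields match; a field set
+     * to -1 appears to act as an "ANY" wildcard that matches any value, which
+     * is why the last assertion passes.
+     */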
+    @Test
+    public void testComparison() {
+        Predicate firstPredicate = Predicate.createTimePredicate(100, 200, 50);
+        Predicate secondPredicate = Predicate.createTimePredicate(1000, 2000, 50);
+        Predicate thirdPredicate = Predicate.createTimePredicate(100, 200, -1);
+
+        Assert.assertFalse(firstPredicate.evaluate(secondPredicate));
+        Assert.assertFalse(secondPredicate.evaluate(thirdPredicate));
+        //With "ANY" type
+        Assert.assertTrue(firstPredicate.evaluate(thirdPredicate));
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
new file mode 100644
index 0000000..2f32b43
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests to ensure entity state changes happen correctly.
+ */
+public class EntityStateServiceTest {
+
+    private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class);
+
+    @BeforeMethod
+    public void setUp() {
+        ((InMemoryStateStore) AbstractStateStore.get()).clear();
+    }
+
+    // Tests a schedulable entity's lifecycle: submit -> schedule -> suspend -> resume.
+    @Test
+    public void testLifeCycle() throws FalconException {
+        Process mockEntity = new Process();
+        mockEntity.setName("test");
+
+        StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
+        EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
+        Mockito.verify(listener).onSubmit(mockEntity);
+        Assert.assertEquals(entityFromStore.getCurrentState(), EntityState.STATE.SUBMITTED);
+        StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
+        Mockito.verify(listener).onSchedule(mockEntity);
+        Assert.assertEquals(entityFromStore.getCurrentState(), EntityState.STATE.SCHEDULED);
+        StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
+        Mockito.verify(listener).onSuspend(mockEntity);
+        Assert.assertEquals(entityFromStore.getCurrentState(), EntityState.STATE.SUSPENDED);
+        StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
+        Mockito.verify(listener).onResume(mockEntity);
+        Assert.assertEquals(entityFromStore.getCurrentState(), EntityState.STATE.SCHEDULED);
+    }
+
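+    /**
+     * Illegal transitions: a submitted entity cannot be suspended, a
+     * scheduled entity cannot be resumed, and a cluster is not schedulable
+     * at all.
+     */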
+    @Test
+    public void testInvalidTransitions() throws FalconException {
+        Feed mockEntity = new Feed();
+        mockEntity.setName("test");
+        StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
+        // Attempt suspending a submitted entity
+        try {
+            StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
+            Assert.fail("Exception expected");
+        } catch (InvalidStateTransitionException e) {
+            // Do nothing
+        }
+
+        StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
+        // Attempt resuming a scheduled entity
+        try {
+            StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
+            Assert.fail("Exception expected");
+        } catch (InvalidStateTransitionException e) {
+            // Do nothing
+        }
+
+        // Attempt scheduling a cluster
+        Cluster mockCluster = new Cluster();
+        mockCluster.setName("test");
+        StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SUBMIT, listener);
+        try {
+            StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SCHEDULE, listener);
+            Assert.fail("Exception expected");
+        } catch (FalconException e) {
+            // Do nothing
+        }
+    }
+
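+    /**
+     * Re-applying the event that produced the current state should be a
+     * no-op: the entity stays in the same state rather than being treated
+     * as an invalid transition.
+     */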
+    @Test(dataProvider = "state_and_events")
+    public void testIdempotency(EntityState.STATE state, EntityState.EVENT event)
+        throws InvalidStateTransitionException {
+        Process mockEntity = new Process();
+        mockEntity.setName("test");
+
+        EntityState entityState = new EntityState(mockEntity).setCurrentState(state);
+        entityState.nextTransition(event);
+        Assert.assertEquals(entityState.getCurrentState(), state);
+    }
+
+    @DataProvider(name = "state_and_events")
+    public Object[][] stateAndEvents() {
+        return new Object[][]{
+            {EntityState.STATE.SCHEDULED, EntityState.EVENT.SCHEDULE},
+            {EntityState.STATE.SUBMITTED, EntityState.EVENT.SUBMIT},
+            {EntityState.STATE.SUSPENDED, EntityState.EVENT.SUSPEND},
+        };
+    }
+}