You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/20 14:09:59 UTC
[2/6] falcon git commit: FALCON-1213 Base framework of the native
scheduler
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
new file mode 100644
index 0000000..ca2010b
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A DAG Engine that uses Oozie to execute the DAG.
+ */
+public class OozieDAGEngine implements DAGEngine {
+ private static final Logger LOG = LoggerFactory.getLogger(OozieDAGEngine.class);
+ private final OozieClient client;
+ private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100;
+
+ private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count";
+ private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList(
+ "pre-processing",
+ "recordsize",
+ "succeeded-post-processing",
+ "failed-post-processing"
+ );
+
+ public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+ private final Cluster cluster;
+
+ public OozieDAGEngine(Cluster cluster) throws DAGEngineException {
+ try {
+ client = OozieClientFactory.get(cluster);
+ this.cluster = cluster;
+ } catch (Exception e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ public OozieDAGEngine(String clusterName) throws DAGEngineException {
+ try {
+ this.cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+ client = OozieClientFactory.get(cluster);
+ } catch (Exception e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public String run(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ Properties properties = getRunProperties(instance);
+ Path buildPath = EntityUtil.getLatestStagingPath(cluster, instance.getEntity());
+ switchUser();
+ properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
+ properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
+ return client.run(properties);
+ } catch (OozieClientException e) {
+ LOG.error("Ozie client exception:", e);
+ throw new DAGEngineException(e);
+ } catch (FalconException e1) {
+ LOG.error("Falcon Exception : ", e1);
+ throw new DAGEngineException(e1);
+ }
+ }
+
+ private void prepareEntityBuildPath(Entity entity) throws FalconException {
+ Path stagingPath = EntityUtil.getBaseStagingPath(cluster, entity);
+ Path logPath = EntityUtil.getLogPath(cluster, entity);
+
+ try {
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+ ClusterHelper.getConfiguration(cluster));
+ HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath);
+ HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath);
+ } catch (IOException e) {
+ throw new FalconException("Error preparing base staging dirs: " + stagingPath, e);
+ }
+ }
+
+ private void dryRunInternal(Properties props) throws OozieClientException {
+ LOG.info("Dry run with properties {}", props);
+ client.dryrun(props);
+ }
+
+ private void switchUser() {
+ String user = System.getProperty("user.name");
+ CurrentUser.authenticate(user);
+ }
+
+ @Override
+ public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ return statusEquals(client.getJobInfo(instance.getExternalID()).getStatus().name(),
+ Job.Status.PREP, Job.Status.RUNNING);
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ // TODO : To be implemented. Currently hardcoded for process
+ private Properties getRunProperties(ExecutionInstance instance) {
+ Properties props = new Properties();
+ DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+ String nominalTime = fmt.print(instance.getInstanceTime());
+ props.put("nominalTime", nominalTime);
+ props.put("timeStamp", nominalTime);
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("falconInputFeeds", "NONE");
+ props.put("falconInPaths", "NONE");
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("userJMSNotificationEnabled", "true");
+ return props;
+ }
+
+ // TODO : To be implemented. Currently hardcoded for process
+ private Properties getDryRunProperties(Entity entity) {
+ Properties props = new Properties();
+ DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+ String nominalTime = fmt.print(DateTime.now());
+ props.put("nominalTime", nominalTime);
+ props.put("timeStamp", nominalTime);
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("falconInputFeeds", "NONE");
+ props.put("falconInPaths", "NONE");
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("userJMSNotificationEnabled", "true");
+ return props;
+ }
+
+ @Override
+ public void suspend(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ client.suspend(instance.getExternalID());
+ assertStatus(instance.getExternalID(), Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED,
+ Job.Status.FAILED, Job.Status.KILLED);
+ LOG.info("Suspended job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public void resume(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ client.resume(instance.getExternalID());
+ assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED,
+ Job.Status.FAILED, Job.Status.KILLED);
+ LOG.info("Resumed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public void kill(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ client.kill(instance.getExternalID());
+ assertStatus(instance.getExternalID(), Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED);
+ LOG.info("Killed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public void reRun(ExecutionInstance instance) throws DAGEngineException {
+ // TODO : Implement this
+ }
+
+ @Override
+ public void submit(Entity entity) throws DAGEngineException {
+ try {
+ // TODO : remove hardcoded Tag value when feed support is added.
+ OozieOrchestrationWorkflowBuilder builder =
+ OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT);
+ prepareEntityBuildPath(entity);
+ Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
+ Properties properties = builder.build(cluster, buildPath);
+ if (properties == null) {
+ LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster);
+ throw new DAGEngineException("Properties for entity " + entity.getName() + " is empty");
+ }
+
+ switchUser();
+ LOG.debug("Logged in user is " + CurrentUser.getUser());
+ properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
+ properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
+ properties.putAll(getDryRunProperties(entity));
+ //Do submit before run as run is asynchronous
+ dryRunInternal(properties);
+ } catch (OozieClientException e) {
+ LOG.error("Oozie client exception:", e);
+ throw new DAGEngineException(e);
+ } catch (FalconException e1) {
+ LOG.error("Falcon Exception : ", e1);
+ throw new DAGEngineException(e1);
+ }
+ }
+
+ @Override
+ public InstancesResult.Instance info(String externalID) throws DAGEngineException {
+ InstancesResult.Instance instance = new InstancesResult.Instance();
+ try {
+ LOG.debug("Retrieving details for job {} ", externalID);
+ WorkflowJob jobInfo = client.getJobInfo(externalID);
+ instance.startTime = jobInfo.getStartTime();
+ if (jobInfo.getStatus().name().equals(Job.Status.RUNNING.name())) {
+ instance.endTime = new Date();
+ } else {
+ instance.endTime = jobInfo.getEndTime();
+ }
+ instance.cluster = cluster.getName();
+ instance.runId = jobInfo.getRun();
+ instance.status = InstancesResult.WorkflowStatus.valueOf(jobInfo.getStatus().name());
+ instance.logFile = jobInfo.getConsoleUrl();
+ instance.wfParams = getWFParams(jobInfo);
+ return instance;
+ } catch (Exception e) {
+ LOG.error("Error when attempting to get info for " + externalID, e);
+ throw new DAGEngineException(e);
+ }
+ }
+
+ private InstancesResult.KeyValuePair[] getWFParams(WorkflowJob jobInfo) {
+ Configuration conf = new Configuration(false);
+ conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+ InstancesResult.KeyValuePair[] wfParams = new InstancesResult.KeyValuePair[conf.size()];
+ int i = 0;
+ for (Map.Entry<String, String> entry : conf) {
+ wfParams[i++] = new InstancesResult.KeyValuePair(entry.getKey(), entry.getValue());
+ }
+ return wfParams;
+ }
+
+ @Override
+ public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException {
+ List<InstancesResult.InstanceAction> instanceActions = new ArrayList<>();
+ try {
+ WorkflowJob wfJob = client.getJobInfo(externalID);
+ List<WorkflowAction> wfActions = wfJob.getActions();
+ // We wanna capture job urls for all user-actions & non succeeded actions of the main workflow
+ for (WorkflowAction action : wfActions) {
+ if (action.getType().equalsIgnoreCase("sub-workflow")
+ && StringUtils.isNotEmpty(action.getExternalId())) {
+ // if the action is sub-workflow, get job urls of all actions within the sub-workflow
+ List<WorkflowAction> subWorkFlowActions = client
+ .getJobInfo(action.getExternalId()).getActions();
+ for (WorkflowAction subWfAction : subWorkFlowActions) {
+ if (!subWfAction.getType().startsWith(":")) {
+ InstancesResult.InstanceAction instanceAction =
+ new InstancesResult.InstanceAction(subWfAction.getName(),
+ subWfAction.getExternalStatus(), subWfAction.getConsoleUrl());
+ instanceActions.add(instanceAction);
+ }
+ }
+ } else if (!action.getType().startsWith(":")) {
+ // if the action is a transition node it starts with :, we don't need their statuses
+ if (PARENT_WF_ACTION_NAMES.contains(action.getName())
+ && !Job.Status.SUCCEEDED.toString().equals(action.getExternalStatus())) {
+ // falcon actions in the main workflow are defined in the list
+ // get job urls for all non succeeded actions of the main workflow
+ InstancesResult.InstanceAction instanceAction =
+ new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+ action.getConsoleUrl());
+ instanceActions.add(instanceAction);
+ } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName())
+ && !StringUtils.equals(action.getExternalId(), "-")) {
+ // if user-action is pig/hive there is no sub-workflow, we wanna capture their urls as well
+ InstancesResult.InstanceAction instanceAction =
+ new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+ action.getConsoleUrl());
+ instanceActions.add(instanceAction);
+ }
+ }
+ }
+ return instanceActions;
+ } catch (OozieClientException oce) {
+ throw new DAGEngineException(oce);
+ }
+ }
+
+ @Override
+ public boolean isAlive() throws DAGEngineException {
+ try {
+ return client.getSystemMode() == OozieClient.SYSTEM_MODE.NORMAL;
+ } catch (OozieClientException e) {
+ throw new DAGEngineException("Unable to reach Oozie server.", e);
+ }
+ }
+
+ @Override
+ public Properties getConfiguration(String externalID) throws DAGEngineException {
+ Properties props = new Properties();
+ try {
+ WorkflowJob jobInfo = client.getJobInfo(externalID);
+ Configuration conf = new Configuration(false);
+ conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+
+ for (Map.Entry<String, String> entry : conf) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+
+ return props;
+ }
+
+ // Get status of a workflow (with retry) and ensure it is one of statuses requested.
+ private void assertStatus(String jobID, Job.Status... statuses) throws DAGEngineException {
+ String actualStatus = null;
+ int retryCount;
+ String retry = RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30");
+ try {
+ retryCount = Integer.valueOf(retry);
+ } catch (NumberFormatException nfe) {
+ throw new DAGEngineException("Invalid value provided for runtime property \""
+ + WORKFLOW_STATUS_RETRY_COUNT + "\". Please provide an integer value.");
+ }
+ for (int counter = 0; counter < retryCount; counter++) {
+ try {
+ actualStatus = client.getJobInfo(jobID).getStatus().name();
+ } catch (OozieClientException e) {
+ LOG.error("Unable to get status of workflow: " + jobID, e);
+ throw new DAGEngineException(e);
+ }
+ if (!statusEquals(actualStatus, statuses)) {
+ try {
+ Thread.sleep(WORKFLOW_STATUS_RETRY_DELAY_MS);
+ } catch (InterruptedException ignore) {
+ //ignore
+ }
+ } else {
+ return;
+ }
+ }
+ throw new DAGEngineException("For Job" + jobID + ", actual statuses: " + actualStatus + ", expected statuses: "
+ + Arrays.toString(statuses));
+ }
+
+ private boolean statusEquals(String left, Job.Status... right) {
+ for (Job.Status rightElement : right) {
+ if (left.equals(rightElement.name())) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
new file mode 100644
index 0000000..b2f9e59
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -0,0 +1,557 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+
+/**
+ * Tests the API of FalconExecution Service and in turn the FalconExecutionService.get()s.
+ */
+public class FalconExecutionServiceTest extends AbstractTestBase {
+
+ private InMemoryStateStore stateStore = null;
+ private AlarmService mockTimeService;
+ private DataAvailabilityService mockDataService;
+ private SchedulerService mockSchedulerService;
+ private JobCompletionService mockCompletionService;
+ private DAGEngine dagEngine;
+ private int instanceCount = 0;
+
+ @BeforeClass
+ public void init() throws Exception {
+ this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+ this.conf = dfsCluster.getConf();
+ setupServices();
+ setupConfigStore();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ this.dfsCluster.shutdown();
+ }
+
+ // State store is set up to sync with Config Store. That gets tested too.
+ public void setupConfigStore() throws Exception {
+ stateStore = (InMemoryStateStore) AbstractStateStore.get();
+ getStore().registerListener(stateStore);
+ storeEntity(EntityType.CLUSTER, "testCluster");
+ storeEntity(EntityType.FEED, "clicksFeed");
+ storeEntity(EntityType.FEED, "clicksSummary");
+ }
+
+ public void setupServices() throws FalconException, OozieClientException {
+ mockTimeService = Mockito.mock(AlarmService.class);
+ Mockito.when(mockTimeService.getName()).thenReturn("AlarmService");
+ Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+
+ mockDataService = Mockito.mock(DataAvailabilityService.class);
+ Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+ Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ mockSchedulerService = Mockito.mock(SchedulerService.class);
+ Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService");
+ StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+ StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName());
+ dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster"));
+ Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ mockCompletionService = Mockito.mock(JobCompletionService.class);
+ Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+ Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ Services.get().register(mockTimeService);
+ Services.get().register(mockDataService);
+ Services.get().register(mockSchedulerService);
+ Services.get().register(mockCompletionService);
+ Services.get().register(FalconExecutionService.get());
+ }
+
+ @BeforeMethod
+ private void setupStateStore() throws FalconException {
+ stateStore.clear();
+ }
+
+ @Test
+ public void testBasicFlow() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize1");
+ Process process = getStore().get(EntityType.PROCESS, "summarize1");
+ Assert.assertNotNull(process);
+ ID processKey = new ID(process);
+ String clusterName = dfsCluster.getCluster().getName();
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+ Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+ // Simulate a time notification
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure an instance is triggered and registers for data notification
+ Assert.assertEquals(stateStore.getAllExecutionInstances(process, clusterName).size(), 1);
+ InstanceState instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.WAITING);
+
+ // Simulate a data notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure the instance is ready for execution
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY);
+
+ // Simulate a scheduled notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure the instance is running
+ instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.RUNNING);
+
+ // Simulate a job complete notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure the instance is in succeeded and is in the state store
+ instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+ }
+
+ @Test
+ public void testSuspendResume() throws Exception {
+ Mockito.doNothing().when(dagEngine).resume(Mockito.any(ExecutionInstance.class));
+ storeEntity(EntityType.PROCESS, "summarize2");
+ Process process = getStore().get(EntityType.PROCESS, "summarize2");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate two time notifications
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Suspend and resume All waiting - check for notification deregistration
+
+ Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+ InstanceState instance1 = (InstanceState) i.next();
+ InstanceState instance2 = (InstanceState) i.next();
+
+ // Simulate a data notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in ready and one in waiting. Both should be suspended.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
+
+ FalconExecutionService.get().suspend(process);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+ instance1.getInstance().getId());
+ Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+ instance2.getInstance().getId());
+ Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+
+ FalconExecutionService.get().resume(process);
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
+
+ // Simulate a data notification and a job run notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in running and the other in ready. Both should be suspended
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+
+ FalconExecutionService.get().suspend(process);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
+
+ FalconExecutionService.get().resume(process);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+
+ // Running should finish after resume
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+ }
+
+ @Test
+ // Kill waiting, ready, running - check for notification deregistration
+ public void testDelete() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize4");
+ Process process = getStore().get(EntityType.PROCESS, "summarize4");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate three time notifications
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Suspend and resume All waiting - check for notification deregistration
+ Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+ InstanceState instance1 = (InstanceState) i.next();
+ InstanceState instance2 = (InstanceState) i.next();
+ InstanceState instance3 = (InstanceState) i.next();
+
+ // Simulate two data notifications and one job run
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in ready, one in waiting and one running.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+ FalconExecutionService.get().delete(process);
+
+ // Deregister from notification services
+ Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+ }
+
+ @Test
+ public void testTimeOut() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize3");
+ Process process = getStore().get(EntityType.PROCESS, "summarize3");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate time notification
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Simulate data unavailable notification and timeout
+ InstanceState instanceState = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ ((Process) instanceState.getInstance().getEntity()).setTimeout(new Frequency("minutes(0)"));
+ DataEvent dataEvent = (DataEvent) createEvent(NotificationServicesRegistry.SERVICE.DATA,
+ instanceState.getInstance());
+ dataEvent.setStatus(DataEvent.STATUS.UNAVAILABLE);
+
+ FalconExecutionService.get().onEvent(dataEvent);
+
+ Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT);
+ }
+
+ // Non-triggering event should not create an instance
+ @Test
+ public void testNonTriggeringEvents() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize6");
+ Process process = getStore().get(EntityType.PROCESS, "summarize6");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+ // Simulate data notification with a callback of the process.
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.DATA, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // No instances should get triggered
+ Assert.assertTrue(stateStore.getAllExecutionInstances(process, clusterName).isEmpty());
+ }
+
+ // Individual instance suspend, resume, kill
+ @Test
+ public void testInstanceOperations() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize5");
+ Process process = getStore().get(EntityType.PROCESS, "summarize5");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate three time notifications
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Suspend and resume All waiting - check for notification deregistration
+ Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+ InstanceState instance1 = (InstanceState) i.next();
+ InstanceState instance2 = (InstanceState) i.next();
+ InstanceState instance3 = (InstanceState) i.next();
+
+ // Simulate two data notifications and one job run
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in ready, one in waiting and one running.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+ EntityExecutor entityExecutor = FalconExecutionService.get().getEntityExecutor(process, clusterName);
+ Assert.assertNotNull(entityExecutor);
+ Collection<ExecutionInstance> instances = new ArrayList<>();
+ for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+ instances.add(instanceState.getInstance());
+ }
+
+ for (ExecutionInstance instance : instances) {
+ entityExecutor.suspend(instance);
+ }
+
+ // Instances must be suspended, but, entity itself must not be.
+ for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+ Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+ instanceState.getInstance().getId());
+ }
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+ for (ExecutionInstance instance : instances) {
+ entityExecutor.resume(instance);
+ }
+ // Back to one in ready, one in waiting and one running.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+ for (ExecutionInstance instance : instances) {
+ entityExecutor.kill(instance);
+ }
+
+ // Instances must be killed, but, entity itself must not be.
+ for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+ Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.KILLED);
+ }
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+ }
+
+ @Test(priority = -1)
+ // Add some entities and instance in the store and ensure the executor picks up from where it left
+ public void testSystemRestart() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize7");
+ Process process = getStore().get(EntityType.PROCESS, "summarize7");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Store couple of instances in store
+ stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+ ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
+ InstanceState instanceState1 = new InstanceState(instance1);
+ instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+ stateStore.putExecutionInstance(instanceState1);
+ ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName);
+ InstanceState instanceState2 = new InstanceState(instance2);
+ instanceState2.setCurrentState(InstanceState.STATE.READY);
+ stateStore.putExecutionInstance(instanceState2);
+
+ FalconExecutionService.get().init();
+
+ // Simulate a scheduled notification. This should cause the reload from state store
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING);
+
+ // Simulate a Job completion notification and ensure the instance resumes from where it left
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+ }
+
+ @Test(dataProvider = "actions")
+ public void testPartialFailures(String name, String action, InstanceState.STATE state) throws Exception {
+ storeEntity(EntityType.PROCESS, name);
+ Process process = getStore().get(EntityType.PROCESS, name);
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule the process
+ FalconExecutionService.get().schedule(process);
+
+ // Store couple of instances in store
+ stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+ ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
+ instance1.setExternalID("123");
+ InstanceState instanceState1 = new InstanceState(instance1);
+ instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+ stateStore.putExecutionInstance(instanceState1);
+ ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName);
+ InstanceState instanceState2 = new InstanceState(instance2);
+ instanceState2.setCurrentState(InstanceState.STATE.READY);
+ stateStore.putExecutionInstance(instanceState2);
+
+ // Mock failure
+ ((MockDAGEngine)dagEngine).addFailInstance(instance1);
+ Method m = FalconExecutionService.get().getClass().getMethod(action, Entity.class);
+ try {
+ m.invoke(FalconExecutionService.get(), process);
+ Assert.fail("Exception expected.");
+ } catch (Exception e) {
+ // One instance must fail and the other not
+ Assert.assertEquals(instanceState2.getCurrentState(), state);
+ Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING);
+ }
+
+ // throw no exception
+ ((MockDAGEngine)dagEngine).removeFailInstance(instance1);
+ m.invoke(FalconExecutionService.get(), process);
+
+ // Both instances must be in expected state.
+ Assert.assertEquals(instanceState2.getCurrentState(), state);
+ Assert.assertEquals(instanceState1.getCurrentState(), state);
+ }
+
+ @DataProvider(name = "actions")
+ public Object[][] testActions() {
+ return new Object[][] {
+ {"summarize8", "suspend", InstanceState.STATE.SUSPENDED},
+ {"summarize9", "delete", InstanceState.STATE.KILLED},
+ };
+ }
+
+ private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) {
+ ID id = new ID(process, cluster);
+ switch (type) {
+ case TIME:
+ Date start = process.getClusters().getClusters().get(0).getValidity().getStart();
+ long instanceOffset =
+ SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getFrequency()) * instanceCount++;
+ return new TimeElapsedEvent(id, new DateTime(start),
+ new DateTime(process.getClusters().getClusters().get(0).getValidity().getEnd()),
+ new DateTime(start.getTime() + instanceOffset));
+ case DATA:
+ DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+ DataEvent.STATUS.AVAILABLE);
+ return dataEvent;
+ default:
+ return null;
+ }
+ }
+
+ private Event createEvent(NotificationServicesRegistry.SERVICE type, ExecutionInstance instance) {
+ ID id = new ID(instance);
+ switch (type) {
+ case DATA:
+ DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+ DataEvent.STATUS.AVAILABLE);
+ return dataEvent;
+ case JOB_SCHEDULE:
+ JobScheduledEvent scheduledEvent = new JobScheduledEvent(id, JobScheduledEvent.STATUS.SUCCESSFUL);
+ scheduledEvent.setExternalID("234");
+ return scheduledEvent;
+ case JOB_COMPLETION:
+ JobCompletedEvent jobEvent = new JobCompletedEvent(id, WorkflowJob.Status.SUCCEEDED, DateTime.now());
+ return jobEvent;
+ default:
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
new file mode 100644
index 0000000..087114f
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.workflow.engine.DAGEngine;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Mock DAG Execution Engine used for tests.
+ */
+public class MockDAGEngine implements DAGEngine {
+ private List<ExecutionInstance> failInstances = new ArrayList<>();
+ private Map<ExecutionInstance, Integer> runInvocations = new HashMap<>();
+
+ public MockDAGEngine(String cluster) {
+
+ }
+
+ @Override
+ public String run(ExecutionInstance instance) throws DAGEngineException {
+ if (failInstances.contains(instance)) {
+ throw new DAGEngineException("Mock failure.");
+ }
+ Integer count = 1;
+ if (runInvocations.containsKey(instance)) {
+ // Increment count
+ count = runInvocations.get(instance) + 1;
+ }
+
+ runInvocations.put(instance, count);
+ return "123";
+ }
+
+ @Override
+ public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException {
+ return true;
+ }
+
+ @Override
+ public void suspend(ExecutionInstance instance) throws DAGEngineException {
+ if (failInstances.contains(instance)) {
+ throw new DAGEngineException("mock failure.");
+ }
+ }
+
+ @Override
+ public void resume(ExecutionInstance instance) throws DAGEngineException {
+
+ }
+
+ @Override
+ public void kill(ExecutionInstance instance) throws DAGEngineException {
+ if (failInstances.contains(instance)) {
+ throw new DAGEngineException("mock failure.");
+ }
+ }
+
+ @Override
+ public void reRun(ExecutionInstance instance) throws DAGEngineException {
+
+ }
+
+ @Override
+ public void submit(Entity entity) throws DAGEngineException {
+
+ }
+
+ @Override
+ public InstancesResult.Instance info(String externalID) throws DAGEngineException {
+ return new InstancesResult.Instance();
+ }
+
+ @Override
+ public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException {
+ return null;
+ }
+
+ @Override
+ public boolean isAlive() throws DAGEngineException {
+ return false;
+ }
+
+ @Override
+ public Properties getConfiguration(String externalID) throws DAGEngineException {
+ return null;
+ }
+
+ public void addFailInstance(ExecutionInstance failInstance) {
+ this.failInstances.add(failInstance);
+ }
+
+ public void removeFailInstance(ExecutionInstance failInstance) {
+ this.failInstances.remove(failInstance);
+ }
+
+ public Integer getTotalRuns(ExecutionInstance instance) {
+ return runInvocations.get(instance);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
new file mode 100644
index 0000000..9457454
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for Utility methods.
+ */
+public class SchedulerUtilTest {
+
+ @Test(dataProvider = "frequencies")
+ public void testGetFrequencyInMillis(DateTime referenceTime, Frequency frequency, long expectedValue) {
+ Assert.assertEquals(SchedulerUtil.getFrequencyInMillis(referenceTime, frequency), expectedValue);
+ }
+
+ @DataProvider(name = "frequencies")
+ public Object[][] getTestFrequencies() {
+ DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");
+ return new Object[][] {
+ {DateTime.now(), new Frequency("minutes(10)"), 10*60*1000L},
+ {DateTime.now(), new Frequency("hours(6)"), 6*60*60*1000L},
+ // Feb of leap year
+ {formatter.parseDateTime("04/02/2012 14:00:00"), new Frequency("months(1)"), 29*24*60*60*1000L},
+ // Months with 31 and 30 days
+ {formatter.parseDateTime("02/10/2015 03:30:00"), new Frequency("months(2)"), (31+30)*24*60*60*1000L},
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
new file mode 100644
index 0000000..36f1fd1
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.TimeZone;
+
+/**
+ * Class to test the time notification service.
+ */
+public class AlarmServiceTest {
+
+ private static AlarmService timeService = Mockito.spy(new AlarmService());
+ private static NotificationHandler handler = Mockito.mock(NotificationHandler.class);
+
+ @BeforeClass
+ public void setup() throws FalconException {
+ timeService.init();
+ }
+
+ @Test
+ // This test ensures notifications are generated for time events that have occurred in the past.
+ public void testbackLogCatchup() throws Exception {
+ TimeZone tz = TimeZone.getTimeZone("UTC");
+ DateTime now = DateTime.now(DateTimeZone.forTimeZone(tz));
+ // Start time 2 mins ago
+ DateTime startTime = new DateTime(now.getMillis() - 2*60*1000, DateTimeZone.forTimeZone(tz));
+ // End time 2 mins later
+ DateTime endTime = new DateTime(now.getMillis() + 2*60*1000 , DateTimeZone.forTimeZone(tz));
+
+ Process mockProcess = new Process();
+ mockProcess.setName("test");
+ ID id = new ID(mockProcess);
+ id.setCluster("testCluster");
+
+ AlarmService.AlarmRequestBuilder request =
+ new AlarmService.AlarmRequestBuilder(handler, id);
+ request.setStartTime(startTime);
+ request.setEndTime(endTime);
+ request.setFrequency(new Frequency("minutes(1)"));
+ request.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ timeService.register(request.build());
+ // Asynchronous execution, hence a small wait.
+ Thread.sleep(1000);
+ // Based on the minute boundary, there might be 3.
+ Mockito.verify(handler, Mockito.atLeast(2)).onEvent(Mockito.any(Event.class));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
new file mode 100644
index 0000000..b4a0f35
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.MockDAGEngine;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+import static org.apache.falcon.state.InstanceState.STATE;
+
+/**
+ * Class to test the job scheduler service.
+ */
+public class SchedulerServiceTest extends AbstractTestBase {
+
+ private SchedulerService scheduler = Mockito.spy(new SchedulerService());
+ private NotificationHandler handler;
+ private static String cluster = "testCluster";
+ private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get();
+ private static DAGEngine mockDagEngine;
+ private static Process process;
+ private volatile boolean failed = false;
+
+ @BeforeMethod
+ public void setup() throws FalconException {
+ // Initialized before every test in order to track invocations.
+ handler = Mockito.spy(new MockNotificationHandler());
+ }
+
+ @BeforeClass
+ public void init() throws Exception {
+ this.dfsCluster = EmbeddedCluster.newCluster(cluster);
+ this.conf = dfsCluster.getConf();
+ setupConfigStore();
+ DataAvailabilityService mockDataService = Mockito.mock(DataAvailabilityService.class);
+ Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+ Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ Services.get().register(mockDataService);
+ JobCompletionService mockCompletionService = Mockito.mock(JobCompletionService.class);
+ Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+ Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ Services.get().register(mockCompletionService);
+
+ Services.get().register(scheduler);
+ scheduler.init();
+ StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+ mockDagEngine = DAGEngineFactory.getDAGEngine("testCluster");
+ }
+
+ @AfterClass
+ public void tearDown() {
+ this.dfsCluster.shutdown();
+ }
+
+ // State store is set up to sync with Config Store. That gets tested too.
+ public void setupConfigStore() throws Exception {
+ getStore().registerListener(stateStore);
+ storeEntity(EntityType.CLUSTER, "testCluster");
+ storeEntity(EntityType.FEED, "clicksFeed");
+ storeEntity(EntityType.FEED, "clicksSummary");
+ storeEntity(EntityType.PROCESS, "mockSummary");
+ process = getStore().get(EntityType.PROCESS, "mockSummary");
+ }
+
+ @Test
+ public void testSchedulingWithParallelInstances() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize");
+ mockProcess.setParallel(1);
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ request.setInstance(instance1);
+ // No instances running, ensure DAGEngine.run is invoked.
+ scheduler.register(request.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ Mockito.verify(handler).onEvent(Mockito.any(JobScheduledEvent.class));
+ // Max. instances running, the new instance should not be run.
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 60000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request2.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null);
+ // Max. instances running, the new instance should not be run.
+ ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 120000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance3.getId());
+ request3.setInstance(instance3);
+ scheduler.register(request3.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ // Simulate the completion of previous instance.
+ stateStore.getExecutionInstance(instance1.getId()).setCurrentState(STATE.SUCCEEDED);
+ scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+ DateTime.now()));
+ // When an instance completes instance2 should get scheduled next iteration
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+ // Simulate another completion and ensure instance3 runs.
+ stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
+ scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+ DateTime.now()));
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
+ }
+
+ @Test
+ public void testSchedulingWithDependencies() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize1");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize1");
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ // Create two instances, one dependent on the other.
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 60000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ ArrayList<ExecutionInstance> dependencies = new ArrayList<ExecutionInstance>();
+ dependencies.add(instance2);
+ request.setDependencies(dependencies);
+ request.setInstance(instance1);
+ // instance2 is not scheduled, dependency not satisfied
+ // So, instance1 should not be scheduled either.
+ scheduler.register(request.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), null);
+ Mockito.verify(handler, Mockito.times(0)).onEvent(Mockito.any(JobScheduledEvent.class));
+
+ // Schedule instance1
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request2.build());
+
+ // Simulate completion of the instance.
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+ Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(JobScheduledEvent.class));
+ stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
+ scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster, instance2.getInstanceTime()),
+ WorkflowJob.Status.SUCCEEDED, DateTime.now()));
+ // Dependency now satisfied. Now, the first instance should get scheduled after retry delay.
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ }
+
+ @Test
+ public void testSchedulingWithPriorities() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize2");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize2");
+ storeEntity(EntityType.PROCESS, "summarize3");
+ Process mockProcess2 = getStore().get(EntityType.PROCESS, "summarize3");
+ // Set higher priority
+ Property priorityProp = new Property();
+ priorityProp.setName(EntityUtil.MR_JOB_PRIORITY);
+ priorityProp.setValue("HIGH");
+ mockProcess2.getProperties().getProperties().add(priorityProp);
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ // Create two one dependent on the other.
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess2, new DateTime(startTime), cluster);
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ request.setInstance(instance1);
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request.build());
+ scheduler.register(request2.build());
+ Thread.sleep(100);
+ // Instance2 should be scheduled first.
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ Mockito.verify(handler, Mockito.times(2)).onEvent(Mockito.any(JobScheduledEvent.class));
+ }
+
+ @Test
+ public void testDeRegistration() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize4");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4");
+ mockProcess.setParallel(3);
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ // Schedule 3 instances.
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ request.setInstance(instance1);
+ scheduler.register(request.build());
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 60000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request2.build());
+ ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 120000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance3.getId());
+ request3.setInstance(instance3);
+ scheduler.register(request3.build());
+
+ // Abort second instance
+ scheduler.unregister(handler, instance2.getId());
+
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
+ // Second instance should not run.
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null);
+ }
+
+ @Test
+ public void testScheduleFailure() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize5");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize5");
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ // Scheduling an instance should fail.
+ NotificationHandler failureHandler = new NotificationHandler() {
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event);
+ if (scheduledEvent.getStatus() != JobScheduledEvent.STATUS.FAILED) {
+ failed = true;
+ }
+ }
+ };
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(failureHandler, instance1.getId());
+ request.setInstance(instance1);
+ ((MockDAGEngine)mockDagEngine).addFailInstance(instance1);
+ scheduler.register(request.build());
+ Thread.sleep(100);
+ Assert.assertFalse(failed);
+ ((MockDAGEngine)mockDagEngine).removeFailInstance(instance1);
+ }
+
+ /**
+ * A mock notification Handler that makes appropriate state changes.
+ */
+ public static class MockNotificationHandler implements NotificationHandler {
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event);
+ Process p = (Process) process.copy();
+ p.setName(scheduledEvent.getTarget().getEntityName());
+ ProcessExecutionInstance instance = new ProcessExecutionInstance(p,
+ scheduledEvent.getTarget().getInstanceTime(), cluster);
+ InstanceState state = new InstanceState(instance).setCurrentState(STATE.RUNNING);
+ if (!stateStore.executionInstanceExists(instance.getId())) {
+ stateStore.putExecutionInstance(state);
+ } else {
+ stateStore.updateExecutionInstance(state);
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
new file mode 100644
index 0000000..95dd5ae
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.predicate;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.apache.falcon.entity.v0.process.Process;
+
+/**
+ * Tests the predicate class.
+ */
+public class PredicateTest {
+
+ @Test
+ public void testPredicateFromEvent() throws FalconException {
+ Process process = new Process();
+ process.setName("test");
+ DateTime now = DateTime.now();
+ TimeElapsedEvent te = new TimeElapsedEvent(new ID(process), now, now, now);
+ Predicate.getPredicate(te);
+ }
+
+ @Test
+ public void testComparison() {
+ Predicate firstPredicate = Predicate.createTimePredicate(100, 200, 50);
+ Predicate secondPredicate = Predicate.createTimePredicate(1000, 2000, 50);
+ Predicate thirdPredicate = Predicate.createTimePredicate(100, 200, -1);
+
+ Assert.assertFalse(firstPredicate.evaluate(secondPredicate));
+ Assert.assertFalse(secondPredicate.evaluate(thirdPredicate));
+ //With "ANY" type
+ Assert.assertTrue(firstPredicate.evaluate(thirdPredicate));
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
new file mode 100644
index 0000000..2f32b43
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests to ensure entity state changes happen correctly.
+ */
+public class EntityStateServiceTest {
+
+ private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class);
+
+ @BeforeMethod
+ public void setUp() {
+ ((InMemoryStateStore) AbstractStateStore.get()).clear();
+ }
+
+ // Tests a schedulable entity's lifecycle : Submit -> run -> suspend -> resume
+ @Test
+ public void testLifeCycle() throws FalconException {
+ Process mockEntity = new Process();
+ mockEntity.setName("test");
+
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
+ EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
+ Mockito.verify(listener).onSubmit(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUBMITTED));
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
+ Mockito.verify(listener).onSchedule(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
+ Mockito.verify(listener).onSuspend(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUSPENDED));
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
+ Mockito.verify(listener).onResume(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
+ }
+
+ @Test
+ public void testInvalidTransitions() throws FalconException {
+ Feed mockEntity = new Feed();
+ mockEntity.setName("test");
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
+ // Attempt suspending a submitted entity
+ try {
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
+ Assert.fail("Exception expected");
+ } catch (InvalidStateTransitionException e) {
+ // Do nothing
+ }
+
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
+ // Attempt resuming a scheduled entity
+ try {
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
+ Assert.fail("Exception expected");
+ } catch (InvalidStateTransitionException e) {
+ // Do nothing
+ }
+
+ // Attempt scheduling a cluster
+ Cluster mockCluster = new Cluster();
+ mockCluster.setName("test");
+ StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SUBMIT, listener);
+ try {
+ StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SCHEDULE, listener);
+ Assert.fail("Exception expected");
+ } catch (FalconException e) {
+ // Do nothing
+ }
+ }
+
+ @Test(dataProvider = "state_and_events")
+ public void testIdempotency(EntityState.STATE state, EntityState.EVENT event)
+ throws InvalidStateTransitionException {
+ Process mockEntity = new Process();
+ mockEntity.setName("test");
+
+ EntityState entityState = new EntityState(mockEntity).setCurrentState(state);
+ entityState.nextTransition(event);
+ Assert.assertEquals(entityState.getCurrentState(), state);
+ }
+
+ @DataProvider(name = "state_and_events")
+ public Object[][] stateAndEvents() {
+ return new Object[][]{
+ {EntityState.STATE.SCHEDULED, EntityState.EVENT.SCHEDULE},
+ {EntityState.STATE.SUBMITTED, EntityState.EVENT.SUBMIT},
+ {EntityState.STATE.SUSPENDED, EntityState.EVENT.SUSPEND},
+ };
+ }
+}