You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/10/20 14:09:58 UTC
[1/6] falcon git commit: FALCON-1213 Base framework of the native
scheduler
Repository: falcon
Updated Branches:
refs/heads/master 9e6d5a6c5 -> a0911bd82
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
new file mode 100644
index 0000000..d27ac7e
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests the state changes of an instance.
+ */
+public class InstanceStateServiceTest {
+
+ private InstanceStateChangeHandler listener = Mockito.mock(InstanceStateChangeHandler.class);
+ private ProcessExecutionInstance mockInstance;
+
+ @BeforeMethod
+ public void setup() {
+ Process testProcess = new Process();
+ testProcess.setName("test");
+ // Setup new mocks so we can verify the no. of invocations
+ mockInstance = Mockito.mock(ProcessExecutionInstance.class);
+ Mockito.when(mockInstance.getEntity()).thenReturn(testProcess);
+ Mockito.when(mockInstance.getInstanceTime()).thenReturn(DateTime.now());
+ Mockito.when(mockInstance.getCluster()).thenReturn("testCluster");
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ ((InMemoryStateStore) AbstractStateStore.get()).clear();
+ }
+
+ // Tests an entity instance's lifecycle : Trigger -> waiting -> ready -> running
+ // -> suspendAll -> resumeAll -> success
+ @Test
+ public void testLifeCycle() throws FalconException {
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.TRIGGER, listener);
+ InstanceState instanceFromStore = AbstractStateStore.get()
+ .getExecutionInstance(new ID(mockInstance));
+ Mockito.verify(listener).onTrigger(mockInstance);
+ Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING));
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener);
+ Mockito.verify(listener).onConditionsMet(mockInstance);
+ Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.READY));
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE, listener);
+ Mockito.verify(listener).onSchedule(mockInstance);
+ Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUSPEND, listener);
+ Mockito.verify(listener).onSuspend(mockInstance);
+ Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUSPENDED));
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_RUNNING, listener);
+ Mockito.verify(listener).onResume(mockInstance);
+ Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUCCEED, listener);
+ Mockito.verify(listener).onSuccess(mockInstance);
+ Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUCCEEDED));
+ Assert.assertEquals(AbstractStateStore.get().getAllEntities().size(), 0);
+ }
+
+ @Test
+ public void testInvalidTransitions() throws FalconException {
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.TRIGGER, listener);
+ try {
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE, listener);
+ Assert.fail("Exception expected");
+ } catch (InvalidStateTransitionException e) {
+ // Do nothing
+ }
+
+ // Resume an instance that is not suspended
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener);
+ try {
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_READY, listener);
+ Assert.fail("Exception expected");
+ } catch (InvalidStateTransitionException e) {
+ // Do nothing
+ }
+
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE, listener);
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.FAIL, listener);
+
+ // Attempt killing a completed instance
+ try {
+ StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.KILL, listener);
+ Assert.fail("Exception expected");
+ } catch (InvalidStateTransitionException e) {
+ // Do nothing
+ }
+ }
+
+ @Test(dataProvider = "state_and_events")
+ public void testIdempotency(InstanceState.STATE state, InstanceState.EVENT event)
+ throws InvalidStateTransitionException, StateStoreException {
+ InstanceState instanceState = new InstanceState(mockInstance).setCurrentState(state);
+ instanceState.nextTransition(event);
+ Assert.assertEquals(instanceState.getCurrentState(), state);
+ }
+
+ @DataProvider(name = "state_and_events")
+ public Object[][] stateAndEvents() {
+ return new Object[][] {
+ {InstanceState.STATE.WAITING, InstanceState.EVENT.TRIGGER},
+ {InstanceState.STATE.READY, InstanceState.EVENT.CONDITIONS_MET},
+ {InstanceState.STATE.TIMED_OUT, InstanceState.EVENT.TIME_OUT},
+ {InstanceState.STATE.RUNNING, InstanceState.EVENT.SCHEDULE},
+ {InstanceState.STATE.SUSPENDED, InstanceState.EVENT.SUSPEND},
+ {InstanceState.STATE.KILLED, InstanceState.EVENT.KILL},
+ {InstanceState.STATE.SUCCEEDED, InstanceState.EVENT.SUCCEED},
+ {InstanceState.STATE.FAILED, InstanceState.EVENT.FAIL},
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/resources/config/cluster/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/config/cluster/cluster-0.1.xml b/scheduler/src/test/resources/config/cluster/cluster-0.1.xml
new file mode 100644
index 0000000..223cbc6
--- /dev/null
+++ b/scheduler/src/test/resources/config/cluster/cluster-0.1.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<cluster colo="default" description="" name="testCluster" xmlns="uri:falcon:cluster:0.1">
+ <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+ <interfaces>
+ <interface type="readonly" endpoint="hftp://localhost:50010"
+ version="0.20.2"/>
+ <interface type="write" endpoint="jail://testCluster:00"
+ version="0.20.2"/>
+ <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+ <interface type="workflow" endpoint="http://localhost:11000/oozie" version="4.0"/>
+ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+ version="5.1.6"/>
+ <interface type="registry" endpoint="http://localhost:48080/templeton/v1"
+ version="0.11.0"/>
+ </interfaces>
+ <locations>
+ <location name="staging" path="/projects/falcon/staging"/>
+ <location name="temp" path="/tmp"/>
+ <location name="working" path="/projects/falcon/working"/>
+ </locations>
+ <properties>
+ <property name="field1" value="value1"/>
+ <property name="field2" value="value2"/>
+ </properties>
+</cluster>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/resources/config/feed/feed-0.1.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/config/feed/feed-0.1.xml b/scheduler/src/test/resources/config/feed/feed-0.1.xml
new file mode 100644
index 0000000..25daf7d
--- /dev/null
+++ b/scheduler/src/test/resources/config/feed/feed-0.1.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
+ >
+ <partitions>
+ <partition name="fraud"/>
+ <partition name="good"/>
+ </partitions>
+
+ <groups>online,bi</groups>
+ <availabilityFlag>_SUCCESS</availabilityFlag>
+
+ <frequency>hours(1)</frequency>
+ <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+ <timezone>UTC</timezone>
+
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="testCluster" type="source">
+ <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+ <retention limit="hours(48)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+ <locations>
+ <location type="data" path="/projects/falcon/clicks"/>
+ <location type="stats" path="/projects/falcon/clicksStats"/>
+ <location type="meta" path="/projects/falcon/clicksMetaData"/>
+ </locations>
+ </cluster>
+ </clusters>
+
+ <locations>
+ <location type="data" path="/projects/falcon/clicks"/>
+ <location type="stats" path="/projects/falcon/clicksStats"/>
+ <location type="meta" path="/projects/falcon/clicksMetaData"/>
+ </locations>
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/config/process/process-0.1.xml b/scheduler/src/test/resources/config/process/process-0.1.xml
new file mode 100644
index 0000000..deeb554
--- /dev/null
+++ b/scheduler/src/test/resources/config/process/process-0.1.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="sample" xmlns="uri:falcon:process:0.1">
+ <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+ <pipelines>testPipeline,dataReplication_Pipeline</pipelines>
+ <clusters>
+ <cluster name="testCluster">
+ <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>minutes(1)</frequency>
+ <sla shouldStartIn="minutes(2)" shouldEndIn="minutes(4)"/>
+
+ <!-- what -->
+ <inputs>
+ <input name="clicks" feed="clicksFeed" start="yesterday(0,0)" end="yesterday(20,0)"/>
+ </inputs>
+
+ <outputs>
+ <output name="clicksOutput" feed="clicksSummary" instance="today(0,0)"/>
+ </outputs>
+
+ <!-- how -->
+ <properties>
+ <property name="name1" value="value1"/>
+ <property name="name2" value="value2"/>
+ </properties>
+
+ <workflow engine="oozie" path="/falcon/test/workflow"/>
+
+ <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+
+ <late-process policy="exp-backoff" delay="minutes(1)">
+ <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+ </late-process>
+</process>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index da12d3a..8891e5f 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -119,6 +119,12 @@
<dependency>
<groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-scheduler</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
<artifactId>falcon-retention</artifactId>
</dependency>
[3/6] falcon git commit: FALCON-1213 Base framework of the native
scheduler
Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/ID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/ID.java b/scheduler/src/main/java/org/apache/falcon/state/ID.java
new file mode 100644
index 0000000..420c856
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/ID.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.datanucleus.util.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.Serializable;
+
+/**
+ * A serializable, comparable ID used to uniquely represent an entity or an instance.
+ */
+public final class ID implements Serializable, Comparable<ID> {
+ public static final String KEY_SEPARATOR = "/";
+ public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+
+ private String entityName;
+ private EntityType entityType;
+ private String entityKey;
+ private String cluster;
+ private DateTime instanceTime;
+
+ /**
+ * Default Constructor.
+ */
+ public ID(){}
+
+ /**
+ * Constructor.
+ *
+ * @param type
+ * @param name
+ */
+ public ID(EntityType type, String name) {
+ assert type != null : "Entity type must be present.";
+ assert !StringUtils.isEmpty(name) : "Entity name must be present.";
+ this.entityName = name;
+ this.entityType = type;
+ this.entityKey = entityType + KEY_SEPARATOR + entityName;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param entity
+ */
+ public ID(Entity entity) {
+ this(entity.getEntityType(), entity.getName());
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param entity
+ * @param cluster
+ */
+ public ID(Entity entity, String cluster) {
+ this(entity.getEntityType(), entity.getName());
+ assert !StringUtils.isEmpty(cluster) : "Cluster cannot be empty.";
+ this.cluster = cluster;
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param instance
+ */
+ public ID(ExecutionInstance instance) {
+ this(instance.getEntity(), instance.getCluster());
+ assert instance.getInstanceTime() != null : "Nominal time cannot be null.";
+ this.instanceTime = instance.getInstanceTime();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param entity
+ * @param cluster
+ * @param instanceTime
+ */
+ public ID(Entity entity, String cluster, DateTime instanceTime) {
+ this(entity, cluster);
+ assert instanceTime != null : "Nominal time cannot be null.";
+ this.instanceTime = instanceTime;
+ }
+
+ /**
+ * @return cluster name
+ */
+ public String getCluster() {
+ return cluster;
+ }
+
+ /**
+ * @param cluster name
+ */
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ /**
+ * @return nominal time
+ */
+ public DateTime getInstanceTime() {
+ return instanceTime;
+ }
+
+ /**
+ * @param instanceTime
+ */
+ public void setInstanceTime(DateTime instanceTime) {
+ this.instanceTime = instanceTime;
+ }
+
+ /**
+ * @return entity name
+ */
+ public String getEntityName() {
+ return entityName;
+ }
+
+ /**
+ * @return entity type
+ */
+ public EntityType getEntityType() {
+ return entityType;
+ }
+
+ @Override
+ public String toString() {
+ String val = entityKey;
+ if (!StringUtils.isEmpty(cluster)) {
+ val = val + KEY_SEPARATOR + cluster;
+ }
+
+ if (instanceTime != null) {
+ DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+ val = val + KEY_SEPARATOR + fmt.print(instanceTime);
+ }
+ return val;
+ }
+
+ /**
+ * @return An ID without the cluster name
+ */
+ public String getEntityKey() {
+ return entityKey;
+ }
+
+ /**
+ * @return ID without the instance information
+ */
+ public ID getEntityID() {
+ ID newID = new ID(this.entityType, this.entityName);
+ newID.setCluster(this.cluster);
+ newID.setInstanceTime(null);
+ return newID;
+ }
+
+ @Override
+ public boolean equals(Object id) {
+ if (id == null || id.getClass() != getClass()) {
+ return false;
+ }
+ return compareTo((ID)id) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.toString().hashCode();
+ }
+
+ @Override
+ public int compareTo(ID id) {
+ if (id == null) {
+ return -1;
+ }
+ return this.toString().compareTo(id.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
new file mode 100644
index 0000000..8cf24ee
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.execution.ExecutionInstance;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents the state of an execution instance.
+ * Implements {@link org.apache.falcon.state.StateMachine} for an instance.
+ */
+public class InstanceState implements StateMachine<InstanceState.STATE, InstanceState.EVENT> {
+ private ExecutionInstance instance;
+ private STATE currentState;
+ private static final STATE INITIAL_STATE = STATE.WAITING;
+
+ /**
+ * Enumerates all the valid states of an instance and the valid transitions from that state.
+ */
+ public enum STATE implements StateMachine<InstanceState.STATE, InstanceState.EVENT> {
+ WAITING {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case SUSPEND:
+ return SUSPENDED;
+ case KILL:
+ return KILLED;
+ case CONDITIONS_MET:
+ return READY;
+ case TIME_OUT:
+ return TIMED_OUT;
+ case TRIGGER:
+ return this;
+ default:
+ throw new InvalidStateTransitionException("Event " + event.name() + " not valid for state, "
+ + STATE.WAITING.name());
+ }
+ }
+ },
+ READY {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case SUSPEND:
+ return SUSPENDED;
+ case KILL:
+ return KILLED;
+ case SCHEDULE:
+ return RUNNING;
+ case CONDITIONS_MET:
+ return this;
+ default:
+ throw new InvalidStateTransitionException("Event " + event.name()
+ + " not valid for state, " + this.name());
+ }
+ }
+ },
+ RUNNING {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case SUSPEND:
+ return SUSPENDED;
+ case KILL:
+ return KILLED;
+ case SUCCEED:
+ return SUCCEEDED;
+ case FAIL:
+ return FAILED;
+ case SCHEDULE:
+ return this;
+ default:
+ throw new InvalidStateTransitionException("Event " + event.name()
+ + " not valid for state, " + this.name());
+ }
+ }
+ }, SUCCEEDED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ if (event == EVENT.SUCCEED) {
+ return this;
+ }
+ throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+ + ". Cannot apply transitions.");
+ }
+ },
+ FAILED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ if (event == EVENT.FAIL) {
+ return this;
+ }
+ throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+ + ". Cannot apply transitions.");
+ }
+ },
+ KILLED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ if (event == EVENT.KILL) {
+ return this;
+ }
+ throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+ + ". Cannot apply transitions.");
+ }
+ },
+ TIMED_OUT {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ if (event == EVENT.TIME_OUT) {
+ return this;
+ }
+ throw new InvalidStateTransitionException("Instance is in terminal state, " + this.name()
+ + ". Cannot apply transitions.");
+ }
+ },
+ SUSPENDED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case RESUME_WAITING:
+ return WAITING;
+ case RESUME_READY:
+ return READY;
+ case RESUME_RUNNING:
+ return RUNNING;
+ case SUSPEND:
+ return this;
+ // The instance can complete execution on DAG engine, just after a suspend was issued.
+ // Especially with Oozie, it finishes execution of current action before suspending.
+ // Hence need to allow terminal states too.
+ case SUCCEED:
+ return SUCCEEDED;
+ case FAIL:
+ return FAILED;
+ case KILL:
+ return KILLED;
+ default:
+ throw new InvalidStateTransitionException("Event " + event.name()
+ + " not valid for state, " + this.name());
+ }
+ }
+ }
+ }
+
+ /**
+ * Enumerates all the valid events that can cause a state transition.
+ */
+ public enum EVENT {
+ TRIGGER,
+ CONDITIONS_MET,
+ TIME_OUT,
+ SCHEDULE,
+ SUSPEND,
+ RESUME_WAITING,
+ RESUME_READY,
+ RESUME_RUNNING,
+ KILL,
+ SUCCEED,
+ FAIL
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param instance
+ */
+ public InstanceState(ExecutionInstance instance) {
+ this.instance = instance;
+ currentState = INITIAL_STATE;
+ }
+
+ /**
+ * @return execution instance
+ */
+ public ExecutionInstance getInstance() {
+ return instance;
+ }
+
+ /**
+ * @return current state
+ */
+ public STATE getCurrentState() {
+ return currentState;
+ }
+
+ /**
+ * @param state
+ * @return This instance
+ */
+ public InstanceState setCurrentState(STATE state) {
+ this.currentState = state;
+ return this;
+ }
+
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ return currentState.nextTransition(event);
+ }
+
+ /**
+ * @return "active" states of an instance.
+ */
+ public static List<STATE> getActiveStates() {
+ List<InstanceState.STATE> states = new ArrayList<STATE>();
+ states.add(STATE.RUNNING);
+ states.add(STATE.READY);
+ states.add(STATE.WAITING);
+ return states;
+ }
+
+ /**
+ * @return "running" states of an instance.
+ */
+ public static List<STATE> getRunningStates() {
+ List<InstanceState.STATE> states = new ArrayList<STATE>();
+ states.add(STATE.RUNNING);
+ return states;
+ }
+
+ /**
+ * @return "terminal" states of an instance.
+ */
+ public static List<STATE> getTerminalStates() {
+ List<InstanceState.STATE> states = new ArrayList<STATE>();
+ states.add(STATE.FAILED);
+ states.add(STATE.KILLED);
+ states.add(STATE.SUCCEEDED);
+ return states;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
new file mode 100644
index 0000000..1f69fab
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceStateChangeHandler.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.execution.ExecutionInstance;
+
+/**
+ * Any handler interested in handling state changes of instances must implement this interface.
+ */
+public interface InstanceStateChangeHandler {
+
+ /**
+ * Invoked when an instance is created.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onTrigger(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked when all gating conditions are satisfied.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onConditionsMet(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked when an instance scheduled on a DAG Engine.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onSchedule(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked on suspension of an instance.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onSuspend(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked when an instance is resumed.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onResume(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked when an instance is killed.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onKill(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked when an instance completes successfully.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onSuccess(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked when execution of an instance fails.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onFailure(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Invoked when an instance times out waiting for gating conditions to be satisfied.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ void onTimeOut(ExecutionInstance instance) throws FalconException;
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java b/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
new file mode 100644
index 0000000..6ca0500
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateMachine.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.exception.InvalidStateTransitionException;
+
+/**
+ * Interface to be implemented by a class that handles state transitions.
+ */
+public interface StateMachine<STATE extends Enum<STATE>, EVENT extends Enum<EVENT>> {
+
+ /**
+ * @param event
+ * @return The state that the machine enters into as a result of the event.
+ * @throws InvalidStateTransitionException
+ */
+ STATE nextTransition(EVENT event) throws InvalidStateTransitionException;
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
new file mode 100644
index 0000000..81357a4
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service that fetches state from state store, handles state transitions of entities and instances,
+ * invokes state change handler and finally persists the new state in the state store.
+ */
+public final class StateService {
+ private static final Logger LOG = LoggerFactory.getLogger(StateService.class);
+ private static final StateService LIFE_CYCLE_SERVICE = new StateService();
+ private final StateStore stateStore;
+
+ private StateService() {
+ stateStore = AbstractStateStore.get();
+ }
+
+ /**
+ * @return - Singleton instance of StateService
+ */
+ public static StateService get() {
+ return LIFE_CYCLE_SERVICE;
+ }
+
+ /**
+ * @return - Name of the service
+ */
+ public String getName() {
+ return "EntityLifeCycleService";
+ }
+
+ /**
+ * Fetches the entity from state store, applies state transitions, calls appropriate method on the handler and
+ * persists the final state in the store.
+ *
+ * @param entity
+ * @param event
+ * @param handler
+ * @throws FalconException
+ */
+ public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
+ throws FalconException {
+ ID id = new ID(entity);
+ if (!stateStore.entityExists(id)) {
+ // New entity
+ if (event == EntityState.EVENT.SUBMIT) {
+ callbackHandler(entity, EntityState.EVENT.SUBMIT, handler);
+ stateStore.putEntity(new EntityState(entity));
+ LOG.debug("Entity {} submitted due to event {}.", id, event.name());
+ } else {
+ throw new FalconException("Entity " + id + " does not exist in state store.");
+ }
+ } else {
+ if (entity.getEntityType() == EntityType.CLUSTER) {
+ throw new FalconException("Cluster entity " + entity.getName() + " can only be submitted.");
+ }
+ EntityState entityState = stateStore.getEntity(id);
+ EntityState.STATE newState = entityState.nextTransition(event);
+ callbackHandler(entity, event, handler);
+ entityState.setCurrentState(newState);
+ stateStore.updateEntity(entityState);
+ LOG.debug("State of entity: {} changed to: {} as a result of event: {}.", id,
+ entityState.getCurrentState(), event.name());
+ }
+ }
+
+ // Invokes the right method on the state change handler
+ private void callbackHandler(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
+ throws FalconException {
+ if (handler == null) {
+ return;
+ }
+ switch (event) {
+ case SUBMIT:
+ handler.onSubmit(entity);
+ break;
+ case SCHEDULE:
+ handler.onSchedule(entity);
+ break;
+ case SUSPEND:
+ handler.onSuspend(entity);
+ break;
+ case RESUME:
+ handler.onResume(entity);
+ break;
+ default: // Do nothing, only propagate events that originate from user
+ }
+ }
+
+ /**
+ * Fetches the instance from state store, applies state transitions, calls appropriate method on the handler and
+ * persists the final state in the store.
+ *
+ * @param instance
+ * @param event
+ * @param handler
+ * @throws FalconException
+ */
+ public void handleStateChange(ExecutionInstance instance, InstanceState.EVENT event,
+ InstanceStateChangeHandler handler) throws FalconException {
+ ID id = new ID(instance);
+ if (!stateStore.executionInstanceExists(id)) {
+ // New instance
+ if (event == InstanceState.EVENT.TRIGGER) {
+ callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler);
+ stateStore.putExecutionInstance(new InstanceState(instance));
+ LOG.debug("Instance {} triggered due to event {}.", id, event.name());
+ } else {
+ throw new FalconException("Instance " + id + "does not exist.");
+ }
+ } else {
+ InstanceState instanceState = stateStore.getExecutionInstance(id);
+ InstanceState.STATE newState = instanceState.nextTransition(event);
+ callbackHandler(instance, event, handler);
+ instanceState.setCurrentState(newState);
+ stateStore.updateExecutionInstance(instanceState);
+ LOG.debug("State of instance: {} changed to: {} as a result of event: {}.", id,
+ instanceState.getCurrentState(), event.name());
+ }
+ }
+
+ // Invokes the right method on the state change handler
+ private void callbackHandler(ExecutionInstance instance, InstanceState.EVENT event,
+ InstanceStateChangeHandler handler) throws FalconException {
+ if (handler == null) {
+ return;
+ }
+ switch (event) {
+ case TRIGGER:
+ handler.onTrigger(instance);
+ break;
+ case CONDITIONS_MET:
+ handler.onConditionsMet(instance);
+ break;
+ case TIME_OUT:
+ handler.onTimeOut(instance);
+ break;
+ case SCHEDULE:
+ handler.onSchedule(instance);
+ break;
+ case SUSPEND:
+ handler.onSuspend(instance);
+ break;
+ case RESUME_WAITING:
+ case RESUME_READY:
+ case RESUME_RUNNING:
+ handler.onResume(instance);
+ break;
+ case KILL:
+ handler.onKill(instance);
+ break;
+ case SUCCEED:
+ handler.onSuccess(instance);
+ break;
+ case FAIL:
+ handler.onFailure(instance);
+ break;
+ default: // Do nothing
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
new file mode 100644
index 0000000..67e047f
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.service.ConfigurationChangeListener;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This abstract class listens to config store changes and keeps the state store in sync with the config store.
+ */
+public abstract class AbstractStateStore implements StateStore, ConfigurationChangeListener {
+ private static StateStore stateStore;
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractStateStore.class);
+
+ @Override
+ public void onAdd(Entity entity) throws FalconException {
+ if (entity.getEntityType() != EntityType.CLUSTER) {
+ putEntity(new EntityState(entity));
+ }
+ }
+
+ @Override
+ public void onRemove(Entity entity) throws FalconException {
+ // Delete entity should remove its instances too.
+ if (entity.getEntityType() != EntityType.CLUSTER) {
+ deleteEntity(new ID(entity));
+ }
+ }
+
+ @Override
+ public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+ if (newEntity.getEntityType() != EntityType.CLUSTER) {
+ EntityState entityState = getEntity(new ID(oldEntity));
+ if (entityState == null) {
+ onAdd(newEntity);
+ } else {
+ entityState.setEntity(newEntity);
+ updateEntity(entityState);
+ }
+ }
+ }
+
+ @Override
+ public void onReload(Entity entity) throws FalconException {
+ if (entity.getEntityType() != EntityType.CLUSTER) {
+ // To ensure the config store and state store are in sync
+ if (!entityExists(new ID(entity))) {
+ LOG.info("State store missing entity {}. Adding it.", entity.getName());
+ onAdd(entity);
+ }
+ }
+ }
+
+ /**
+ * @return Singleton instance of an implementation of State Store based on the startup properties.
+ */
+ public static synchronized StateStore get() {
+ if (stateStore == null) {
+ String storeImpl = StartupProperties.get().getProperty("state.store",
+ "org.apache.falcon.state.store.InMemoryStateStore");
+ try {
+ stateStore = ReflectionUtils.getInstanceByClassName(storeImpl);
+ } catch (FalconException e) {
+ throw new RuntimeException("Unable to load state store impl. : " + storeImpl, e);
+ }
+ }
+ return stateStore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
new file mode 100644
index 0000000..4aa6fdb
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+
+import java.util.Collection;
+
+/**
+ * Interface to abstract out Entity store API.
+ */
+public interface EntityStateStore {
+ /**
+ * @param entityState
+ * @throws StateStoreException
+ */
+ void putEntity(EntityState entityState) throws StateStoreException;
+
+ /**
+ * @param entityId
+ * @return Entity corresponding to the key
+ * @throws StateStoreException - If entity does not exist.
+ */
+ EntityState getEntity(ID entityId) throws StateStoreException;
+
+ /**
+ * @param entityId
+ * @return true, if entity exists in store.
+ */
+ boolean entityExists(ID entityId);
+
+ /**
+ * @param state
+ * @return Entities in a given state.
+ */
+ Collection<Entity> getEntities(EntityState.STATE state);
+
+ /**
+ * @return All Entities in the store.
+ */
+ Collection<EntityState> getAllEntities();
+
+ /**
+ * Update an existing entity with the new values.
+ *
+ * @param entityState
+ * @throws StateStoreException when entity does not exist.
+ */
+ void updateEntity(EntityState entityState) throws StateStoreException;
+
+ /**
+ * Removes the entity and its instances from the store.
+ *
+ * @param entityId
+ * @throws StateStoreException
+ */
+ void deleteEntity(ID entityId) throws StateStoreException;
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
new file mode 100644
index 0000000..3822860
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import com.google.common.collect.Lists;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An in memory state store mostly intended for unit tests.
+ * Singleton.
+ */
+public final class InMemoryStateStore extends AbstractStateStore {
+
+ private Map<String, EntityState> entityStates = new HashMap<>();
+ // Keep it sorted
+ private SortedMap<String, InstanceState> instanceStates = Collections
+ .synchronizedSortedMap(new TreeMap<String, InstanceState>());
+
+ private static final StateStore STORE = new InMemoryStateStore();
+
+ private InMemoryStateStore() {
+ }
+
+ public static StateStore get() {
+ return STORE;
+ }
+
+ @Override
+ public void putEntity(EntityState entityState) throws StateStoreException {
+ String key = new ID(entityState.getEntity()).getEntityKey();
+ if (entityStates.containsKey(key)) {
+ throw new StateStoreException("Entity with key, " + key + " already exists.");
+ }
+ entityStates.put(key, entityState);
+ }
+
+ @Override
+ public EntityState getEntity(ID entityId) throws StateStoreException {
+ if (!entityStates.containsKey(entityId.getEntityKey())) {
+ throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
+ }
+ return entityStates.get(entityId.getEntityKey());
+ }
+
+ @Override
+ public boolean entityExists(ID entityId) {
+ return entityStates.containsKey(entityId.getEntityKey());
+ }
+
+ @Override
+ public Collection<Entity> getEntities(EntityState.STATE state) {
+ Collection<Entity> entities = new ArrayList<>();
+ for (EntityState entityState : entityStates.values()) {
+ if (entityState.getCurrentState().equals(state)) {
+ entities.add(entityState.getEntity());
+ }
+ }
+ return entities;
+ }
+
+ @Override
+ public Collection<EntityState> getAllEntities() {
+ return entityStates.values();
+ }
+
+ @Override
+ public void updateEntity(EntityState entityState) throws StateStoreException {
+ String key = new ID(entityState.getEntity()).getEntityKey();
+ if (!entityStates.containsKey(key)) {
+ throw new StateStoreException("Entity with key, " + key + " does not exist.");
+ }
+ entityStates.put(key, entityState);
+ }
+
+ @Override
+ public void deleteEntity(ID entityId) throws StateStoreException {
+ if (!entityStates.containsKey(entityId.getEntityKey())) {
+ throw new StateStoreException("Entity with key, " + entityId + " does not exist.");
+ }
+ deleteExecutionInstances(entityId);
+ entityStates.remove(entityId.getEntityKey());
+ }
+
+ @Override
+ public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
+ String key = new ID(instanceState.getInstance()).toString();
+ if (instanceStates.containsKey(key)) {
+ throw new StateStoreException("Instance with key, " + key + " already exists.");
+ }
+ instanceStates.put(key, instanceState);
+ }
+
+ @Override
+ public InstanceState getExecutionInstance(ID instanceId) throws StateStoreException {
+ if (!instanceStates.containsKey(instanceId.toString())) {
+ throw new StateStoreException("Instance with key, " + instanceId + " does not exist.");
+ }
+ return instanceStates.get(instanceId.toString());
+ }
+
+ @Override
+ public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
+ String key = new ID(instanceState.getInstance()).toString();
+ if (!instanceStates.containsKey(key)) {
+ throw new StateStoreException("Instance with key, " + key + " does not exist.");
+ }
+ instanceStates.put(key, instanceState);
+ }
+
+ @Override
+ public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster)
+ throws StateStoreException {
+ ID id = new ID(entity, cluster);
+ if (!entityStates.containsKey(id.getEntityKey())) {
+ throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+ }
+ Collection<InstanceState> instances = new ArrayList<InstanceState>();
+ for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+ if (instanceState.getKey().startsWith(id.toString())) {
+ instances.add(instanceState.getValue());
+ }
+ }
+ return instances;
+ }
+
+ @Override
+ public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+ Collection<InstanceState.STATE> states) throws StateStoreException {
+ ID id = new ID(entity, cluster);
+ return getExecutionInstances(id, states);
+ }
+
+ @Override
+ public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+ Collection<InstanceState.STATE> states, DateTime start, DateTime end) throws StateStoreException {
+ List<InstanceState> instancesToReturn = new ArrayList<>();
+ ID id = new ID(entity, cluster);
+ for (InstanceState state : getExecutionInstances(id, states)) {
+ ExecutionInstance instance = state.getInstance();
+ // if end time is before start time of instance
+ // or start time is after end time of instance ignore.
+ if ((instance.getActualStart() != null && !(end.isBefore(instance.getActualStart()))
+ || (instance.getActualEnd() != null && start.isAfter(instance.getActualEnd())))) {
+ instancesToReturn.add(state);
+ }
+ }
+ return instancesToReturn;
+ }
+
+ @Override
+ public Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
+ throws StateStoreException {
+ Collection<InstanceState> instances = new ArrayList<InstanceState>();
+ for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+ if (instanceState.getKey().startsWith(entityId.toString())
+ && states.contains(instanceState.getValue().getCurrentState())) {
+ instances.add(instanceState.getValue());
+ }
+ }
+ return instances;
+ }
+
+ @Override
+ public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
+ ID id = new ID(entity, cluster);
+ if (!entityStates.containsKey(id.getEntityKey())) {
+ throw new StateStoreException("Entity with key, " + id.getEntityKey() + " does not exist.");
+ }
+ InstanceState latestState = null;
+ // TODO : Very crude. Iterating over all entries and getting the last one.
+ for (Map.Entry<String, InstanceState> instanceState : instanceStates.entrySet()) {
+ if (instanceState.getKey().startsWith(id.toString())) {
+ latestState = instanceState.getValue();
+ }
+ }
+ return latestState;
+ }
+
+ @Override
+ public boolean executionInstanceExists(ID instanceId) {
+ return instanceStates.containsKey(instanceId.toString());
+ }
+
+ @Override
+ public void deleteExecutionInstances(ID entityId) {
+ for (String instanceKey : Lists.newArrayList(instanceStates.keySet())) {
+ if (instanceKey.startsWith(entityId.getEntityKey())) {
+ instanceStates.remove(instanceKey);
+ }
+ }
+ }
+
+ public void clear() {
+ entityStates.clear();
+ instanceStates.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
new file mode 100644
index 0000000..d6a4b49
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.util.Collection;
+
+/**
+ * Interface to abstract out instance store API.
+ */
+// TODO : Add order and limit capabilities to the API
+public interface InstanceStateStore {
+ /**
+ * Adds an execution instance to the store.
+ *
+ * @param instanceState
+ * @throws StateStoreException
+ */
+ void putExecutionInstance(InstanceState instanceState) throws StateStoreException;
+
+ /**
+ * @param instanceId
+ * @return Execution instance corresponding to the name.
+ * @throws StateStoreException - When instance does not exist
+ */
+ InstanceState getExecutionInstance(ID instanceId) throws StateStoreException;
+
+ /**
+ * Updates an execution instance in the store.
+ *
+ * @param instanceState
+ * @throws StateStoreException - if the instance does not exist.
+ */
+ void updateExecutionInstance(InstanceState instanceState) throws StateStoreException;
+
+ /**
+ * @param entity
+ * @param cluster
+ * @return - All execution instances for the given entity and cluster.
+ * @throws StateStoreException
+ */
+ Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster) throws StateStoreException;
+
+ /**
+ * @param entity
+ * @param cluster
+ * @param states
+ * @return - All execution instances for the given entity and cluster and states
+ * @throws StateStoreException
+ */
+ Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+ Collection<InstanceState.STATE> states) throws StateStoreException;
+
+ /**
+ * @param entity
+ * @param cluster
+ * @param states
+ * @return - All execution instances for the given entity and cluster and states
+ * @throws StateStoreException
+ */
+ Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+ Collection<InstanceState.STATE> states,
+ DateTime start, DateTime end) throws StateStoreException;
+
+ /**
+ * @param entityId
+ * @param states
+ * @return - All execution instance for an given entityKey (that includes the cluster name)
+ * @throws StateStoreException
+ */
+ Collection<InstanceState> getExecutionInstances(ID entityId, Collection<InstanceState.STATE> states)
+ throws StateStoreException;
+ /**
+ * @param entity
+ * @param cluster
+ * @return - The latest execution instance
+ * @throws StateStoreException
+ */
+ InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException;
+
+ /**
+ * @param instanceId
+ * @return true, if instance exists.
+ */
+ boolean executionInstanceExists(ID instanceId);
+
+ /**
+ * Delete instances of a given entity.
+ *
+ * @param entityId
+ */
+ void deleteExecutionInstances(ID entityId);
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
new file mode 100644
index 0000000..f595c26
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store;
+
+import org.apache.falcon.service.ConfigurationChangeListener;
+
+/**
+ * Interface that combines entity, instance store APIs and also config change listener's.
+ */
+public interface StateStore extends ConfigurationChangeListener, EntityStateStore, InstanceStateStore {
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
new file mode 100644
index 0000000..ebc05ec
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.resource.InstancesResult;
+
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Interface for an implementation that executes a DAG.
+ */
+public interface DAGEngine {
+
+ /**
+ * Run an instance for execution.
+ *
+ * @param instance
+ * @return
+ * @throws DAGEngineException
+ */
+ String run(ExecutionInstance instance) throws DAGEngineException;
+
+ /**
+ * @param instance
+ * @return true if an instance is scheduled for execution.
+ * @throws DAGEngineException
+ */
+ boolean isScheduled(ExecutionInstance instance) throws DAGEngineException;
+
+ /**
+ * Suspend a running instance.
+ *
+ * @param instance
+ * @throws DAGEngineException
+ */
+ void suspend(ExecutionInstance instance) throws DAGEngineException;
+
+ /**
+ * Resume a suspended instance.
+ *
+ * @param instance
+ * @throws DAGEngineException
+ */
+ void resume(ExecutionInstance instance) throws DAGEngineException;
+
+ /**
+ * Kill a running instance.
+ *
+ * @param instance
+ * @throws DAGEngineException
+ */
+ void kill(ExecutionInstance instance) throws DAGEngineException;
+
+ /**
+ * Re-run a terminated instance.
+ *
+ * @param instance
+ * @throws DAGEngineException
+ */
+ void reRun(ExecutionInstance instance) throws DAGEngineException;
+
+ /**
+ * Perform dryrun of an instance.
+ *
+ * @param entity
+ * @throws DAGEngineException
+ */
+ void submit(Entity entity) throws DAGEngineException;
+
+ /**
+ * Returns info about the Job.
+ * @param externalID
+ * @return
+ */
+ InstancesResult.Instance info(String externalID) throws DAGEngineException;
+
+ /**
+ * @param externalID
+ * @return status of each individual node in the DAG.
+ * @throws DAGEngineException
+ */
+ List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException;
+
+ /**
+ * @return true if DAG Engine is up and running.
+ */
+ boolean isAlive() throws DAGEngineException;
+
+ /**
+ * @param externalID
+ * @return Configuration used to run the job.
+ * @throws DAGEngineException
+ */
+ Properties getConfiguration(String externalID) throws DAGEngineException;
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
new file mode 100644
index 0000000..e400326
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngineFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory for providing appropriate dag engine to the Falcon services.
+ */
+public final class DAGEngineFactory {
+ private static final String DAG_ENGINE = "dag.engine.impl";
+
+ // Cache the DAGEngines, to avoid overhead of creation.
+ private static final Map<String, DAGEngine> DAG_ENGINES = new HashMap<>();
+
+ private DAGEngineFactory() {
+ }
+
+ public static DAGEngine getDAGEngine(Cluster cluster) throws FalconException {
+ return getDAGEngine(cluster.getName());
+ }
+
+ public static DAGEngine getDAGEngine(String clusterName) throws FalconException {
+ // Cache the DAGEngines for every cluster.
+ if (!DAG_ENGINES.containsKey(clusterName)) {
+ DAG_ENGINES.put(clusterName,
+ (DAGEngine) ReflectionUtils.getInstance(DAG_ENGINE, String.class, clusterName));
+ }
+
+ return DAG_ENGINES.get(clusterName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
new file mode 100644
index 0000000..8dcf3a5
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow.engine;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.execution.EntityExecutor;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.FalconExecutionService;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Workflow engine which uses Falcon's native scheduler.
+ */
+public class FalconWorkflowEngine extends AbstractWorkflowEngine {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FalconWorkflowEngine.class);
+ private static final FalconExecutionService EXECUTION_SERVICE = FalconExecutionService.get();
+ private static final StateStore STATE_STORE = AbstractStateStore.get();
+ private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+ private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
+
+ private enum JobAction {
+ KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
+ }
+
+ public FalconWorkflowEngine() {
+ // Registering As it cleans up staging paths and not entirely Oozie Specific.
+ registerListener(new OozieHouseKeepingService());
+ }
+
+ @Override
+ public boolean isAlive(Cluster cluster) throws FalconException {
+ return DAGEngineFactory.getDAGEngine(cluster).isAlive();
+ }
+
+ @Override
+ public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) throws FalconException {
+ EXECUTION_SERVICE.schedule(entity);
+ }
+
+ @Override
+ public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
+ DAGEngineFactory.getDAGEngine(clusterName).submit(entity);
+ }
+
+ @Override
+ public boolean isActive(Entity entity) throws FalconException {
+ return STATE_STORE.getEntity(new ID(entity)).getCurrentState() != EntityState.STATE.SUBMITTED;
+ }
+
+ @Override
+ public boolean isSuspended(Entity entity) throws FalconException {
+ return STATE_STORE.getEntity(new ID(entity))
+ .getCurrentState().equals(EntityState.STATE.SUSPENDED);
+ }
+
+ @Override
+ public boolean isCompleted(Entity entity) throws FalconException {
+ throw new FalconException("Not yet implemented.");
+ }
+
+ @Override
+ public String suspend(Entity entity) throws FalconException {
+ EXECUTION_SERVICE.suspend(entity);
+ return "SUCCESS";
+ }
+
+ @Override
+ public String resume(Entity entity) throws FalconException {
+ EXECUTION_SERVICE.resume(entity);
+ return "SUCCESS";
+ }
+
+ @Override
+ public String delete(Entity entity) throws FalconException {
+ if (isActive(entity)) {
+ EXECUTION_SERVICE.delete(entity);
+ }
+ // This should remove it from state store too as state store listens to config store changes.
+ CONFIG_STORE.remove(entity.getEntityType(), entity.getName());
+ return "SUCCESS";
+ }
+
+ @Override
+ public String delete(Entity entity, String cluster) throws FalconException {
+ EXECUTION_SERVICE.getEntityExecutor(entity, cluster).killAll();
+ return "SUCCESS";
+ }
+
+ @Override
+ public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException {
+ Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+ List<InstancesResult.Instance> runInstances = new ArrayList<>();
+
+ for (String cluster : clusters) {
+ Collection<InstanceState> instances =
+ STATE_STORE.getExecutionInstances(entity, cluster, InstanceState.getRunningStates());
+ for (InstanceState state : instances) {
+ String instanceTimeStr = state.getInstance().getInstanceTime().toString();
+ InstancesResult.Instance instance = new InstancesResult.Instance(cluster, instanceTimeStr,
+ InstancesResult.WorkflowStatus.RUNNING);
+ instance.startTime = state.getInstance().getActualStart().toDate();
+ runInstances.add(instance);
+ }
+ }
+ InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances");
+ result.setInstances(runInstances.toArray(new InstancesResult.Instance[runInstances.size()]));
+ return result;
+ }
+
+ private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
+ List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
+ APIResult.Status overallStatus = APIResult.Status.SUCCEEDED;
+ int instanceCount = 0;
+
+ Collection<InstanceState.STATE> states;
+ switch(action) {
+ case KILL:
+ case SUSPEND:
+ states = InstanceState.getActiveStates();
+ break;
+ case RESUME:
+ states = new ArrayList<>();
+ states.add(InstanceState.STATE.SUSPENDED);
+ break;
+ case STATUS:
+ case PARAMS:
+ // Applicable only for running and finished jobs.
+ states = InstanceState.getRunningStates();
+ states.addAll(InstanceState.getTerminalStates());
+ states.add(InstanceState.STATE.SUSPENDED);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled action " + action);
+ }
+
+ List<ExecutionInstance> instancesToActOn = new ArrayList<>();
+ for (String cluster : clusters) {
+ if (clusterList.size() != 0 && !clusterList.contains(cluster)) {
+ continue;
+ }
+ LOG.debug("Retrieving instances for cluster : {} for action {}" , cluster, action);
+ Collection<InstanceState> instances =
+ STATE_STORE.getExecutionInstances(entity, cluster, states, new DateTime(start), new DateTime(end));
+ for (InstanceState state : instances) {
+ instancesToActOn.add(state.getInstance());
+ }
+ }
+
+ List<InstancesResult.Instance> instances = new ArrayList<>();
+ for (ExecutionInstance ins : instancesToActOn) {
+ instanceCount++;
+ String instanceTimeStr = SchemaHelper.formatDateUTC(ins.getInstanceTime().toDate());
+
+ InstancesResult.Instance instance = null;
+ try {
+ instance = performAction(ins.getCluster(), entity, action, ins);
+ instance.instance = instanceTimeStr;
+ } catch (FalconException e) {
+ LOG.warn("Unable to perform action {} on cluster", action, e);
+ instance = new InstancesResult.Instance(ins.getCluster(), instanceTimeStr, null);
+ instance.status = InstancesResult.WorkflowStatus.ERROR;
+ instance.details = e.getMessage();
+ overallStatus = APIResult.Status.PARTIAL;
+ }
+ instances.add(instance);
+ }
+ if (instanceCount < 2 && overallStatus == APIResult.Status.PARTIAL) {
+ overallStatus = APIResult.Status.FAILED;
+ }
+ InstancesResult instancesResult = new InstancesResult(overallStatus, action.name());
+ instancesResult.setInstances(instances.toArray(new InstancesResult.Instance[instances.size()]));
+ return instancesResult;
+ }
+
+ private List<String> getIncludedClusters(Properties props, String clustersType) {
+ String clusters = props == null ? "" : props.getProperty(clustersType, "");
+ List<String> clusterList = new ArrayList<>();
+ for (String cluster : clusters.split(",")) {
+ if (StringUtils.isNotEmpty(cluster)) {
+ clusterList.add(cluster.trim());
+ }
+ }
+ return clusterList;
+ }
+
+ private InstancesResult.Instance performAction(String cluster, Entity entity, JobAction action,
+ ExecutionInstance instance) throws FalconException {
+ EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(entity, cluster);
+ InstancesResult.Instance instanceInfo = null;
+ LOG.debug("Retrieving information for {} for action {}", instance.getId(), action);
+ if (StringUtils.isNotEmpty(instance.getExternalID())) {
+ instanceInfo = DAGEngineFactory.getDAGEngine(cluster).info(instance.getExternalID());
+ } else {
+ instanceInfo = new InstancesResult.Instance();
+ }
+ switch(action) {
+ case KILL:
+ executor.kill(instance);
+ instanceInfo.status = InstancesResult.WorkflowStatus.KILLED;
+ break;
+ case SUSPEND:
+ executor.suspend(instance);
+ instanceInfo.status = InstancesResult.WorkflowStatus.SUSPENDED;
+ break;
+ case RESUME:
+ executor.resume(instance);
+ instanceInfo.status =
+ InstancesResult.WorkflowStatus.valueOf(STATE_STORE
+ .getExecutionInstance(instance.getId()).getCurrentState().name());
+ break;
+ case RERUN:
+ break;
+ case STATUS:
+ if (StringUtils.isNotEmpty(instance.getExternalID())) {
+ List<InstancesResult.InstanceAction> instanceActions =
+ DAGEngineFactory.getDAGEngine(cluster).getJobDetails(instance.getExternalID());
+ instanceInfo.actions = instanceActions
+ .toArray(new InstancesResult.InstanceAction[instanceActions.size()]);
+ }
+ break;
+
+ case PARAMS:
+ // Mask details, log
+ instanceInfo.details = null;
+ instanceInfo.logFile = null;
+ Properties props = DAGEngineFactory.getDAGEngine(cluster).getConfiguration(instance.getExternalID());
+ InstancesResult.KeyValuePair[] keyValuePairs = new InstancesResult.KeyValuePair[props.size()];
+ int i=0;
+ for (String name : props.stringPropertyNames()) {
+ keyValuePairs[i++] = new InstancesResult.KeyValuePair(name, props.getProperty(name));
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled action " + action);
+ }
+ return instanceInfo;
+ }
+
+ @Override
+ public InstancesResult killInstances(Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.KILL, entity, start, end, props, lifeCycles);
+ }
+
+ @Override
+ public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
+ List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException {
+ throw new FalconException("Not yet Implemented");
+ }
+
+ @Override
+ public InstancesResult suspendInstances(Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.SUSPEND, entity, start, end, props, lifeCycles);
+ }
+
+ @Override
+ public InstancesResult resumeInstances(Entity entity, Date start, Date end,
+ Properties props, List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.RESUME, entity, start, end, props, lifeCycles);
+ }
+
+ @Override
+ public InstancesResult getStatus(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.STATUS, entity, start, end, null, lifeCycles);
+ }
+
+ @Override
+ public InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException {
+ throw new FalconException("Not yet Implemented");
+ }
+
+ @Override
+ public InstancesResult getInstanceParams(Entity entity, Date start, Date end,
+ List<LifeCycle> lifeCycles) throws FalconException {
+ return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
+ }
+
+ @Override
+ public boolean isNotificationEnabled(String cluster, String jobID) throws FalconException {
+ return true;
+ }
+
+ @Override
+ public String update(Entity oldEntity, Entity newEntity, String cluster, Boolean skipDryRun)
+ throws FalconException {
+ throw new FalconException("Not yet Implemented");
+ }
+
+ @Override
+ public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException {
+ throw new FalconException("Not yet Implemented");
+ }
+
+ @Override
+ public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException {
+ throw new FalconException("Not yet Implemented");
+ }
+
+ @Override
+ public String getWorkflowStatus(String cluster, String jobId) throws FalconException {
+ return DAGEngineFactory.getDAGEngine(cluster).info(jobId).getStatus().name();
+ }
+
+ @Override
+ public Properties getWorkflowProperties(String cluster, String jobId) throws FalconException {
+ return DAGEngineFactory.getDAGEngine(cluster).getConfiguration(jobId);
+ }
+
+ @Override
+ public InstancesResult getJobDetails(String cluster, String jobId) throws FalconException {
+ InstancesResult.Instance[] instances = new InstancesResult.Instance[1];
+ InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED,
+ "Instance for workflow id:" + jobId);
+ instances[0] = DAGEngineFactory.getDAGEngine(cluster).info(jobId);
+ result.setInstances(instances);
+ return result;
+ }
+}
+
[5/6] falcon git commit: FALCON-1213 Base framework of the native
scheduler
Posted by pa...@apache.org.
FALCON-1213 Base framework of the native scheduler
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4175c54a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4175c54a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4175c54a
Branch: refs/heads/master
Commit: 4175c54a158eeb9883dc192260890eb2b73ad6f1
Parents: 5a55bae
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Oct 20 17:38:26 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Oct 20 17:38:26 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/falcon/entity/EntityUtil.java | 34 ++
pom.xml | 6 +-
scheduler/pom.xml | 120 ++++
.../falcon/exception/DAGEngineException.java | 48 ++
.../InvalidStateTransitionException.java | 47 ++
.../exception/NotificationServiceException.java | 48 ++
.../falcon/exception/StateStoreException.java | 47 ++
.../apache/falcon/execution/EntityExecutor.java | 111 ++++
.../falcon/execution/ExecutionInstance.java | 180 ++++++
.../execution/FalconExecutionService.java | 214 +++++++
.../falcon/execution/NotificationHandler.java | 34 ++
.../execution/ProcessExecutionInstance.java | 277 +++++++++
.../falcon/execution/ProcessExecutor.java | 460 +++++++++++++++
.../apache/falcon/execution/SchedulerUtil.java | 54 ++
.../service/FalconNotificationService.java | 76 +++
.../service/NotificationServicesRegistry.java | 125 +++++
.../notification/service/event/DataEvent.java | 76 +++
.../notification/service/event/Event.java | 37 ++
.../service/event/JobCompletedEvent.java | 58 ++
.../service/event/JobScheduledEvent.java | 80 +++
.../service/event/TimeElapsedEvent.java | 62 +++
.../notification/service/impl/AlarmService.java | 326 +++++++++++
.../service/impl/DataAvailabilityService.java | 94 ++++
.../service/impl/JobCompletionService.java | 208 +++++++
.../service/impl/SchedulerService.java | 399 +++++++++++++
.../service/request/AlarmRequest.java | 84 +++
.../request/DataNotificationRequest.java | 79 +++
.../JobCompletionNotificationRequest.java | 62 +++
.../request/JobScheduleNotificationRequest.java | 60 ++
.../service/request/NotificationRequest.java | 53 ++
.../org/apache/falcon/predicate/Predicate.java | 220 ++++++++
.../org/apache/falcon/state/EntityState.java | 133 +++++
.../falcon/state/EntityStateChangeHandler.java | 59 ++
.../main/java/org/apache/falcon/state/ID.java | 200 +++++++
.../org/apache/falcon/state/InstanceState.java | 250 +++++++++
.../state/InstanceStateChangeHandler.java | 99 ++++
.../org/apache/falcon/state/StateMachine.java | 34 ++
.../org/apache/falcon/state/StateService.java | 185 ++++++
.../falcon/state/store/AbstractStateStore.java | 92 +++
.../falcon/state/store/EntityStateStore.java | 76 +++
.../falcon/state/store/InMemoryStateStore.java | 227 ++++++++
.../falcon/state/store/InstanceStateStore.java | 113 ++++
.../apache/falcon/state/store/StateStore.java | 27 +
.../falcon/workflow/engine/DAGEngine.java | 115 ++++
.../workflow/engine/DAGEngineFactory.java | 53 ++
.../workflow/engine/FalconWorkflowEngine.java | 366 ++++++++++++
.../falcon/workflow/engine/OozieDAGEngine.java | 401 +++++++++++++
.../execution/FalconExecutionServiceTest.java | 557 +++++++++++++++++++
.../apache/falcon/execution/MockDAGEngine.java | 122 ++++
.../falcon/execution/SchedulerUtilTest.java | 50 ++
.../notification/service/AlarmServiceTest.java | 77 +++
.../service/SchedulerServiceTest.java | 314 +++++++++++
.../apache/falcon/predicate/PredicateTest.java | 53 ++
.../falcon/state/EntityStateServiceTest.java | 119 ++++
.../falcon/state/InstanceStateServiceTest.java | 138 +++++
.../resources/config/cluster/cluster-0.1.xml | 43 ++
.../src/test/resources/config/feed/feed-0.1.xml | 57 ++
.../resources/config/process/process-0.1.xml | 54 ++
webapp/pom.xml | 6 +
60 files changed, 7800 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc4fdf5..a4dc1c8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao)
NEW FEATURES
+ FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
+
FALCON-1315 Update falcon ui for HiveDR, secure clusters and bug fixes(Armando Reyna/Venkat Ranganathan via Sowmya Ramesh)
FALCON-1102 Gather data transfer details of filesystem replication(Peeyush Bishnoi via Sowmya Ramesh)
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 3ab9339..ceefb17 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -85,10 +85,32 @@ public final class EntityUtil {
private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
private static final long ONE_MS = 1;
+ public static final String MR_JOB_PRIORITY = "jobPriority";
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
private static final String STAGING_DIR_NAME_SEPARATOR = "_";
+ /** Priority with which the DAG will be scheduled.
+ * Matches the five priorities of Hadoop jobs.
+ */
+ public enum JOBPRIORITY {
+ VERY_HIGH((short) 1),
+ HIGH((short) 2),
+ NORMAL((short) 3),
+ LOW((short) 4),
+ VERY_LOW((short) 5);
+
+ private short priority;
+
+ public short getPriority() {
+ return priority;
+ }
+
+ JOBPRIORITY(short priority) {
+ this.priority = priority;
+ }
+ }
+
private EntityUtil() {}
public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException {
@@ -1015,4 +1037,16 @@ public final class EntityUtil {
}
return props;
}
+
+ public static JOBPRIORITY getPriority(Process process) {
+ org.apache.falcon.entity.v0.process.Properties processProps = process.getProperties();
+ if (processProps != null) {
+ for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) {
+ if (prop.getName().equals(MR_JOB_PRIORITY)) {
+ return JOBPRIORITY.valueOf(prop.getValue());
+ }
+ }
+ }
+ return JOBPRIORITY.NORMAL;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 54e6cd1..8cd3c3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,6 +110,9 @@
<hive.version>0.13.1</hive.version>
<jetty.version>6.1.26</jetty.version>
<jersey.version>1.9</jersey.version>
+ <quartz.version>2.2.1</quartz.version>
+ <joda.version>2.8.2</joda.version>
+ <mockito.version>1.9.5</mockito.version>
<internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
<excluded.test.groups>exhaustive</excluded.test.groups>
</properties>
@@ -427,6 +430,7 @@
<module>messaging</module>
<module>oozie-el-extensions</module>
<module>oozie</module>
+ <module>scheduler</module>
<module>acquisition</module>
<module>replication</module>
<module>retention</module>
@@ -680,7 +684,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
- <version>1.8.5</version>
+ <version>${mockito.version}</version>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
new file mode 100644
index 0000000..dddfcce
--- /dev/null
+++ b/scheduler/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-main</artifactId>
+ <version>0.8-SNAPSHOT</version>
+ </parent>
+ <artifactId>falcon-scheduler</artifactId>
+ <description>Apache Falcon Scheduler Module</description>
+ <name>Apache Falcon Scheduler</name>
+ <packaging>jar</packaging>
+
+ <profiles>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-oozie-adaptor</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-messaging</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-test-util</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>${quartz.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java b/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java
new file mode 100644
index 0000000..8b5bb64
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/DAGEngineException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception thrown by DAG Execution Engine.
+ */
+public class DAGEngineException extends FalconException {
+
+ /**
+ * @param e
+ */
+ public DAGEngineException(Throwable e) {
+ super(e);
+ }
+
+ /**
+ * @param message - custom message
+ * @param e
+ */
+ public DAGEngineException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ /**
+ * @param message - custom message
+ */
+ public DAGEngineException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java b/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java
new file mode 100644
index 0000000..19284a5
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/InvalidStateTransitionException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception thrown during state transition of entities and instances.
+ */
+public class InvalidStateTransitionException extends FalconException {
+ /**
+ * @param e Exception
+ */
+ public InvalidStateTransitionException(Throwable e) {
+ super(e);
+ }
+
+ /**
+ * @param message - custom exception message
+ * @param e
+ */
+ public InvalidStateTransitionException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ /**
+ * @param message - custom exception message
+ */
+ public InvalidStateTransitionException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java b/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java
new file mode 100644
index 0000000..b7f84df
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/NotificationServiceException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception thrown by notification services.
+ */
+public class NotificationServiceException extends FalconException {
+
+ /**
+ * @param e
+ */
+ public NotificationServiceException(Throwable e) {
+ super(e);
+ }
+
+ /**
+ * @param message - custom message
+ * @param e
+ */
+ public NotificationServiceException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ /**
+ * @param message - custom message
+ */
+ public NotificationServiceException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java b/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java
new file mode 100644
index 0000000..93bdad3
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/exception/StateStoreException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.exception;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception thrown by the State store API.
+ */
+public class StateStoreException extends FalconException {
+ /**
+ * @param e
+ */
+ public StateStoreException(Throwable e) {
+ super(e);
+ }
+
+ /**
+ * @param message - custom message
+ * @param e
+ */
+ public StateStoreException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ /**
+ * @param message - custom message
+ */
+ public StateStoreException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
new file mode 100644
index 0000000..9b07b9e
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/EntityExecutor.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceStateChangeHandler;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+
+/**
+ * This class is responsible for creation of execution instances for a given entity.
+ * An execution instance is created upon receipt of a "trigger event".
+ * It also handles the state transition of each execution instance.
+ * This class is also responsible for handling user interrupts for an entity such as suspend, kill etc.
+ */
+public abstract class EntityExecutor implements NotificationHandler, InstanceStateChangeHandler {
+ protected static final ConfigurationStore STORE = ConfigurationStore.get();
+ // The number of execution instances to be cached by default
+ public static final String DEFAULT_CACHE_SIZE = "20";
+ protected String cluster;
+ protected static final StateStore STATE_STORE = AbstractStateStore.get();
+ protected ID id;
+
+ /**
+ * Schedules execution instances for the entity. Idempotent operation.
+ *
+ * @throws FalconException
+ */
+ public abstract void schedule() throws FalconException;
+
+ /**
+ * Suspends all "active" execution instances of the entity. Idempotent operation.
+ * The operation can fail on certain instances. In such cases, the operation is partially successful.
+ *
+ * @throws FalconException - When the operation on an instance fails
+ */
+ public abstract void suspendAll() throws FalconException;
+
+ /**
+ * Resumes all suspended execution instances of the entity. Idempotent operation.
+ * The operation can fail on certain instances. In such cases, the operation is partially successful.
+ *
+ * @throws FalconException - When the operation on an instance fails
+ */
+ public abstract void resumeAll() throws FalconException;
+
+ /**
+ * Deletes all execution instances of an entity, even from the store. Idempotent operation.
+ * The operation can fail on certain instances. In such cases, the operation is partially successful.
+ *
+ * @throws FalconException - When the operation on an instance fails
+ */
+ public abstract void killAll() throws FalconException;
+
+ /**
+ * Suspends a specified set of execution instances. Idempotent operation.
+ * The operation can fail on certain instances. In such cases, the operation is partially successful.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ public abstract void suspend(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Resumes a specified set of execution instances. Idempotent operation.
+ * The operation can fail on certain instances. In such cases, the operation is partially successful.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ public abstract void resume(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * Kills a specified set of execution instances. Idempotent operation.
+ * The operation can fail on certain instances. In such cases, the operation is partially successful.
+ *
+ * @param instance
+ * @throws FalconException
+ */
+ public abstract void kill(ExecutionInstance instance) throws FalconException;
+
+ /**
+ * @return The entity
+ */
+ public abstract Entity getEntity();
+
+ /**
+ * @return ID of the entity
+ */
+ public ID getId() {
+ return id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
new file mode 100644
index 0000000..3869ff2
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * Represents an execution instance of an entity.
+ */
+public abstract class ExecutionInstance implements NotificationHandler {
+
+ // TODO : Add more fields
+ private final String cluster;
+ // External ID is the ID used to identify the Job submitted to the DAG Engine, as returned by the DAG Engine.
+ // For example, for Oozie this would be the workflow Id.
+ private String externalID;
+ private final DateTime instanceTime;
+ private final DateTime creationTime;
+ private DateTime actualStart;
+ private DateTime actualEnd;
+ private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+
+ /**
+ * @param instanceTime
+ * @param cluster
+ */
+ public ExecutionInstance(DateTime instanceTime, String cluster) {
+ this.instanceTime = new DateTime(instanceTime, UTC);
+ this.cluster = cluster;
+ this.creationTime = DateTime.now(UTC);
+ }
+
+ /**
+ * For a-periodic instances.
+ * @param cluster
+ */
+ public ExecutionInstance(String cluster) {
+ this.instanceTime = DateTime.now();
+ this.cluster = cluster;
+ this.creationTime = DateTime.now(UTC);
+ }
+
+ /**
+ * @return - The external id corresponding to this instance.
+ * If the instance is executed on Oozie, externalID will the Oozie workflow ID.
+ */
+ public String getExternalID() {
+ return externalID;
+ }
+
+ /**
+ * Setter for external ID, Oozie workflow ID, for example.
+ *
+ * @param jobID
+ */
+ public void setExternalID(String jobID) {
+ this.externalID = jobID;
+ }
+
+ /**
+ * @return The unique ID of this instance. The instance is referred using this ID inside the system.
+ */
+ public abstract ID getId();
+
+ /**
+ * @return - The entity to which this instance belongs.
+ */
+ public abstract Entity getEntity();
+
+ /**
+ * @return - The nominal time of the instance.
+ */
+ public DateTime getInstanceTime() {
+ return instanceTime;
+ }
+
+ /**
+ * @return - The name of the cluster on which this instance is running
+ */
+ public String getCluster() {
+ return cluster;
+ }
+
+ /**
+ * @return - The sequential numerical id of the instance
+ */
+ public abstract int getInstanceSequence();
+
+ /**
+ * @return - Actual start time of instance.
+ */
+ public DateTime getActualStart() {
+ return actualStart;
+ }
+
+ /**
+ * @param actualStart
+ */
+ public void setActualStart(DateTime actualStart) {
+ this.actualStart = actualStart;
+ }
+
+ /**
+ * @return - Completion time of the instance
+ */
+ public DateTime getActualEnd() {
+ return actualEnd;
+ }
+
+ /**
+ * @param actualEnd
+ */
+ public void setActualEnd(DateTime actualEnd) {
+ this.actualEnd = actualEnd;
+ }
+
+
+ public DateTime getCreationTime() {
+ return creationTime;
+ }
+
+ /**
+ * @return - The gating conditions on which this instance is waiting before it is scheduled for execution.
+ * @throws FalconException
+ */
+ public abstract List<Predicate> getAwaitingPredicates() throws FalconException;
+
+ /**
+ * Suspends the instance if it is in one of the active states, waiting, ready or running.
+ *
+ * @throws FalconException
+ */
+ public abstract void suspend() throws FalconException;
+
+ /**
+ * Resumes a previously suspended instance.
+ *
+ * @throws FalconException
+ */
+ public abstract void resume() throws FalconException;
+
+ /**
+ * Kills an instance if it is in one of the active states, waiting, ready or running.
+ *
+ * @throws FalconException
+ */
+ public abstract void kill() throws FalconException;
+
+ /**
+ * Handles any clean up and de-registration of notification subscriptions.
+ * Invoked when the instance reaches one of its terminal states.
+ *
+ * @throws FalconException
+ */
+ public abstract void destroy() throws FalconException;
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
new file mode 100644
index 0000000..b959320
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.EntityStateChangeHandler;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.StateService;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This singleton is the entry point for all callbacks from the notification services.
+ * The execution service handles any system level events that apply to all entities.
+ * It is responsible for creation of entity executors one per entity, per cluster.
+ */
+public final class FalconExecutionService implements FalconService, EntityStateChangeHandler, NotificationHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FalconExecutionService.class);
+
+ // Stores all entity executors in memory
+ private ConcurrentMap<ID, EntityExecutor> executors = new ConcurrentHashMap<ID, EntityExecutor>();
+
+ private static FalconExecutionService executionService = new FalconExecutionService();
+
+ @Override
+ public String getName() {
+ return "FalconExecutionService";
+ }
+
+ public void init() {
+ LOG.debug("State store instance being used : {}", AbstractStateStore.get());
+ // Initialize all executors from store
+ for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
+ try {
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ EntityExecutor executor = createEntityExecutor(entity, cluster);
+ executors.put(new ID(entity, cluster), executor);
+ executor.schedule();
+ }
+ } catch (FalconException e) {
+ LOG.error("Unable to load entity : " + entity.getName(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ // TODO : During migration, the state store itself may not have been completely bootstrapped.
+ }
+
+ /**
+ * Returns an EntityExecutor implementation based on the entity type.
+ *
+ * @param entity
+ * @param cluster
+ * @return
+ * @throws FalconException
+ */
+ private EntityExecutor createEntityExecutor(Entity entity, String cluster) throws FalconException {
+ switch (entity.getEntityType()) {
+ case FEED:
+ throw new UnsupportedOperationException("No support yet for feed.");
+ case PROCESS:
+ return new ProcessExecutor(((Process)entity), cluster);
+ default:
+ throw new IllegalArgumentException("Unhandled type " + entity.getEntityType().name());
+ }
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+
+ }
+
+ /**
+ * @return - An instance(singleton) of FalconExecutionService
+ */
+ public static FalconExecutionService get() {
+ return executionService;
+ }
+
+ private FalconExecutionService() {}
+
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ // Currently, simply passes along the event to the appropriate executor
+ EntityExecutor executor = executors.get(event.getTarget().getEntityID());
+ if (executor == null) {
+ // The executor has gone away, throw an exception so the notification service knows
+ throw new FalconException("Target executor for " + event.getTarget().getEntityID() + " does not exist.");
+ }
+ executor.onEvent(event);
+ }
+
+ @Override
+ public void onSubmit(Entity entity) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onSchedule(Entity entity) throws FalconException {
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ EntityExecutor executor = createEntityExecutor(entity, cluster);
+ ID id = new ID(entity, cluster);
+ executors.put(id, executor);
+ LOG.info("Scheduling entity {}.", id);
+ executor.schedule();
+ }
+ }
+
+ @Override
+ public void onSuspend(Entity entity) throws FalconException {
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ EntityExecutor executor = getEntityExecutor(entity, cluster);
+ executor.suspendAll();
+ }
+ }
+
+ @Override
+ public void onResume(Entity entity) throws FalconException {
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ EntityExecutor executor = createEntityExecutor(entity, cluster);
+ executors.put(new ID(entity, cluster), executor);
+ LOG.info("Resuming entity, {} of type {} on cluster {}.", entity.getName(),
+ entity.getEntityType(), cluster);
+ executor.resumeAll();
+ }
+ }
+
+ /**
+ * Schedules an entity.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ public void schedule(Entity entity) throws FalconException {
+ StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this);
+ }
+
+ /**
+ * Suspends an entity.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ public void suspend(Entity entity) throws FalconException {
+ StateService.get().handleStateChange(entity, EntityState.EVENT.SUSPEND, this);
+ }
+
+ /**
+ * Resumes an entity.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ public void resume(Entity entity) throws FalconException {
+ StateService.get().handleStateChange(entity, EntityState.EVENT.RESUME, this);
+ }
+
+ /**
+ * Deletes an entity from the execution service.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ public void delete(Entity entity) throws FalconException {
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ EntityExecutor executor = getEntityExecutor(entity, cluster);
+ executor.killAll();
+ executors.remove(executor.getId());
+ }
+ }
+
+ /**
+ * Returns the instance of {@link EntityExecutor} for a given entity and cluster.
+ *
+ * @param entity
+ * @param cluster
+ * @return
+ * @throws FalconException
+ */
+ public EntityExecutor getEntityExecutor(Entity entity, String cluster) throws FalconException {
+ ID id = new ID(entity, cluster);
+ if (executors.containsKey(id)) {
+ return executors.get(id);
+ } else {
+ throw new FalconException("Entity executor for entity : " + id.getEntityKey() + " does not exist.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
new file mode 100644
index 0000000..b071f5f
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.notification.service.event.Event;
+
+/**
+ * An interface that every class that handles notifications from notification services must implement.
+ */
+public interface NotificationHandler {
+ /**
+ * The method a notification service calls to onEvent an event.
+ *
+ * @param event
+ * @throws FalconException
+ */
+ void onEvent(Event event) throws FalconException;
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
new file mode 100644
index 0000000..19089c4
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Represents an execution instance of a process.
+ * Responsible for user actions such as suspend, resume, kill on individual instances.
+ */
+
+public class ProcessExecutionInstance extends ExecutionInstance {
+ private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
+ private final Process process;
+ private List<Predicate> awaitedPredicates = new ArrayList<Predicate>();
+ private DAGEngine dagEngine = null;
+ private boolean hasTimedOut = false;
+ private ID id;
+ private int instanceSequence;
+ private final FalconExecutionService executionService = FalconExecutionService.get();
+
+ /**
+ * Constructor.
+ *
+ * @param process
+ * @param instanceTime
+ * @param cluster
+ * @throws FalconException
+ */
+ public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
+ super(instanceTime, cluster);
+ this.process = process;
+ this.id = new ID(process, cluster, getInstanceTime());
+ computeInstanceSequence();
+ dagEngine = DAGEngineFactory.getDAGEngine(cluster);
+ registerForNotifications(false);
+ }
+
+ // Computes the instance number based on the nominal time.
+ // Method can be extended to assign instance numbers for non-time based instances.
+ private void computeInstanceSequence() {
+ for (Cluster processCluster : process.getClusters().getClusters()) {
+ if (processCluster.getName().equals(getCluster())) {
+ Date start = processCluster.getValidity().getStart();
+ instanceSequence = EntityUtil.getInstanceSequence(start, process.getFrequency(),
+ process.getTimezone(), getInstanceTime().toDate());
+ break;
+ }
+ }
+ }
+
+ // Currently, registers for only data notifications to ensure gating conditions are met.
+ // Can be extended to register for other notifications.
+ private void registerForNotifications(boolean isResume) throws FalconException {
+ if (process.getInputs() == null) {
+ return;
+ }
+ for (Input input : process.getInputs().getInputs()) {
+ // Register for notification for every required input
+ if (input.isOptional()) {
+ continue;
+ }
+ Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+ for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+ List<Location> locations = FeedHelper.getLocations(cluster, feed);
+ for (Location loc : locations) {
+ if (loc.getType() != LocationType.DATA) {
+ continue;
+ }
+
+ Predicate predicate = Predicate.createDataPredicate(loc);
+ // To ensure we evaluate only predicates not evaluated before when an instance is resumed.
+ if (isResume && !awaitedPredicates.contains(predicate)) {
+ continue;
+ }
+ // TODO : Revisit this once the Data Availability Service has been built
+ DataAvailabilityService.DataRequestBuilder requestBuilder =
+ (DataAvailabilityService.DataRequestBuilder)
+ NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
+ .createRequestBuilder(executionService, getId());
+ requestBuilder.setDataLocation(new Path(loc.getPath()));
+ NotificationServicesRegistry.register(requestBuilder.build());
+ LOG.info("Registered for a data notification for process {} for data location {}",
+ process.getName(), loc.getPath());
+ awaitedPredicates.add(predicate);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ switch (event.getSource()) {
+ case JOB_SCHEDULE:
+ JobScheduledEvent jobScheduleEvent = (JobScheduledEvent) event;
+ setExternalID(jobScheduleEvent.getExternalID());
+ setActualStart(jobScheduleEvent.getStartTime());
+ break;
+ case JOB_COMPLETION:
+ setActualEnd(((JobCompletedEvent)event).getEndTime());
+ break;
+ case DATA:
+ // Data has not become available and the wait time has passed
+ if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
+ if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) {
+ hasTimedOut = true;
+ }
+ } else {
+ // If the event matches any of the awaited predicates, remove the predicate of the awaited list
+ Predicate toRemove = null;
+ for (Predicate predicate : awaitedPredicates) {
+ if (predicate.evaluate(Predicate.getPredicate(event))) {
+ toRemove = predicate;
+ break;
+ }
+ }
+ if (toRemove != null) {
+ awaitedPredicates.remove(toRemove);
+ }
+ }
+ break;
+ default:
+ }
+ }
+
+ /**
+ * Is the instance ready to be scheduled?
+ *
+ * @return true when it is not already scheduled or is gated on some conditions.
+ */
+ public boolean isReady() {
+ if (getExternalID() != null) {
+ return false;
+ }
+ if (awaitedPredicates.isEmpty()) {
+ return true;
+ } else {
+ // If it is waiting to be scheduled, it is in ready.
+ for (Predicate predicate : awaitedPredicates) {
+ if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Is the instance scheduled for execution?
+ *
+ * @return - true if it is scheduled and has not yet completed.
+ * @throws FalconException
+ */
+ public boolean isScheduled() throws FalconException {
+ return getExternalID() != null && dagEngine.isScheduled(this);
+ }
+
+ /**
+ * Has the instance timed out waiting for gating conditions to be met?
+ *
+ * @return
+ */
+ public boolean hasTimedout() {
+ return hasTimedOut || (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis()));
+ }
+
+ @Override
+ public ID getId() {
+ return id;
+ }
+
+ @Override
+ public Entity getEntity() {
+ return process;
+ }
+
+ @Override
+ public int getInstanceSequence() {
+ return instanceSequence;
+ }
+
+ @Override
+ public List<Predicate> getAwaitingPredicates() throws FalconException {
+ return awaitedPredicates;
+ }
+
+ @Override
+ public void suspend() throws FalconException {
+ if (getExternalID() != null) {
+ dagEngine.suspend(this);
+ }
+ destroy();
+ }
+
+ @Override
+ public void resume() throws FalconException {
+ // Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended
+ if (getExternalID() != null) {
+ dagEngine.resume(this);
+ } else if (awaitedPredicates.size() != 0) {
+ // Evaluate any remaining predicates
+ registerForNotifications(true);
+ }
+ }
+
+ @Override
+ public void kill() throws FalconException {
+ if (getExternalID() != null) {
+ dagEngine.kill(this);
+ }
+ destroy();
+ }
+
+ // If timeout specified in process, uses it.
+ // Else, defaults to frequency of the entity * timeoutFactor
+ private long getTimeOutInMillis() {
+ if (process.getTimeout() == null) {
+ // Default timeout is the frequency of the entity
+ int timeoutFactor = Integer.parseInt(RuntimeProperties.get().getProperty("instance.timeout.factor",
+ "1"));
+ return SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getFrequency()) * timeoutFactor;
+ } else {
+ // TODO : Should timeout = 0 have a special meaning or should it be disallowed?
+ return SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getTimeout());
+ }
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+ NotificationServicesRegistry.unregister(executionService, getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
new file mode 100644
index 0000000..68c34e7
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.StateService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * This class is responsible for managing execution instances of a process.
+ * It caches the active process instances in memory and handles notification events.
+ * It intercepts all the notification events intended for its instances and passes them along to the instance after
+ * acting on it, where applicable.
+ */
+public class ProcessExecutor extends EntityExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class);
+ protected LoadingCache<ID, ProcessExecutionInstance> instances;
+ private Predicate triggerPredicate;
+ private final Process process;
+ private final StateService stateService = StateService.get();
+ private final FalconExecutionService executionService = FalconExecutionService.get();
+
+ /**
+ * Constructor per entity, per cluster.
+ *
+ * @param proc
+ * @param clusterName
+ * @throws FalconException
+ */
+ public ProcessExecutor(Process proc, String clusterName) throws FalconException {
+ process = proc;
+ cluster = clusterName;
+ id = new ID(proc, clusterName);
+ }
+
+ @Override
+ public void schedule() throws FalconException {
+ // Lazy instantiation
+ if (instances == null) {
+ initInstances();
+ }
+ // Check to handle restart and restoration from state store.
+ if (STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SCHEDULED) {
+ dryRun();
+ } else {
+ LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster);
+ LOG.info("Loading instances for process {} from state store.", process.getName());
+ reloadInstances();
+ }
+ registerForNotifications();
+ }
+
+ private void dryRun() throws FalconException {
+ DAGEngineFactory.getDAGEngine(cluster).submit(process);
+ }
+
+ // Initializes the cache of execution instances. Cache is backed by the state store.
+ private void initInstances() throws FalconException {
+ int cacheSize = Integer.parseInt(StartupProperties.get().getProperty("scheduler.instance.cache.size",
+ DEFAULT_CACHE_SIZE));
+
+ instances = CacheBuilder.newBuilder()
+ .maximumSize(cacheSize)
+ .build(new CacheLoader<ID, ProcessExecutionInstance>() {
+ @Override
+ public ProcessExecutionInstance load(ID id) throws Exception {
+ return (ProcessExecutionInstance) STATE_STORE.getExecutionInstance(id).getInstance();
+ }
+ });
+ }
+
+ // Re-load any active instances from state
+ private void reloadInstances() throws FalconException {
+ for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
+ InstanceState.getActiveStates())) {
+ ExecutionInstance instance = instanceState.getInstance();
+ LOG.debug("Loading instance {} from state.", instance.getId());
+ switch (instanceState.getCurrentState()) {
+ case RUNNING:
+ onSchedule(instance);
+ break;
+ case READY:
+ onConditionsMet(instance);
+ break;
+ case WAITING:
+ instance.resume();
+ break;
+ default: // skip
+ }
+ instances.put(instance.getId(), (ProcessExecutionInstance) instance);
+ }
+ }
+
+ @Override
+ public void suspendAll() throws FalconException {
+ NotificationServicesRegistry.unregister(executionService, getId());
+ StringBuffer errMsg = new StringBuffer();
+ // Only active instances are in memory. Suspend them first.
+ for (ExecutionInstance instance : instances.asMap().values()) {
+ try {
+ suspend(instance);
+ } catch (FalconException e) {
+ // Proceed with next
+ errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
+ LOG.error("Instance suspend failed for : " + instance.getId(), e);
+ }
+ }
+ for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
+ InstanceState.getActiveStates())) {
+ ExecutionInstance instance = instanceState.getInstance();
+ try {
+ suspend(instance);
+ } catch (FalconException e) {
+ errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
+ LOG.error("Instance suspend failed for : " + instance.getId(), e);
+ }
+ }
+ // Some errors
+ if (errMsg.length() != 0) {
+ throw new FalconException("Some instances failed to suspend : " + errMsg.toString());
+ }
+ }
+
+ @Override
+ public void resumeAll() throws FalconException {
+ if (instances == null) {
+ initInstances();
+ }
+ StringBuffer errMsg = new StringBuffer();
+ ArrayList<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
+ // TODO : Distinguish between individually suspended instance versus suspended entity?
+ states.add(InstanceState.STATE.SUSPENDED);
+ // Load cache with suspended instances
+ for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster, states)) {
+ ExecutionInstance instance = instanceState.getInstance();
+ try {
+ resume(instance);
+ } catch (FalconException e) {
+ errMsg.append("Instance suspend failed for : " + instance.getId() + " due to " + e.getMessage());
+ LOG.error("Instance suspend failed for : " + instance.getId(), e);
+ }
+ }
+ registerForNotifications();
+ // Some errors
+ if (errMsg.length() != 0) {
+ throw new FalconException("Some instances failed to resume : " + errMsg.toString());
+ }
+ }
+
+ @Override
+ public void killAll() throws FalconException {
+ NotificationServicesRegistry.unregister(executionService, getId());
+ StringBuffer errMsg = new StringBuffer();
+ // Only active instances are in memory. Kill them first.
+ for (ExecutionInstance instance : instances.asMap().values()) {
+ try {
+ kill(instance);
+ } catch (FalconException e) {
+ // Proceed with next
+ errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
+ LOG.error("Instance kill failed for : " + instance.getId(), e);
+ }
+ }
+ for (InstanceState instanceState : STATE_STORE.getExecutionInstances(process, cluster,
+ InstanceState.getActiveStates())) {
+ ExecutionInstance instance = instanceState.getInstance();
+ try {
+ kill(instance);
+ } catch (FalconException e) {
+ errMsg.append("Instance kill failed for : " + instance.getId() + " due to " + e.getMessage());
+ LOG.error("Instance kill failed for : " + instance.getId(), e);
+ }
+ }
+ // Some errors
+ if (errMsg.length() != 0) {
+ throw new FalconException("Some instances failed to kill : " + errMsg.toString());
+ }
+ }
+
+ @Override
+ public void suspend(ExecutionInstance instance) throws FalconException {
+ try {
+ instance.suspend();
+ stateService.handleStateChange(instance, InstanceState.EVENT.SUSPEND, this);
+ } catch (Exception e) {
+ LOG.error("Suspend failed for instance id : " + instance.getId(), e);
+ throw new FalconException("Suspend failed for instance : " + instance.getId(), e);
+ }
+
+ }
+
+ @Override
+ public void resume(ExecutionInstance instance) throws FalconException {
+
+ try {
+ instance.resume();
+ if (((ProcessExecutionInstance) instance).isScheduled()) {
+ stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_RUNNING, this);
+ onSchedule(instance);
+ } else if (((ProcessExecutionInstance) instance).isReady()) {
+ stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_READY, this);
+ onConditionsMet(instance);
+ } else {
+ stateService.handleStateChange(instance, InstanceState.EVENT.RESUME_WAITING, this);
+ }
+ } catch (Exception e) {
+ LOG.error("Resume failed for instance id : " + instance.getId(), e);
+ throw new FalconException("Resume failed for instance : " + instance.getId(), e);
+ }
+ }
+
+ @Override
+ public void kill(ExecutionInstance instance) throws FalconException {
+ try {
+ // Kill will de-register from notification services
+ instance.kill();
+ stateService.handleStateChange(instance, InstanceState.EVENT.KILL, this);
+ } catch (Exception e) {
+ LOG.error("Kill failed for instance id : " + instance.getId(), e);
+ throw new FalconException("Kill failed for instance : " + instance.getId(), e);
+ }
+ }
+
+ @Override
+ public Entity getEntity() {
+ return process;
+ }
+
+ private ProcessExecutionInstance buildInstance(Event event) throws FalconException {
+ // If a time triggered instance, use nominal time from event
+ if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) {
+ TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
+ LOG.debug("Creating a new process instance for nominal time {}.", timeEvent.getInstanceTime());
+ return new ProcessExecutionInstance(process, timeEvent.getInstanceTime(), cluster);
+ } else {
+ return new ProcessExecutionInstance(process, DateTime.now(), cluster);
+ }
+ }
+
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ try {
+ // Handle event if applicable
+ if (shouldHandleEvent(event)) {
+ handleEvent(event);
+ } else {
+ // Else, pass it along to the execution instance
+ ProcessExecutionInstance instance = instances.get(event.getTarget());
+ if (instance != null) {
+ instance.onEvent(event);
+ if (instance.isReady()) {
+ stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
+ } else if (instance.hasTimedout()) {
+ stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new FalconException("Unable to handle event with source : " + event.getSource() + " with target:"
+ + event.getTarget(), e);
+ }
+ }
+
+ private void handleEvent(Event event) throws FalconException {
+ ProcessExecutionInstance instance;
+ try {
+ switch (event.getSource()) {
+ // TODO : Handle cases where scheduling fails.
+ case JOB_SCHEDULE:
+ instance = instances.get(event.getTarget());
+ instance.onEvent(event);
+ stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this);
+ break;
+ case JOB_COMPLETION:
+ instance = instances.get(event.getTarget());
+ instance.onEvent(event);
+ switch (((JobCompletedEvent) event).getStatus()) {
+ case SUCCEEDED:
+ stateService.handleStateChange(instance, InstanceState.EVENT.SUCCEED, this);
+ break;
+ case FAILED:
+ stateService.handleStateChange(instance, InstanceState.EVENT.FAIL, this);
+ break;
+ case KILLED:
+ stateService.handleStateChange(instance, InstanceState.EVENT.KILL, this);
+ break;
+ case SUSPENDED:
+ stateService.handleStateChange(instance, InstanceState.EVENT.SUSPEND, this);
+ break;
+ default:
+ throw new InvalidStateTransitionException(
+ "Job seems to be have been managed outside Falcon.");
+ }
+ break;
+ default:
+ if (isTriggerEvent(event)) {
+ instance = buildInstance(event);
+ stateService.handleStateChange(instance, InstanceState.EVENT.TRIGGER, this);
+ // This happens where are no conditions the instance is waiting on (for example, no data inputs).
+ if (instance.isReady()) {
+ stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
+ }
+ }
+ }
+ } catch (Exception ee) {
+ throw new FalconException("Unable to cache execution instance", ee);
+ }
+ }
+
+ // Evaluates the trigger predicate against the current event, to determine if a new instance needs to be triggered.
+ private boolean isTriggerEvent(Event event) {
+ try {
+ return triggerPredicate.evaluate(Predicate.getPredicate(event));
+ } catch (FalconException e) {
+ return false;
+ }
+ }
+
+ // Registers for all notifications that should trigger an instance.
+ // Currently, only time based triggers are handled.
+ protected void registerForNotifications() throws FalconException {
+ AlarmService.AlarmRequestBuilder requestBuilder =
+ (AlarmService.AlarmRequestBuilder)
+ NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME)
+ .createRequestBuilder(executionService, getId());
+ Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+
+ InstanceState instanceState = STATE_STORE.getLastExecutionInstance(process, cluster);
+ // If there are no instances, use process's start, else, use last materialized instance's nominal time
+ Date startTime = (instanceState == null) ? processCluster.getValidity().getStart()
+ : EntityUtil.getNextInstanceTime(instanceState.getInstance().getInstanceTime().toDate(),
+ EntityUtil.getFrequency(process), EntityUtil.getTimeZone(process), 1);
+ Date endTime = processCluster.getValidity().getEnd();
+ // TODO : Handle cron based and calendar based time triggers
+ // TODO : Set execution order details.
+ requestBuilder.setFrequency(process.getFrequency())
+ .setStartTime(new DateTime(startTime))
+ .setEndTime(new DateTime(endTime))
+ .setTimeZone(TimeZone.getTimeZone("UTC"));
+ NotificationServicesRegistry.register(requestBuilder.build());
+ LOG.info("Registered for a time based notification for process {} with frequency: {}, "
+ + "start time: {}, end time: {}", process.getName(), process.getFrequency(), startTime, endTime);
+ triggerPredicate = Predicate.createTimePredicate(startTime.getTime(), endTime.getTime(), -1);
+ }
+
+ @Override
+ public ID getId() {
+ return id;
+ }
+
+ // This executor must handle any events intended for itself.
+ // Or, if it is job run or job complete notifications, so it can handle the instance's state transition.
+ private boolean shouldHandleEvent(Event event) {
+ return event.getTarget().equals(id)
+ || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION
+ || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+ }
+
+ @Override
+ public void onTrigger(ExecutionInstance instance) throws FalconException {
+ instances.put(new ID(instance), (ProcessExecutionInstance) instance);
+ }
+
+ @Override
+ public void onConditionsMet(ExecutionInstance instance) throws FalconException {
+ // Put process in run queue and register for notification
+ SchedulerService.JobScheduleRequestBuilder requestBuilder = (SchedulerService.JobScheduleRequestBuilder)
+ NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE)
+ .createRequestBuilder(executionService, getId());
+ requestBuilder.setInstance(instance);
+ NotificationServicesRegistry.register(requestBuilder.build());
+ }
+
+ @Override
+ public void onSchedule(ExecutionInstance instance) throws FalconException {
+ JobCompletionService.JobCompletionRequestBuilder completionRequestBuilder =
+ (JobCompletionService.JobCompletionRequestBuilder)
+ NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION)
+ .createRequestBuilder(executionService, getId());
+ completionRequestBuilder.setExternalId(instance.getExternalID());
+ completionRequestBuilder.setCluster(instance.getCluster());
+ NotificationServicesRegistry.register(completionRequestBuilder.build());
+ }
+
+ @Override
+ public void onSuspend(ExecutionInstance instance) throws FalconException {
+ instances.invalidate(instance.getId());
+ }
+
+ @Override
+ public void onResume(ExecutionInstance instance) throws FalconException {
+ instances.put(instance.getId(), (ProcessExecutionInstance) instance);
+ }
+
+ @Override
+ public void onKill(ExecutionInstance instance) throws FalconException {
+ instances.invalidate(instance.getId());
+ }
+
+ @Override
+ public void onSuccess(ExecutionInstance instance) throws FalconException {
+ instance.destroy();
+ instances.invalidate(instance.getId());
+ }
+
+ @Override
+ public void onFailure(ExecutionInstance instance) throws FalconException {
+ instance.destroy();
+ instances.invalidate(instance.getId());
+ }
+
+ @Override
+ public void onTimeOut(ExecutionInstance instance) throws FalconException {
+ instance.destroy();
+ instances.invalidate(instance.getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
new file mode 100644
index 0000000..3e7fc9b
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.joda.time.DateTime;
+
+/**
+ * Contains utility methods.
+ */
+public final class SchedulerUtil {
+
+ private static final long MINUTE_IN_MS = 60 * 1000L;
+ private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
+
+ private SchedulerUtil(){};
+
+ /**
+ * Returns the frequency in millis from the given time.
+ * Needs to take the calender into account.
+ * @param referenceTime
+ * @param frequency
+ * @return
+ */
+ public static long getFrequencyInMillis(DateTime referenceTime, Frequency frequency) {
+ switch (frequency.getTimeUnit()) {
+ case minutes:
+ return MINUTE_IN_MS * frequency.getFrequencyAsInt();
+ case hours:
+ return HOUR_IN_MS * frequency.getFrequencyAsInt();
+ case days:
+ return referenceTime.plusDays(frequency.getFrequencyAsInt()).getMillis() - referenceTime.getMillis();
+ case months:
+ return referenceTime.plusMonths(frequency.getFrequencyAsInt()).getMillis() - referenceTime.getMillis();
+ default:
+ throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java
new file mode 100644
index 0000000..41d20a8
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/FalconNotificationService.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.ID;
+
+/**
+ * An interface that every notification service must implement.
+ */
+public interface FalconNotificationService extends FalconService {
+
+ /**
+ * Register for a notification.
+ *
+ * @param notifRequest
+ */
+ void register(NotificationRequest notifRequest) throws NotificationServiceException;
+
+ /**
+ * De-register from receiving notifications.
+ * @param handler - The notification handler that needs to be de-registered.
+ * @param callbackID
+ */
+ void unregister(NotificationHandler handler, ID callbackID) throws NotificationServiceException;
+
+ /**
+ * Creates and returns an implementation of
+ * {@link RequestBuilder} that is applicable to the service.
+ * @param handler - The notification handler that needs to be de-registered.
+ * @param callbackID
+ * @return
+ */
+ RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID);
+
+ /**
+ * Builder to build appropriate {@link NotificationRequest}.
+ * @param <T>
+ */
+ abstract class RequestBuilder<T extends NotificationRequest> {
+
+ protected NotificationHandler handler;
+ protected ID callbackId;
+
+ public RequestBuilder(NotificationHandler notificationHandler, ID callbackID) {
+ if (notificationHandler == null) {
+ throw new IllegalArgumentException("Handler cannot be null.");
+ }
+ this.handler = notificationHandler;
+ this.callbackId = callbackID;
+ }
+
+ /**
+ * @return Corresponding {@link NotificationRequest}.
+ */
+ public abstract T build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java b/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java
new file mode 100644
index 0000000..3ffb489
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/NotificationServicesRegistry.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A service registry that manages the notification services.
+ * This class is also responsible for routing any register and unregister calls to the appropriate service.
+ */
+public final class NotificationServicesRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationServicesRegistry.class);
+
+ /**
+ * A list of notifiation service that the scheduler framework uses.
+ */
+ public enum SERVICE {
+ TIME("AlarmService"),
+ DATA("DataAvailabilityService"),
+ JOB_COMPLETION("JobCompletionService"),
+ JOB_SCHEDULE("JobSchedulerService");
+
+ private final String name;
+
+ private SERVICE(String name) {
+ this.name = name;
+ }
+
+ public String toString() {
+ return name;
+ }
+ }
+
+ private NotificationServicesRegistry() {
+ }
+
+ /**
+ * Routes the notification request to appropriate service based on the request.
+ *
+ * @param notifRequest
+ */
+ public static void register(NotificationRequest notifRequest) throws NotificationServiceException {
+ FalconNotificationService service = getService(notifRequest.getService());
+ service.register(notifRequest);
+ }
+
+ /**
+ * De-registers the listener from all services.
+ *
+ * @param listenerID
+ */
+ public static void unregister(NotificationHandler handler, ID listenerID)
+ throws NotificationServiceException {
+ for (SERVICE service : SERVICE.values()) {
+ unregisterForNotification(handler, listenerID, service);
+ }
+ }
+
+ /**
+ * @param serviceType - Type of service requested
+ * @return An instance of {@link org.apache.falcon.notification.service.FalconNotificationService}
+ */
+ public static FalconNotificationService getService(SERVICE serviceType) {
+ FalconNotificationService service = Services.get().getService(serviceType.toString());
+ if (service == null) {
+ LOG.error("Unable to find service type : {} . Service not registered.", serviceType.toString());
+ throw new RuntimeException("Unable to find service : " + serviceType.toString()
+ + " . Service not registered.");
+ }
+ return service;
+ }
+
+ /**
+ * @param serviceName - Name of service requested
+ * @return - An instance of {@link org.apache.falcon.notification.service.FalconNotificationService}
+ * @throws NotificationServiceException
+ */
+ public static FalconNotificationService getService(String serviceName) throws NotificationServiceException {
+ SERVICE serviceType = null;
+ for (SERVICE type : SERVICE.values()) {
+ if (type.toString().equals(serviceName)) {
+ serviceType = type;
+ }
+ }
+ if (serviceType == null) {
+ LOG.error("Unable to find service : {}. Not a valid service.", serviceName);
+ throw new NotificationServiceException("Unable to find service : " + serviceName
+ + " . Not a valid service.");
+ }
+ return getService(serviceType);
+ }
+
+ /**
+ * Routes the unregister request to the mentioned service.
+ * @param handler
+ * @param listenerID
+ * @param service
+ */
+ public static void unregisterForNotification(NotificationHandler handler, ID listenerID, SERVICE service)
+ throws NotificationServiceException {
+ FalconNotificationService falconNotificationService = getService(service);
+ falconNotificationService.unregister(handler, listenerID);
+ }
+}
[2/6] falcon git commit: FALCON-1213 Base framework of the native
scheduler
Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
new file mode 100644
index 0000000..ca2010b
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.workflow.engine;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A DAG Engine that uses Oozie to execute the DAG.
+ */
+public class OozieDAGEngine implements DAGEngine {
+ private static final Logger LOG = LoggerFactory.getLogger(OozieDAGEngine.class);
+ private final OozieClient client;
+ private static final int WORKFLOW_STATUS_RETRY_DELAY_MS = 100;
+
+ private static final String WORKFLOW_STATUS_RETRY_COUNT = "workflow.status.retry.count";
+ private static final List<String> PARENT_WF_ACTION_NAMES = Arrays.asList(
+ "pre-processing",
+ "recordsize",
+ "succeeded-post-processing",
+ "failed-post-processing"
+ );
+
+ public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+ private final Cluster cluster;
+
+ public OozieDAGEngine(Cluster cluster) throws DAGEngineException {
+ try {
+ client = OozieClientFactory.get(cluster);
+ this.cluster = cluster;
+ } catch (Exception e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ public OozieDAGEngine(String clusterName) throws DAGEngineException {
+ try {
+ this.cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+ client = OozieClientFactory.get(cluster);
+ } catch (Exception e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public String run(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ Properties properties = getRunProperties(instance);
+ Path buildPath = EntityUtil.getLatestStagingPath(cluster, instance.getEntity());
+ switchUser();
+ properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
+ properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
+ return client.run(properties);
+ } catch (OozieClientException e) {
+ LOG.error("Ozie client exception:", e);
+ throw new DAGEngineException(e);
+ } catch (FalconException e1) {
+ LOG.error("Falcon Exception : ", e1);
+ throw new DAGEngineException(e1);
+ }
+ }
+
+ private void prepareEntityBuildPath(Entity entity) throws FalconException {
+ Path stagingPath = EntityUtil.getBaseStagingPath(cluster, entity);
+ Path logPath = EntityUtil.getLogPath(cluster, entity);
+
+ try {
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+ ClusterHelper.getConfiguration(cluster));
+ HadoopClientFactory.mkdirsWithDefaultPerms(fs, stagingPath);
+ HadoopClientFactory.mkdirsWithDefaultPerms(fs, logPath);
+ } catch (IOException e) {
+ throw new FalconException("Error preparing base staging dirs: " + stagingPath, e);
+ }
+ }
+
+ private void dryRunInternal(Properties props) throws OozieClientException {
+ LOG.info("Dry run with properties {}", props);
+ client.dryrun(props);
+ }
+
+ private void switchUser() {
+ String user = System.getProperty("user.name");
+ CurrentUser.authenticate(user);
+ }
+
+ @Override
+ public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ return statusEquals(client.getJobInfo(instance.getExternalID()).getStatus().name(),
+ Job.Status.PREP, Job.Status.RUNNING);
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ // TODO : To be implemented. Currently hardcoded for process
+ private Properties getRunProperties(ExecutionInstance instance) {
+ Properties props = new Properties();
+ DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+ String nominalTime = fmt.print(instance.getInstanceTime());
+ props.put("nominalTime", nominalTime);
+ props.put("timeStamp", nominalTime);
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("falconInputFeeds", "NONE");
+ props.put("falconInPaths", "NONE");
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("userJMSNotificationEnabled", "true");
+ return props;
+ }
+
+ // TODO : To be implemented. Currently hardcoded for process
+ private Properties getDryRunProperties(Entity entity) {
+ Properties props = new Properties();
+ DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+ String nominalTime = fmt.print(DateTime.now());
+ props.put("nominalTime", nominalTime);
+ props.put("timeStamp", nominalTime);
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("falconInputFeeds", "NONE");
+ props.put("falconInPaths", "NONE");
+ props.put("feedNames", "NONE");
+ props.put("feedInstancePaths", "NONE");
+ props.put("userJMSNotificationEnabled", "true");
+ return props;
+ }
+
+ @Override
+ public void suspend(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ client.suspend(instance.getExternalID());
+ assertStatus(instance.getExternalID(), Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED, Job.Status.SUCCEEDED,
+ Job.Status.FAILED, Job.Status.KILLED);
+ LOG.info("Suspended job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public void resume(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ client.resume(instance.getExternalID());
+ assertStatus(instance.getExternalID(), Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUCCEEDED,
+ Job.Status.FAILED, Job.Status.KILLED);
+ LOG.info("Resumed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public void kill(ExecutionInstance instance) throws DAGEngineException {
+ try {
+ client.kill(instance.getExternalID());
+ assertStatus(instance.getExternalID(), Job.Status.KILLED, Job.Status.SUCCEEDED, Job.Status.FAILED);
+ LOG.info("Killed job {} on cluster {}", instance.getExternalID(), instance.getCluster());
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+ }
+
+ @Override
+ public void reRun(ExecutionInstance instance) throws DAGEngineException {
+ // TODO : Implement this
+ }
+
+ @Override
+ public void submit(Entity entity) throws DAGEngineException {
+ try {
+ // TODO : remove hardcoded Tag value when feed support is added.
+ OozieOrchestrationWorkflowBuilder builder =
+ OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT);
+ prepareEntityBuildPath(entity);
+ Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
+ Properties properties = builder.build(cluster, buildPath);
+ if (properties == null) {
+ LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster);
+ throw new DAGEngineException("Properties for entity " + entity.getName() + " is empty");
+ }
+
+ switchUser();
+ LOG.debug("Logged in user is " + CurrentUser.getUser());
+ properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
+ properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
+ properties.putAll(getDryRunProperties(entity));
+ //Do submit before run as run is asynchronous
+ dryRunInternal(properties);
+ } catch (OozieClientException e) {
+ LOG.error("Oozie client exception:", e);
+ throw new DAGEngineException(e);
+ } catch (FalconException e1) {
+ LOG.error("Falcon Exception : ", e1);
+ throw new DAGEngineException(e1);
+ }
+ }
+
+ @Override
+ public InstancesResult.Instance info(String externalID) throws DAGEngineException {
+ InstancesResult.Instance instance = new InstancesResult.Instance();
+ try {
+ LOG.debug("Retrieving details for job {} ", externalID);
+ WorkflowJob jobInfo = client.getJobInfo(externalID);
+ instance.startTime = jobInfo.getStartTime();
+ if (jobInfo.getStatus().name().equals(Job.Status.RUNNING.name())) {
+ instance.endTime = new Date();
+ } else {
+ instance.endTime = jobInfo.getEndTime();
+ }
+ instance.cluster = cluster.getName();
+ instance.runId = jobInfo.getRun();
+ instance.status = InstancesResult.WorkflowStatus.valueOf(jobInfo.getStatus().name());
+ instance.logFile = jobInfo.getConsoleUrl();
+ instance.wfParams = getWFParams(jobInfo);
+ return instance;
+ } catch (Exception e) {
+ LOG.error("Error when attempting to get info for " + externalID, e);
+ throw new DAGEngineException(e);
+ }
+ }
+
+ private InstancesResult.KeyValuePair[] getWFParams(WorkflowJob jobInfo) {
+ Configuration conf = new Configuration(false);
+ conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+ InstancesResult.KeyValuePair[] wfParams = new InstancesResult.KeyValuePair[conf.size()];
+ int i = 0;
+ for (Map.Entry<String, String> entry : conf) {
+ wfParams[i++] = new InstancesResult.KeyValuePair(entry.getKey(), entry.getValue());
+ }
+ return wfParams;
+ }
+
+ @Override
+ public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException {
+ List<InstancesResult.InstanceAction> instanceActions = new ArrayList<>();
+ try {
+ WorkflowJob wfJob = client.getJobInfo(externalID);
+ List<WorkflowAction> wfActions = wfJob.getActions();
+ // We wanna capture job urls for all user-actions & non succeeded actions of the main workflow
+ for (WorkflowAction action : wfActions) {
+ if (action.getType().equalsIgnoreCase("sub-workflow")
+ && StringUtils.isNotEmpty(action.getExternalId())) {
+ // if the action is sub-workflow, get job urls of all actions within the sub-workflow
+ List<WorkflowAction> subWorkFlowActions = client
+ .getJobInfo(action.getExternalId()).getActions();
+ for (WorkflowAction subWfAction : subWorkFlowActions) {
+ if (!subWfAction.getType().startsWith(":")) {
+ InstancesResult.InstanceAction instanceAction =
+ new InstancesResult.InstanceAction(subWfAction.getName(),
+ subWfAction.getExternalStatus(), subWfAction.getConsoleUrl());
+ instanceActions.add(instanceAction);
+ }
+ }
+ } else if (!action.getType().startsWith(":")) {
+ // if the action is a transition node it starts with :, we don't need their statuses
+ if (PARENT_WF_ACTION_NAMES.contains(action.getName())
+ && !Job.Status.SUCCEEDED.toString().equals(action.getExternalStatus())) {
+ // falcon actions in the main workflow are defined in the list
+ // get job urls for all non succeeded actions of the main workflow
+ InstancesResult.InstanceAction instanceAction =
+ new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+ action.getConsoleUrl());
+ instanceActions.add(instanceAction);
+ } else if (!PARENT_WF_ACTION_NAMES.contains(action.getName())
+ && !StringUtils.equals(action.getExternalId(), "-")) {
+ // if user-action is pig/hive there is no sub-workflow, we wanna capture their urls as well
+ InstancesResult.InstanceAction instanceAction =
+ new InstancesResult.InstanceAction(action.getName(), action.getExternalStatus(),
+ action.getConsoleUrl());
+ instanceActions.add(instanceAction);
+ }
+ }
+ }
+ return instanceActions;
+ } catch (OozieClientException oce) {
+ throw new DAGEngineException(oce);
+ }
+ }
+
+ @Override
+ public boolean isAlive() throws DAGEngineException {
+ try {
+ return client.getSystemMode() == OozieClient.SYSTEM_MODE.NORMAL;
+ } catch (OozieClientException e) {
+ throw new DAGEngineException("Unable to reach Oozie server.", e);
+ }
+ }
+
+ @Override
+ public Properties getConfiguration(String externalID) throws DAGEngineException {
+ Properties props = new Properties();
+ try {
+ WorkflowJob jobInfo = client.getJobInfo(externalID);
+ Configuration conf = new Configuration(false);
+ conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+
+ for (Map.Entry<String, String> entry : conf) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ } catch (OozieClientException e) {
+ throw new DAGEngineException(e);
+ }
+
+ return props;
+ }
+
+ // Get status of a workflow (with retry) and ensure it is one of statuses requested.
+ private void assertStatus(String jobID, Job.Status... statuses) throws DAGEngineException {
+ String actualStatus = null;
+ int retryCount;
+ String retry = RuntimeProperties.get().getProperty(WORKFLOW_STATUS_RETRY_COUNT, "30");
+ try {
+ retryCount = Integer.valueOf(retry);
+ } catch (NumberFormatException nfe) {
+ throw new DAGEngineException("Invalid value provided for runtime property \""
+ + WORKFLOW_STATUS_RETRY_COUNT + "\". Please provide an integer value.");
+ }
+ for (int counter = 0; counter < retryCount; counter++) {
+ try {
+ actualStatus = client.getJobInfo(jobID).getStatus().name();
+ } catch (OozieClientException e) {
+ LOG.error("Unable to get status of workflow: " + jobID, e);
+ throw new DAGEngineException(e);
+ }
+ if (!statusEquals(actualStatus, statuses)) {
+ try {
+ Thread.sleep(WORKFLOW_STATUS_RETRY_DELAY_MS);
+ } catch (InterruptedException ignore) {
+ //ignore
+ }
+ } else {
+ return;
+ }
+ }
+ throw new DAGEngineException("For Job" + jobID + ", actual statuses: " + actualStatus + ", expected statuses: "
+ + Arrays.toString(statuses));
+ }
+
+ private boolean statusEquals(String left, Job.Status... right) {
+ for (Job.Status rightElement : right) {
+ if (left.equals(rightElement.name())) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
new file mode 100644
index 0000000..b2f9e59
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -0,0 +1,557 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+
+/**
+ * Tests the API of FalconExecution Service and in turn the FalconExecutionService.get()s.
+ */
+public class FalconExecutionServiceTest extends AbstractTestBase {
+
+ private InMemoryStateStore stateStore = null;
+ private AlarmService mockTimeService;
+ private DataAvailabilityService mockDataService;
+ private SchedulerService mockSchedulerService;
+ private JobCompletionService mockCompletionService;
+ private DAGEngine dagEngine;
+ private int instanceCount = 0;
+
+ @BeforeClass
+ public void init() throws Exception {
+ this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+ this.conf = dfsCluster.getConf();
+ setupServices();
+ setupConfigStore();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ this.dfsCluster.shutdown();
+ }
+
+ // State store is set up to sync with Config Store. That gets tested too.
+ public void setupConfigStore() throws Exception {
+ stateStore = (InMemoryStateStore) AbstractStateStore.get();
+ getStore().registerListener(stateStore);
+ storeEntity(EntityType.CLUSTER, "testCluster");
+ storeEntity(EntityType.FEED, "clicksFeed");
+ storeEntity(EntityType.FEED, "clicksSummary");
+ }
+
+ public void setupServices() throws FalconException, OozieClientException {
+ mockTimeService = Mockito.mock(AlarmService.class);
+ Mockito.when(mockTimeService.getName()).thenReturn("AlarmService");
+ Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+
+ mockDataService = Mockito.mock(DataAvailabilityService.class);
+ Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+ Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ mockSchedulerService = Mockito.mock(SchedulerService.class);
+ Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService");
+ StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+ StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName());
+ dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster"));
+ Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ mockCompletionService = Mockito.mock(JobCompletionService.class);
+ Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+ Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ Services.get().register(mockTimeService);
+ Services.get().register(mockDataService);
+ Services.get().register(mockSchedulerService);
+ Services.get().register(mockCompletionService);
+ Services.get().register(FalconExecutionService.get());
+ }
+
+ @BeforeMethod
+ private void setupStateStore() throws FalconException {
+ stateStore.clear();
+ }
+
+ @Test
+ public void testBasicFlow() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize1");
+ Process process = getStore().get(EntityType.PROCESS, "summarize1");
+ Assert.assertNotNull(process);
+ ID processKey = new ID(process);
+ String clusterName = dfsCluster.getCluster().getName();
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+ Assert.assertEquals(stateStore.getEntity(processKey).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+ // Simulate a time notification
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure an instance is triggered and registers for data notification
+ Assert.assertEquals(stateStore.getAllExecutionInstances(process, clusterName).size(), 1);
+ InstanceState instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.WAITING);
+
+ // Simulate a data notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure the instance is ready for execution
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY);
+
+ // Simulate a scheduled notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure the instance is running
+ instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.RUNNING);
+
+ // Simulate a job complete notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // Ensure the instance is in succeeded and is in the state store
+ instance = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+ }
+
+ @Test
+ public void testSuspendResume() throws Exception {
+ Mockito.doNothing().when(dagEngine).resume(Mockito.any(ExecutionInstance.class));
+ storeEntity(EntityType.PROCESS, "summarize2");
+ Process process = getStore().get(EntityType.PROCESS, "summarize2");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate two time notifications
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Suspend and resume All waiting - check for notification deregistration
+
+ Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+ InstanceState instance1 = (InstanceState) i.next();
+ InstanceState instance2 = (InstanceState) i.next();
+
+ // Simulate a data notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in ready and one in waiting. Both should be suspended.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
+
+ FalconExecutionService.get().suspend(process);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+ instance1.getInstance().getId());
+ Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+ instance2.getInstance().getId());
+ Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+
+ FalconExecutionService.get().resume(process);
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
+
+ // Simulate a data notification and a job run notification
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in running and the other in ready. Both should be suspended
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+
+ FalconExecutionService.get().suspend(process);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
+
+ FalconExecutionService.get().resume(process);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+
+ // Running should finish after resume
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+ }
+
+ @Test
+ // Kill waiting, ready, running - check for notification deregistration
+ public void testDelete() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize4");
+ Process process = getStore().get(EntityType.PROCESS, "summarize4");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate three time notifications
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Suspend and resume All waiting - check for notification deregistration
+ Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+ InstanceState instance1 = (InstanceState) i.next();
+ InstanceState instance2 = (InstanceState) i.next();
+ InstanceState instance3 = (InstanceState) i.next();
+
+ // Simulate two data notifications and one job run
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in ready, one in waiting and one running.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+ FalconExecutionService.get().delete(process);
+
+ // Deregister from notification services
+ Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), processID);
+ }
+
+ @Test
+ public void testTimeOut() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize3");
+ Process process = getStore().get(EntityType.PROCESS, "summarize3");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate time notification
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Simulate data unavailable notification and timeout
+ InstanceState instanceState = stateStore.getAllExecutionInstances(process, clusterName).iterator().next();
+ ((Process) instanceState.getInstance().getEntity()).setTimeout(new Frequency("minutes(0)"));
+ DataEvent dataEvent = (DataEvent) createEvent(NotificationServicesRegistry.SERVICE.DATA,
+ instanceState.getInstance());
+ dataEvent.setStatus(DataEvent.STATUS.UNAVAILABLE);
+
+ FalconExecutionService.get().onEvent(dataEvent);
+
+ Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT);
+ }
+
+ // Non-triggering event should not create an instance
+ @Test
+ public void testNonTriggeringEvents() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize6");
+ Process process = getStore().get(EntityType.PROCESS, "summarize6");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+ // Simulate data notification with a callback of the process.
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.DATA, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // No instances should get triggered
+ Assert.assertTrue(stateStore.getAllExecutionInstances(process, clusterName).isEmpty());
+ }
+
+ // Individual instance suspend, resume, kill
+ @Test
+ public void testInstanceOperations() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize5");
+ Process process = getStore().get(EntityType.PROCESS, "summarize5");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule a process
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SUBMITTED);
+ FalconExecutionService.get().schedule(process);
+
+ // Simulate three time notifications
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.TIME, process, clusterName);
+ FalconExecutionService.get().onEvent(event);
+
+ // Suspend and resume All waiting - check for notification deregistration
+ Iterator i = stateStore.getAllExecutionInstances(process, clusterName).iterator();
+ InstanceState instance1 = (InstanceState) i.next();
+ InstanceState instance2 = (InstanceState) i.next();
+ InstanceState instance3 = (InstanceState) i.next();
+
+ // Simulate two data notifications and one job run
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.DATA, instance2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instance1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+
+ // One in ready, one in waiting and one running.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+ EntityExecutor entityExecutor = FalconExecutionService.get().getEntityExecutor(process, clusterName);
+ Assert.assertNotNull(entityExecutor);
+ Collection<ExecutionInstance> instances = new ArrayList<>();
+ for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+ instances.add(instanceState.getInstance());
+ }
+
+ for (ExecutionInstance instance : instances) {
+ entityExecutor.suspend(instance);
+ }
+
+ // Instances must be suspended, but, entity itself must not be.
+ for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+ Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.SUSPENDED);
+ Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
+ instanceState.getInstance().getId());
+ }
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+
+ for (ExecutionInstance instance : instances) {
+ entityExecutor.resume(instance);
+ }
+ // Back to one in ready, one in waiting and one running.
+ Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
+ Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
+ Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
+
+ for (ExecutionInstance instance : instances) {
+ entityExecutor.kill(instance);
+ }
+
+ // Instances must be killed, but, entity itself must not be.
+ for (InstanceState instanceState : stateStore.getAllExecutionInstances(process, clusterName)) {
+ Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.KILLED);
+ }
+ Assert.assertEquals(stateStore.getEntity(processID).getCurrentState(), EntityState.STATE.SCHEDULED);
+ }
+
+ @Test(priority = -1)
+ // Add some entities and instance in the store and ensure the executor picks up from where it left
+ public void testSystemRestart() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize7");
+ Process process = getStore().get(EntityType.PROCESS, "summarize7");
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Store couple of instances in store
+ stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+ ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
+ InstanceState instanceState1 = new InstanceState(instance1);
+ instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+ stateStore.putExecutionInstance(instanceState1);
+ ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName);
+ InstanceState instanceState2 = new InstanceState(instance2);
+ instanceState2.setCurrentState(InstanceState.STATE.READY);
+ stateStore.putExecutionInstance(instanceState2);
+
+ FalconExecutionService.get().init();
+
+ // Simulate a scheduled notification. This should cause the reload from state store
+ Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING);
+
+ // Simulate a Job completion notification and ensure the instance resumes from where it left
+ event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance());
+ FalconExecutionService.get().onEvent(event);
+ Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
+ }
+
+ @Test(dataProvider = "actions")
+ public void testPartialFailures(String name, String action, InstanceState.STATE state) throws Exception {
+ storeEntity(EntityType.PROCESS, name);
+ Process process = getStore().get(EntityType.PROCESS, name);
+ Assert.assertNotNull(process);
+ String clusterName = dfsCluster.getCluster().getName();
+ ID processID = new ID(process, clusterName);
+
+ // Schedule the process
+ FalconExecutionService.get().schedule(process);
+
+ // Store couple of instances in store
+ stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+ ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
+ instance1.setExternalID("123");
+ InstanceState instanceState1 = new InstanceState(instance1);
+ instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+ stateStore.putExecutionInstance(instanceState1);
+ ProcessExecutionInstance instance2 = new ProcessExecutionInstance(process,
+ new DateTime(System.currentTimeMillis() - 30 * 60 * 1000), clusterName);
+ InstanceState instanceState2 = new InstanceState(instance2);
+ instanceState2.setCurrentState(InstanceState.STATE.READY);
+ stateStore.putExecutionInstance(instanceState2);
+
+ // Mock failure
+ ((MockDAGEngine)dagEngine).addFailInstance(instance1);
+ Method m = FalconExecutionService.get().getClass().getMethod(action, Entity.class);
+ try {
+ m.invoke(FalconExecutionService.get(), process);
+ Assert.fail("Exception expected.");
+ } catch (Exception e) {
+ // One instance must fail and the other not
+ Assert.assertEquals(instanceState2.getCurrentState(), state);
+ Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING);
+ }
+
+ // throw no exception
+ ((MockDAGEngine)dagEngine).removeFailInstance(instance1);
+ m.invoke(FalconExecutionService.get(), process);
+
+ // Both instances must be in expected state.
+ Assert.assertEquals(instanceState2.getCurrentState(), state);
+ Assert.assertEquals(instanceState1.getCurrentState(), state);
+ }
+
+ @DataProvider(name = "actions")
+ public Object[][] testActions() {
+ return new Object[][] {
+ {"summarize8", "suspend", InstanceState.STATE.SUSPENDED},
+ {"summarize9", "delete", InstanceState.STATE.KILLED},
+ };
+ }
+
+ private Event createEvent(NotificationServicesRegistry.SERVICE type, Process process, String cluster) {
+ ID id = new ID(process, cluster);
+ switch (type) {
+ case TIME:
+ Date start = process.getClusters().getClusters().get(0).getValidity().getStart();
+ long instanceOffset =
+ SchedulerUtil.getFrequencyInMillis(DateTime.now(), process.getFrequency()) * instanceCount++;
+ return new TimeElapsedEvent(id, new DateTime(start),
+ new DateTime(process.getClusters().getClusters().get(0).getValidity().getEnd()),
+ new DateTime(start.getTime() + instanceOffset));
+ case DATA:
+ DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+ DataEvent.STATUS.AVAILABLE);
+ return dataEvent;
+ default:
+ return null;
+ }
+ }
+
+ private Event createEvent(NotificationServicesRegistry.SERVICE type, ExecutionInstance instance) {
+ ID id = new ID(instance);
+ switch (type) {
+ case DATA:
+ DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+ DataEvent.STATUS.AVAILABLE);
+ return dataEvent;
+ case JOB_SCHEDULE:
+ JobScheduledEvent scheduledEvent = new JobScheduledEvent(id, JobScheduledEvent.STATUS.SUCCESSFUL);
+ scheduledEvent.setExternalID("234");
+ return scheduledEvent;
+ case JOB_COMPLETION:
+ JobCompletedEvent jobEvent = new JobCompletedEvent(id, WorkflowJob.Status.SUCCEEDED, DateTime.now());
+ return jobEvent;
+ default:
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
new file mode 100644
index 0000000..087114f
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.DAGEngineException;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.workflow.engine.DAGEngine;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Mock DAG Execution Engine used for tests.
+ */
+public class MockDAGEngine implements DAGEngine {
+ private List<ExecutionInstance> failInstances = new ArrayList<>();
+ private Map<ExecutionInstance, Integer> runInvocations = new HashMap<>();
+
+ public MockDAGEngine(String cluster) {
+
+ }
+
+ @Override
+ public String run(ExecutionInstance instance) throws DAGEngineException {
+ if (failInstances.contains(instance)) {
+ throw new DAGEngineException("Mock failure.");
+ }
+ Integer count = 1;
+ if (runInvocations.containsKey(instance)) {
+ // Increment count
+ count = runInvocations.get(instance) + 1;
+ }
+
+ runInvocations.put(instance, count);
+ return "123";
+ }
+
+ @Override
+ public boolean isScheduled(ExecutionInstance instance) throws DAGEngineException {
+ return true;
+ }
+
+ @Override
+ public void suspend(ExecutionInstance instance) throws DAGEngineException {
+ if (failInstances.contains(instance)) {
+ throw new DAGEngineException("mock failure.");
+ }
+ }
+
+ @Override
+ public void resume(ExecutionInstance instance) throws DAGEngineException {
+
+ }
+
+ @Override
+ public void kill(ExecutionInstance instance) throws DAGEngineException {
+ if (failInstances.contains(instance)) {
+ throw new DAGEngineException("mock failure.");
+ }
+ }
+
+ @Override
+ public void reRun(ExecutionInstance instance) throws DAGEngineException {
+
+ }
+
+ @Override
+ public void submit(Entity entity) throws DAGEngineException {
+
+ }
+
+ @Override
+ public InstancesResult.Instance info(String externalID) throws DAGEngineException {
+ return new InstancesResult.Instance();
+ }
+
+ @Override
+ public List<InstancesResult.InstanceAction> getJobDetails(String externalID) throws DAGEngineException {
+ return null;
+ }
+
+ @Override
+ public boolean isAlive() throws DAGEngineException {
+ return false;
+ }
+
+ @Override
+ public Properties getConfiguration(String externalID) throws DAGEngineException {
+ return null;
+ }
+
+ public void addFailInstance(ExecutionInstance failInstance) {
+ this.failInstances.add(failInstance);
+ }
+
+ public void removeFailInstance(ExecutionInstance failInstance) {
+ this.failInstances.remove(failInstance);
+ }
+
+ public Integer getTotalRuns(ExecutionInstance instance) {
+ return runInvocations.get(instance);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
new file mode 100644
index 0000000..9457454
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/execution/SchedulerUtilTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.execution;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for Utility methods.
+ */
+public class SchedulerUtilTest {
+
+ @Test(dataProvider = "frequencies")
+ public void testGetFrequencyInMillis(DateTime referenceTime, Frequency frequency, long expectedValue) {
+ Assert.assertEquals(SchedulerUtil.getFrequencyInMillis(referenceTime, frequency), expectedValue);
+ }
+
+ @DataProvider(name = "frequencies")
+ public Object[][] getTestFrequencies() {
+ DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");
+ return new Object[][] {
+ {DateTime.now(), new Frequency("minutes(10)"), 10*60*1000L},
+ {DateTime.now(), new Frequency("hours(6)"), 6*60*60*1000L},
+ // Feb of leap year
+ {formatter.parseDateTime("04/02/2012 14:00:00"), new Frequency("months(1)"), 29*24*60*60*1000L},
+ // Months with 31 and 30 days
+ {formatter.parseDateTime("02/10/2015 03:30:00"), new Frequency("months(2)"), (31+30)*24*60*60*1000L},
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
new file mode 100644
index 0000000..36f1fd1
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/AlarmServiceTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.TimeZone;
+
+/**
+ * Class to test the time notification service.
+ */
+public class AlarmServiceTest {
+
+ private static AlarmService timeService = Mockito.spy(new AlarmService());
+ private static NotificationHandler handler = Mockito.mock(NotificationHandler.class);
+
+ @BeforeClass
+ public void setup() throws FalconException {
+ timeService.init();
+ }
+
+ @Test
+ // This test ensures notifications are generated for time events that have occurred in the past.
+ public void testbackLogCatchup() throws Exception {
+ TimeZone tz = TimeZone.getTimeZone("UTC");
+ DateTime now = DateTime.now(DateTimeZone.forTimeZone(tz));
+ // Start time 2 mins ago
+ DateTime startTime = new DateTime(now.getMillis() - 2*60*1000, DateTimeZone.forTimeZone(tz));
+ // End time 2 mins later
+ DateTime endTime = new DateTime(now.getMillis() + 2*60*1000 , DateTimeZone.forTimeZone(tz));
+
+ Process mockProcess = new Process();
+ mockProcess.setName("test");
+ ID id = new ID(mockProcess);
+ id.setCluster("testCluster");
+
+ AlarmService.AlarmRequestBuilder request =
+ new AlarmService.AlarmRequestBuilder(handler, id);
+ request.setStartTime(startTime);
+ request.setEndTime(endTime);
+ request.setFrequency(new Frequency("minutes(1)"));
+ request.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ timeService.register(request.build());
+ // Asynchronous execution, hence a small wait.
+ Thread.sleep(1000);
+ // Based on the minute boundary, there might be 3.
+ Mockito.verify(handler, Mockito.atLeast(2)).onEvent(Mockito.any(Event.class));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
new file mode 100644
index 0000000..b4a0f35
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.MockDAGEngine;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+import static org.apache.falcon.state.InstanceState.STATE;
+
+/**
+ * Class to test the job scheduler service.
+ */
+public class SchedulerServiceTest extends AbstractTestBase {
+
+ private SchedulerService scheduler = Mockito.spy(new SchedulerService());
+ private NotificationHandler handler;
+ private static String cluster = "testCluster";
+ private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get();
+ private static DAGEngine mockDagEngine;
+ private static Process process;
+ private volatile boolean failed = false;
+
+ @BeforeMethod
+ public void setup() throws FalconException {
+ // Initialized before every test in order to track invocations.
+ handler = Mockito.spy(new MockNotificationHandler());
+ }
+
+ @BeforeClass
+ public void init() throws Exception {
+ this.dfsCluster = EmbeddedCluster.newCluster(cluster);
+ this.conf = dfsCluster.getConf();
+ setupConfigStore();
+ DataAvailabilityService mockDataService = Mockito.mock(DataAvailabilityService.class);
+ Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+ Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ Services.get().register(mockDataService);
+ JobCompletionService mockCompletionService = Mockito.mock(JobCompletionService.class);
+ Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+ Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ Services.get().register(mockCompletionService);
+
+ Services.get().register(scheduler);
+ scheduler.init();
+ StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+ mockDagEngine = DAGEngineFactory.getDAGEngine("testCluster");
+ }
+
+ @AfterClass
+ public void tearDown() {
+ this.dfsCluster.shutdown();
+ }
+
+ // State store is set up to sync with Config Store. That gets tested too.
+ public void setupConfigStore() throws Exception {
+ getStore().registerListener(stateStore);
+ storeEntity(EntityType.CLUSTER, "testCluster");
+ storeEntity(EntityType.FEED, "clicksFeed");
+ storeEntity(EntityType.FEED, "clicksSummary");
+ storeEntity(EntityType.PROCESS, "mockSummary");
+ process = getStore().get(EntityType.PROCESS, "mockSummary");
+ }
+
+ @Test
+ public void testSchedulingWithParallelInstances() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize");
+ mockProcess.setParallel(1);
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ request.setInstance(instance1);
+ // No instances running, ensure DAGEngine.run is invoked.
+ scheduler.register(request.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ Mockito.verify(handler).onEvent(Mockito.any(JobScheduledEvent.class));
+ // Max. instances running, the new instance should not be run.
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 60000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request2.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null);
+ // Max. instances running, the new instance should not be run.
+ ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 120000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance3.getId());
+ request3.setInstance(instance3);
+ scheduler.register(request3.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ // Simulate the completion of previous instance.
+ stateStore.getExecutionInstance(instance1.getId()).setCurrentState(STATE.SUCCEEDED);
+ scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+ DateTime.now()));
+ // When an instance completes instance2 should get scheduled next iteration
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+ // Simulate another completion and ensure instance3 runs.
+ stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
+ scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster), WorkflowJob.Status.SUCCEEDED,
+ DateTime.now()));
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
+ }
+
+ @Test
+ public void testSchedulingWithDependencies() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize1");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize1");
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ // Create two instances, one dependent on the other.
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 60000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ ArrayList<ExecutionInstance> dependencies = new ArrayList<ExecutionInstance>();
+ dependencies.add(instance2);
+ request.setDependencies(dependencies);
+ request.setInstance(instance1);
+ // instance2 is not scheduled, dependency not satisfied
+ // So, instance1 should not be scheduled either.
+ scheduler.register(request.build());
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), null);
+ Mockito.verify(handler, Mockito.times(0)).onEvent(Mockito.any(JobScheduledEvent.class));
+
+ // Schedule instance1
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request2.build());
+
+ // Simulate completion of the instance.
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+ Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(JobScheduledEvent.class));
+ stateStore.getExecutionInstance(instance2.getId()).setCurrentState(STATE.SUCCEEDED);
+ scheduler.onEvent(new JobCompletedEvent(new ID(mockProcess, cluster, instance2.getInstanceTime()),
+ WorkflowJob.Status.SUCCEEDED, DateTime.now()));
+ // Dependency now satisfied. Now, the first instance should get scheduled after retry delay.
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ }
+
+ @Test
+ public void testSchedulingWithPriorities() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize2");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize2");
+ storeEntity(EntityType.PROCESS, "summarize3");
+ Process mockProcess2 = getStore().get(EntityType.PROCESS, "summarize3");
+ // Set higher priority
+ Property priorityProp = new Property();
+ priorityProp.setName(EntityUtil.MR_JOB_PRIORITY);
+ priorityProp.setValue("HIGH");
+ mockProcess2.getProperties().getProperties().add(priorityProp);
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ // Create two one dependent on the other.
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess2, new DateTime(startTime), cluster);
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ request.setInstance(instance1);
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request.build());
+ scheduler.register(request2.build());
+ Thread.sleep(100);
+ // Instance2 should be scheduled first.
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ Mockito.verify(handler, Mockito.times(2)).onEvent(Mockito.any(JobScheduledEvent.class));
+ }
+
+ @Test
+ public void testDeRegistration() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize4");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4");
+ mockProcess.setParallel(3);
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ // Schedule 3 instances.
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance1.getId());
+ request.setInstance(instance1);
+ scheduler.register(request.build());
+ ExecutionInstance instance2 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 60000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request2 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance2.getId());
+ request2.setInstance(instance2);
+ scheduler.register(request2.build());
+ ExecutionInstance instance3 = new ProcessExecutionInstance(mockProcess,
+ new DateTime(startTime.getTime() + 120000), cluster);
+ SchedulerService.JobScheduleRequestBuilder request3 = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(handler, instance3.getId());
+ request3.setInstance(instance3);
+ scheduler.register(request3.build());
+
+ // Abort second instance
+ scheduler.unregister(handler, instance2.getId());
+
+ Thread.sleep(100);
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
+ // Second instance should not run.
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null);
+ }
+
+ @Test
+ public void testScheduleFailure() throws Exception {
+ storeEntity(EntityType.PROCESS, "summarize5");
+ Process mockProcess = getStore().get(EntityType.PROCESS, "summarize5");
+ Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
+ ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
+ // Scheduling an instance should fail.
+ NotificationHandler failureHandler = new NotificationHandler() {
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event);
+ if (scheduledEvent.getStatus() != JobScheduledEvent.STATUS.FAILED) {
+ failed = true;
+ }
+ }
+ };
+ SchedulerService.JobScheduleRequestBuilder request = (SchedulerService.JobScheduleRequestBuilder)
+ scheduler.createRequestBuilder(failureHandler, instance1.getId());
+ request.setInstance(instance1);
+ ((MockDAGEngine)mockDagEngine).addFailInstance(instance1);
+ scheduler.register(request.build());
+ Thread.sleep(100);
+ Assert.assertFalse(failed);
+ ((MockDAGEngine)mockDagEngine).removeFailInstance(instance1);
+ }
+
+ /**
+ * A mock notification Handler that makes appropriate state changes.
+ */
+ public static class MockNotificationHandler implements NotificationHandler {
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ JobScheduledEvent scheduledEvent = ((JobScheduledEvent) event);
+ Process p = (Process) process.copy();
+ p.setName(scheduledEvent.getTarget().getEntityName());
+ ProcessExecutionInstance instance = new ProcessExecutionInstance(p,
+ scheduledEvent.getTarget().getInstanceTime(), cluster);
+ InstanceState state = new InstanceState(instance).setCurrentState(STATE.RUNNING);
+ if (!stateStore.executionInstanceExists(instance.getId())) {
+ stateStore.putExecutionInstance(state);
+ } else {
+ stateStore.updateExecutionInstance(state);
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
new file mode 100644
index 0000000..95dd5ae
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/predicate/PredicateTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.predicate;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.apache.falcon.entity.v0.process.Process;
+
+/**
+ * Tests the predicate class.
+ */
+public class PredicateTest {
+
+ @Test
+ public void testPredicateFromEvent() throws FalconException {
+ Process process = new Process();
+ process.setName("test");
+ DateTime now = DateTime.now();
+ TimeElapsedEvent te = new TimeElapsedEvent(new ID(process), now, now, now);
+ Predicate.getPredicate(te);
+ }
+
+ @Test
+ public void testComparison() {
+ Predicate firstPredicate = Predicate.createTimePredicate(100, 200, 50);
+ Predicate secondPredicate = Predicate.createTimePredicate(1000, 2000, 50);
+ Predicate thirdPredicate = Predicate.createTimePredicate(100, 200, -1);
+
+ Assert.assertFalse(firstPredicate.evaluate(secondPredicate));
+ Assert.assertFalse(secondPredicate.evaluate(thirdPredicate));
+ //With "ANY" type
+ Assert.assertTrue(firstPredicate.evaluate(thirdPredicate));
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
new file mode 100644
index 0000000..2f32b43
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.InMemoryStateStore;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Tests to ensure entity state changes happen correctly.
+ */
+public class EntityStateServiceTest {
+
+ private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class);
+
+ @BeforeMethod
+ public void setUp() {
+ ((InMemoryStateStore) AbstractStateStore.get()).clear();
+ }
+
+ // Tests a schedulable entity's lifecycle : Submit -> run -> suspend -> resume
+ @Test
+ public void testLifeCycle() throws FalconException {
+ Process mockEntity = new Process();
+ mockEntity.setName("test");
+
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
+ EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
+ Mockito.verify(listener).onSubmit(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUBMITTED));
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
+ Mockito.verify(listener).onSchedule(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
+ Mockito.verify(listener).onSuspend(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUSPENDED));
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
+ Mockito.verify(listener).onResume(mockEntity);
+ Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
+ }
+
+ @Test
+ public void testInvalidTransitions() throws FalconException {
+ Feed mockEntity = new Feed();
+ mockEntity.setName("test");
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
+ // Attempt suspending a submitted entity
+ try {
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
+ Assert.fail("Exception expected");
+ } catch (InvalidStateTransitionException e) {
+ // Do nothing
+ }
+
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
+ // Attempt resuming a scheduled entity
+ try {
+ StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
+ Assert.fail("Exception expected");
+ } catch (InvalidStateTransitionException e) {
+ // Do nothing
+ }
+
+ // Attempt scheduling a cluster
+ Cluster mockCluster = new Cluster();
+ mockCluster.setName("test");
+ StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SUBMIT, listener);
+ try {
+ StateService.get().handleStateChange(mockCluster, EntityState.EVENT.SCHEDULE, listener);
+ Assert.fail("Exception expected");
+ } catch (FalconException e) {
+ // Do nothing
+ }
+ }
+
+ @Test(dataProvider = "state_and_events")
+ public void testIdempotency(EntityState.STATE state, EntityState.EVENT event)
+ throws InvalidStateTransitionException {
+ Process mockEntity = new Process();
+ mockEntity.setName("test");
+
+ EntityState entityState = new EntityState(mockEntity).setCurrentState(state);
+ entityState.nextTransition(event);
+ Assert.assertEquals(entityState.getCurrentState(), state);
+ }
+
+ @DataProvider(name = "state_and_events")
+ public Object[][] stateAndEvents() {
+ return new Object[][]{
+ {EntityState.STATE.SCHEDULED, EntityState.EVENT.SCHEDULE},
+ {EntityState.STATE.SUBMITTED, EntityState.EVENT.SUBMIT},
+ {EntityState.STATE.SUSPENDED, EntityState.EVENT.SUSPEND},
+ };
+ }
+}
[4/6] falcon git commit: FALCON-1213 Base framework of the native
scheduler
Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
new file mode 100644
index 0000000..4883fe7
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
+ * indicating availability or non-availability of a dataset.
+ */
+public class DataEvent implements Event {
+ private final ID callbackID;
+ private Path dataLocation;
+ private LocationType dataType;
+ private STATUS status;
+
+ /**
+ * Enumerates the status of data.
+ */
+ public enum STATUS {
+ AVAILABLE,
+ UNAVAILABLE
+ }
+
+ public DataEvent(ID callbackID, Path location, LocationType locType, STATUS availability) {
+ this.callbackID = callbackID;
+ this.dataLocation = location;
+ this.dataType = locType;
+ this.status = availability;
+ }
+
+ public STATUS getStatus() {
+ return status;
+ }
+
+ public void setStatus(STATUS availability) {
+ this.status = availability;
+ }
+
+ public Path getDataLocation() {
+ return dataLocation;
+ }
+
+ public LocationType getDataType() {
+ return dataType;
+ }
+
+ @Override
+ public NotificationServicesRegistry.SERVICE getSource() {
+ return NotificationServicesRegistry.SERVICE.DATA;
+ }
+
+ @Override
+ public ID getTarget() {
+ return callbackID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
new file mode 100644
index 0000000..140973b
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * An events that are generated by notification services must implement this interface.
+ */
+public interface Event {
+
+ /**
+ * @return The service that generated this event
+ */
+ NotificationServicesRegistry.SERVICE getSource();
+
+ /**
+ * @return ID of the notification handler for which this event was meant for.
+ */
+ ID getTarget();
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
new file mode 100644
index 0000000..c587343
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.JobCompletionService}
+ * indicating completion of a Job.
+ */
+public class JobCompletedEvent implements Event {
+
+ private WorkflowJob.Status status;
+ private final ID callbackID;
+ private DateTime endTime;
+
+ public JobCompletedEvent(ID callbackID, WorkflowJob.Status jobStatus, DateTime end) {
+ this.callbackID = callbackID;
+ this.status = jobStatus;
+ this.endTime = end;
+ }
+
+ public WorkflowJob.Status getStatus() {
+ return status;
+ }
+
+ @Override
+ public NotificationServicesRegistry.SERVICE getSource() {
+ return NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
+ }
+
+ @Override
+ public ID getTarget() {
+ return callbackID;
+ }
+
+ public DateTime getEndTime() {
+ return endTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
new file mode 100644
index 0000000..55023e7
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.SchedulerService}
+ * indicating if an instance was scheduled for execution.
+ */
+public class JobScheduledEvent implements Event {
+ private final ID callbackID;
+ private String externalID;
+ private STATUS status;
+ private DateTime startTime;
+
+ public JobScheduledEvent(ID callbackID, STATUS status) {
+ this.callbackID = callbackID;
+ this.status = status;
+ }
+
+ public String getExternalID() {
+ return externalID;
+ }
+
+ public void setExternalID(String externalID) {
+ this.externalID = externalID;
+ }
+
+ @Override
+ public NotificationServicesRegistry.SERVICE getSource() {
+ return NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+ }
+
+ @Override
+ public ID getTarget() {
+ return callbackID;
+ }
+
+ /**
+ * @return - The status of the scheduled DAG/Job.
+ */
+ public STATUS getStatus() {
+ return status;
+ }
+
+
+ public DateTime getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(DateTime startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * Enumeration of possible statuses of a DAG/Job.
+ */
+ public enum STATUS {
+ FAILED,
+ SUCCESSFUL
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
new file mode 100644
index 0000000..7ec4de6
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.event;
+
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+/**
+ * An event generated by {@link org.apache.falcon.notification.service.impl.AlarmService}
+ * indicating that a given time duration has elapsed.
+ */
+public class TimeElapsedEvent implements Event {
+ private DateTime startTime;
+ private DateTime endTime;
+ private DateTime instanceTime;
+ private final ID callbackID;
+
+ public DateTime getInstanceTime() {
+ return instanceTime;
+ }
+
+ public DateTime getStartTime() {
+ return startTime;
+ }
+
+ public DateTime getEndTime() {
+ return endTime;
+ }
+
+ public TimeElapsedEvent(ID callbackID, DateTime start, DateTime end, DateTime instTime) {
+ this.callbackID = callbackID;
+ this.startTime = start;
+ this.endTime = end;
+ this.instanceTime = instTime;
+ }
+
+ @Override
+ public NotificationServicesRegistry.SERVICE getSource() {
+ return NotificationServicesRegistry.SERVICE.TIME;
+ }
+
+ @Override
+ public ID getTarget() {
+ return callbackID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
new file mode 100644
index 0000000..cccdeac
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/AlarmService.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.execution.SchedulerUtil;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.notification.service.request.AlarmRequest;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.quartz.CalendarIntervalTrigger;
+import org.quartz.DateBuilder;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.quartz.CalendarIntervalScheduleBuilder.calendarIntervalSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when requested time
+ * event has occurred. The class users to subscribe to frequency based, cron based or some calendar based time events.
+ */
+public class AlarmService implements FalconNotificationService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AlarmService.class);
+
+ private Map<ID, TriggerKey> notifications = new HashMap<ID, TriggerKey>();
+ private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
+ private Scheduler scheduler;
+
+ @Override
+ public void init() throws FalconException {
+ try {
+ scheduler = StdSchedulerFactory.getDefaultScheduler();
+ scheduler.start();
+ } catch (SchedulerException e) {
+ throw new FalconException(e);
+ }
+ }
+
+ @Override
+ public void register(NotificationRequest notificationRequest) throws NotificationServiceException {
+ LOG.info("Registering alarm notification for " + notificationRequest.getCallbackId());
+ AlarmRequest request = (AlarmRequest) notificationRequest;
+ DateTime currentTime = DateTime.now();
+ DateTime nextStartTime = request.getStartTime();
+ DateTime endTime;
+ if (request.getEndTime().isBefore(currentTime)) {
+ endTime = request.getEndTime();
+ } else {
+ endTime = currentTime;
+ }
+ // Handle past events.
+ // TODO : Quartz doesn't seem to support running jobs for past events.
+ // TODO : Remove the handling of past events when that support is added.
+ if (request.getStartTime().isBefore(currentTime)) {
+
+ List<Date> instanceTimes = EntityUtil.getInstanceTimes(request.getStartTime().toDate(),
+ request.getFrequency(), request.getTimeZone(), request.getStartTime().toDate(),
+ endTime.toDate());
+ if (instanceTimes != null && !instanceTimes.isEmpty()) {
+ Date lastInstanceTime = instanceTimes.get(instanceTimes.size() - 1);
+ nextStartTime = new DateTime(lastInstanceTime.getTime()
+ + SchedulerUtil.getFrequencyInMillis(new DateTime(lastInstanceTime), request.getFrequency()));
+ // Introduce some delay to allow for rest of the registration to complete.
+ LOG.debug("Triggering events for past from {} till {}", instanceTimes.get(0), lastInstanceTime);
+ executor.schedule(new CatchupJob(request, instanceTimes), 1, TimeUnit.SECONDS);
+ }
+ }
+ // All past events have been scheduled. Nothing to schedule in the future.
+ if (request.getEndTime().isBefore(currentTime)) {
+ return;
+ }
+ LOG.debug("Scheduling to trigger events from {} to {} with frequency {}", nextStartTime, request.getEndTime(),
+ request.getFrequency());
+ // Schedule future events using Quartz
+ CalendarIntervalTrigger trigger = newTrigger()
+ .withIdentity(notificationRequest.getCallbackId().toString(), "Falcon")
+ .startAt(nextStartTime.toDate())
+ .endAt(request.getEndTime().toDate())
+ .withSchedule(
+ calendarIntervalSchedule()
+ .withInterval(request.getFrequency().getFrequencyAsInt(),
+ getTimeUnit(request.getFrequency().getTimeUnit()))
+ .withMisfireHandlingInstructionFireAndProceed())
+ .build();
+
+ // define the job and tie it to our Job class
+ JobDetail job = newJob(FalconProcessJob.class)
+ .withIdentity(getJobKey(notificationRequest.getCallbackId().toString()))
+ .setJobData(getJobDataMap((AlarmRequest) notificationRequest))
+ .build();
+ notifications.put(notificationRequest.getCallbackId(), trigger.getKey());
+ // Tell quartz to run the job using our trigger
+ try {
+ scheduler.scheduleJob(job, trigger);
+ } catch (SchedulerException e) {
+ LOG.error("Error scheduling entity {}", trigger.getKey());
+ throw new NotificationServiceException(e);
+ }
+ }
+
+ // Maps the timeunit in entity specification to the one in Quartz DateBuilder
+ private DateBuilder.IntervalUnit getTimeUnit(Frequency.TimeUnit timeUnit) {
+ switch (timeUnit) {
+ case minutes:
+ return DateBuilder.IntervalUnit.MINUTE;
+ case hours:
+ return DateBuilder.IntervalUnit.HOUR;
+ case days:
+ return DateBuilder.IntervalUnit.DAY;
+ case months:
+ return DateBuilder.IntervalUnit.MONTH;
+ default:
+ throw new IllegalArgumentException("Invalid time unit " + timeUnit.name());
+ }
+ }
+
+ private JobKey getJobKey(String key) {
+ return new JobKey(key, "Falcon");
+ }
+
+ private JobDataMap getJobDataMap(AlarmRequest request) {
+ JobDataMap jobProps = new JobDataMap();
+ jobProps.put("request", request);
+
+ return jobProps;
+ }
+
+ @Override
+ public void unregister(NotificationHandler handler, ID listenerID) throws NotificationServiceException {
+ try {
+ LOG.info("Removing time notification for handler {} with callbackID {}", handler, listenerID);
+ scheduler.unscheduleJob(notifications.get(listenerID));
+ notifications.remove(listenerID);
+ } catch (SchedulerException e) {
+ throw new NotificationServiceException("Unable to deregister " + listenerID, e);
+ }
+ }
+
+ @Override
+ public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+ return new AlarmRequestBuilder(handler, callbackID);
+ }
+
+ @Override
+ public String getName() {
+ return "AlarmService";
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+ try {
+ scheduler.shutdown();
+ } catch (SchedulerException e) {
+ LOG.warn("Quartz Scheduler shutdown failed.", e);
+ }
+
+ }
+
+ // Generates a time elapsed event and invokes onEvent on the handler.
+ private static void notifyHandler(AlarmRequest request, DateTime instanceTime) throws NotificationServiceException {
+ TimeElapsedEvent event = new TimeElapsedEvent(request.getCallbackId(), request.getStartTime(),
+ request.getEndTime(), instanceTime);
+ try {
+ LOG.info("Sending notification to {} with nominal time {} ", request.getCallbackId(),
+ event.getInstanceTime());
+ request.getHandler().onEvent(event);
+ } catch (FalconException e) {
+ LOG.error("Unable to onEvent " + request.getCallbackId() + " for nominal time, " + instanceTime, e);
+ throw new NotificationServiceException(e);
+ }
+ }
+
+ /**
+ * The Job that runs when a time trigger happens.
+ */
+ public static class FalconProcessJob implements Job {
+ public FalconProcessJob() {
+ }
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+ LOG.debug("Quartz job called at : {}, Next fire time: {}", jobExecutionContext.getFireTime(),
+ jobExecutionContext.getNextFireTime());
+
+ AlarmRequest request = (AlarmRequest) jobExecutionContext.getJobDetail()
+ .getJobDataMap().get("request");
+ DateTime instanceTime = new DateTime(jobExecutionContext.getScheduledFireTime(),
+ DateTimeZone.forTimeZone(request.getTimeZone()));
+
+ try {
+ notifyHandler(request, instanceTime);
+ } catch (NotificationServiceException e) {
+ throw new JobExecutionException(e);
+ }
+ }
+ }
+
+ // Quartz doesn't seem to be able to schedule past events. This job specifically handles that.
+ private static class CatchupJob implements Runnable {
+
+ private final AlarmRequest request;
+ private final List<Date> instanceTimes;
+
+ public CatchupJob(AlarmRequest request, List<Date> triggerTimes) {
+ this.request = request;
+ this.instanceTimes = triggerTimes;
+ }
+
+ @Override
+ public void run() {
+ if (instanceTimes == null) {
+ return;
+ }
+ // Immediate notification for all past events.
+ for(Date instanceTime : instanceTimes) {
+ DateTime nominalDateTime = new DateTime(instanceTime, DateTimeZone.forTimeZone(request.getTimeZone()));
+ try {
+ notifyHandler(request, nominalDateTime);
+ } catch (NotificationServiceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Builder that builds {@link AlarmRequest}.
+ */
+ public static class AlarmRequestBuilder extends RequestBuilder<AlarmRequest> {
+
+ private DateTime startTime;
+ private DateTime endTime;
+ private Frequency frequency;
+ private TimeZone timeZone;
+
+ public AlarmRequestBuilder(NotificationHandler handler, ID callbackID) {
+ super(handler, callbackID);
+ }
+
+ /**
+ * @param start of the timer
+ * @return This instance
+ */
+ public AlarmRequestBuilder setStartTime(DateTime start) {
+ this.startTime = start;
+ return this;
+ }
+
+ /**
+ * @param end of the timer
+ * @return This instance
+ */
+ public AlarmRequestBuilder setEndTime(DateTime end) {
+ this.endTime = end;
+ return this;
+ }
+
+ /**
+ * @param freq of the timer
+ * @return This instance
+ */
+ public AlarmRequestBuilder setFrequency(Frequency freq) {
+ this.frequency = freq;
+ return this;
+ }
+
+ /**
+ * @param timeZone
+ */
+ public void setTimeZone(TimeZone timeZone) {
+ this.timeZone = timeZone;
+ }
+
+ @Override
+ public AlarmRequest build() {
+ if (callbackId == null || startTime == null || endTime == null || frequency == null) {
+ throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
+ + " callbackId, startTime, endTime, frequency");
+ }
+ return new AlarmRequest(handler, callbackId, startTime, endTime, frequency, timeZone);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
new file mode 100644
index 0000000..7ffb351
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.request.DataNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when requested data
+ * becomes available. This class also supports time out, in which case it notifies about the unavailability.
+ * TODO : Complete/Modify this skeletal class
+ */
+public class DataAvailabilityService implements FalconNotificationService {
+
+ @Override
+ public void register(NotificationRequest request) throws NotificationServiceException {
+ // TODO : Implement this
+ }
+
+ @Override
+ public void unregister(NotificationHandler handler, ID listenerID) {
+ // TODO : Implement this
+ }
+
+ @Override
+ public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+ return new DataRequestBuilder(handler, callbackID);
+ }
+
+ @Override
+ public String getName() {
+ return "DataAvailabilityService";
+ }
+
+ @Override
+ public void init() throws FalconException {
+ // TODO : Implement this
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+
+ }
+
+ /**
+ * Builds {@link DataNotificationRequest}.
+ */
+ public static class DataRequestBuilder extends RequestBuilder<DataNotificationRequest> {
+ private Path dataLocation;
+
+ public DataRequestBuilder(NotificationHandler handler, ID callbackID) {
+ super(handler, callbackID);
+ }
+
+ /**
+ * @param location
+ * @return This instance
+ */
+ public DataRequestBuilder setDataLocation(Path location) {
+ this.dataLocation = location;
+ return this;
+ }
+
+ @Override
+ public DataNotificationRequest build() {
+ if (callbackId == null || dataLocation == null) {
+ throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
+ + " callbackId, dataLocation");
+ }
+ return new DataNotificationRequest(handler, callbackId, dataLocation);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
new file mode 100644
index 0000000..73a4199
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/JobCompletionService.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.event.JobCompletedEvent;
+import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.oozie.client.WorkflowJob;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TimeZone;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when an external job
+ * completes.
+ */
+public class JobCompletionService implements FalconNotificationService, WorkflowExecutionListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobCompletionService.class);
+ private static DateTimeZone utc = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+
+ private List<NotificationHandler> listeners = Collections.synchronizedList(new ArrayList<NotificationHandler>());
+
+ @Override
+ public void register(NotificationRequest notifRequest) throws NotificationServiceException {
+ if (notifRequest == null) {
+ throw new NotificationServiceException("Request object cannot be null");
+ }
+ listeners.add(notifRequest.getHandler());
+ JobCompletionNotificationRequest request = (JobCompletionNotificationRequest) notifRequest;
+ // Check if the job is already complete.
+ // If yes, send a notification synchronously.
+ // If not, we expect that this class will get notified when the job completes
+ // as this class is a listener to WorkflowJobEndNotificationService.
+ if (request.getExternalId() != null && request.getCluster() != null) {
+ try {
+ Properties props = DAGEngineFactory.getDAGEngine(request.getCluster())
+ .getConfiguration(request.getExternalId());
+ WorkflowExecutionContext context = createContext(props);
+ if (context.hasWorkflowFailed()) {
+ onFailure(context);
+ } else if (context.hasWorkflowSucceeded()) {
+ onSuccess(context);
+ }
+ } catch (FalconException e) {
+ throw new NotificationServiceException(e);
+ }
+ }
+ }
+
+ @Override
+ public void unregister(NotificationHandler handler, ID listenerID) {
+ listeners.remove(handler);
+ }
+
+ @Override
+ public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+ return new JobCompletionRequestBuilder(handler, callbackID);
+ }
+
+ @Override
+ public String getName() {
+ return "JobCompletionService";
+ }
+
+ @Override
+ public void init() throws FalconException {
+ LOG.debug("Registering to job end notification service");
+ Services.get().<WorkflowJobEndNotificationService>getService(
+ WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this);
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+
+ }
+
+ @Override
+ public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+ onEnd(context, WorkflowJob.Status.SUCCEEDED);
+ }
+
+ @Override
+ public void onFailure(WorkflowExecutionContext context) throws FalconException {
+ onEnd(context, WorkflowJob.Status.FAILED);
+ }
+
+ @Override
+ public void onStart(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ @Override
+ public void onWait(WorkflowExecutionContext context) throws FalconException {
+ // Do nothing
+ }
+
+ private void onEnd(WorkflowExecutionContext context, WorkflowJob.Status status) throws FalconException {
+ JobCompletedEvent event = new JobCompletedEvent(constructCallbackID(context), status, getEndTime(context));
+ for (NotificationHandler handler : listeners) {
+ LOG.debug("Notifying {} with event {}", handler, event.getTarget());
+ handler.onEvent(event);
+ }
+ }
+
+ private DateTime getEndTime(WorkflowExecutionContext context) throws FalconException {
+ return new DateTime(DAGEngineFactory.getDAGEngine(context.getClusterName())
+ .info(context.getWorkflowId()).getEndTime());
+ }
+
+ // Constructs the callback ID from the details available in the context.
+ private ID constructCallbackID(WorkflowExecutionContext context) throws FalconException {
+ ID id = new ID(EntityType.valueOf(context.getEntityType()), context.getEntityName());
+ id.setCluster(context.getClusterName());
+ id.setInstanceTime(new DateTime(EntityUtil.parseDateUTC(context.getNominalTimeAsISO8601()), utc));
+ return id;
+ }
+
+ private WorkflowExecutionContext createContext(Properties props) {
+ // for backwards compatibility, read all args from properties
+ Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
+ for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
+ String optionValue = props.getProperty(arg.getName());
+ if (StringUtils.isNotEmpty(optionValue)) {
+ wfProperties.put(arg, optionValue);
+ }
+ }
+
+ return WorkflowExecutionContext.create(wfProperties);
+ }
+
+ /**
+ * Builds {@link JobCompletionNotificationRequest}.
+ */
+ public static class JobCompletionRequestBuilder extends RequestBuilder<JobCompletionNotificationRequest> {
+ private String cluster;
+ private String externalId;
+
+ public JobCompletionRequestBuilder(NotificationHandler handler, ID callbackID) {
+ super(handler, callbackID);
+ }
+
+ /**
+ * @param clusterName
+ */
+ public JobCompletionRequestBuilder setCluster(String clusterName) {
+ this.cluster = clusterName;
+ return this;
+ }
+
+ /**
+ * @param id - The external job id for which job completion notification is requested.
+ * @return
+ */
+ public JobCompletionRequestBuilder setExternalId(String id) {
+ this.externalId = id;
+ return this;
+ }
+
+ @Override
+ public JobCompletionNotificationRequest build() {
+ return new JobCompletionNotificationRequest(handler, callbackId, cluster, externalId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
new file mode 100644
index 0000000..848f89c
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.NotificationServiceException;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.JobScheduledEvent;
+import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
+import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest;
+import org.apache.falcon.notification.service.request.NotificationRequest;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This notification service notifies {@link NotificationHandler} when an execution
+ * instance is scheduled on a DAG Engine.
+ * Current implementation of scheduler handles parallel scheduling of instances,
+ * dependencies (an instance depending on completion of another) and priority.
+ */
+public class SchedulerService implements FalconNotificationService, NotificationHandler,
+ RemovalListener<ID, List<ExecutionInstance>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
+
+ public static final String DEFAULT_NUM_OF_SCHEDULER_THREADS = "5";
+ public static final String NUM_OF_SCHEDULER_THREADS_PROP = "scheduler.threads.count";
+
+ // Once scheduling conditions are met, it goes to run queue to be run on DAGEngine, based on priority.
+ private ThreadPoolExecutor runQueue;
+
+ private static final StateStore STATE_STORE = AbstractStateStore.get();
+
+ private Cache<ID, Object> instancesToIgnore;
+ // TODO : limit the no. of awaiting instances per entity
+ private LoadingCache<ID, List<ExecutionInstance>> awaitedInstances;
+
+ @Override
+ public void register(NotificationRequest notifRequest) throws NotificationServiceException {
+ JobScheduleNotificationRequest request = (JobScheduleNotificationRequest) notifRequest;
+ if (request.getInstance() == null) {
+ throw new NotificationServiceException("Request must contain an instance.");
+ }
+ // When the instance is getting rescheduled for run. As in the case of suspend and resume.
+ Object obj = instancesToIgnore.getIfPresent(request.getInstance().getId());
+ if (obj != null) {
+ instancesToIgnore.invalidate(request.getInstance().getId());
+ }
+ runQueue.execute(new InstanceRunner(request));
+ }
+
+ @Override
+ public void unregister(NotificationHandler handler, ID listenerID) {
+ // If ID is that of an entity, do nothing
+ if (listenerID.getInstanceTime() == null) {
+ return;
+ }
+ // Not efficient to iterate over elements to remove this. Add to ignore list.
+ instancesToIgnore.put(listenerID, new Object());
+
+ }
+
+ @Override
+ public RequestBuilder createRequestBuilder(NotificationHandler handler, ID callbackID) {
+ return new JobScheduleRequestBuilder(handler, callbackID);
+ }
+
+ @Override
+ public String getName() {
+ return "JobSchedulerService";
+ }
+
+ @Override
+ public void init() throws FalconException {
+ int numThreads = Integer.parseInt(RuntimeProperties.get().getProperty(NUM_OF_SCHEDULER_THREADS_PROP,
+ DEFAULT_NUM_OF_SCHEDULER_THREADS));
+
+ // Uses a priority queue to ensure instances with higher priority gets run first.
+ PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new PriorityComparator());
+ runQueue = new ThreadPoolExecutor(1, numThreads, 0L, TimeUnit.MILLISECONDS, pq);
+
+ CacheLoader instanceCacheLoader = new CacheLoader<ID, Collection<ExecutionInstance>>() {
+ @Override
+ public Collection<ExecutionInstance> load(ID id) throws Exception {
+ List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
+ states.add(InstanceState.STATE.READY);
+ List<ExecutionInstance> readyInstances = new ArrayList<>();
+ // TODO : Limit it to no. of instances that can be run in parallel.
+ for (InstanceState state : STATE_STORE.getExecutionInstances(id.getEntityID(), states)) {
+ readyInstances.add(state.getInstance());
+ }
+ return readyInstances;
+ }
+ };
+
+ awaitedInstances = CacheBuilder.newBuilder()
+ .maximumSize(100)
+ .concurrencyLevel(1)
+ .removalListener(this)
+ .build(instanceCacheLoader);
+
+ instancesToIgnore = CacheBuilder.newBuilder()
+ .expireAfterWrite(1, TimeUnit.HOURS)
+ .concurrencyLevel(1)
+ .build();
+ // Interested in all job completion events.
+ JobCompletionNotificationRequest completionRequest = (JobCompletionNotificationRequest)
+ NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION)
+ .createRequestBuilder(this, null).build();
+ NotificationServicesRegistry.register(completionRequest);
+ }
+
+ @Override
+ public void onRemoval(RemovalNotification<ID, List<ExecutionInstance>> removalNotification) {
+ // When instances are removed due to size...
+ // Ensure instances are persisted in state store and add to another list of awaited entities.
+ if (removalNotification.wasEvicted()) {
+ for (ExecutionInstance instance : removalNotification.getValue()) {
+ InstanceState state = new InstanceState(instance);
+ state.setCurrentState(InstanceState.STATE.READY);
+ try {
+ STATE_STORE.updateExecutionInstance(state);
+ } catch (StateStoreException e) {
+ throw new RuntimeException("Unable to persist the ready instance " + instance.getId(), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onEvent(Event event) throws FalconException {
+ // Interested only in job completion events.
+ if (event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION) {
+ try {
+ // Check if the instance is awaited.
+ ID id = event.getTarget();
+ List<ExecutionInstance> instances = awaitedInstances.get(id);
+ // Else, check if the entity is awaited.
+ if (instances == null) {
+ id = id.getEntityID();
+ instances = awaitedInstances.get(id);
+ }
+ if (instances != null && !instances.isEmpty()) {
+ ExecutionInstance instance = instances.get(0);
+ if (instance != null && instance.getAwaitingPredicates() != null) {
+ for (Predicate predicate : instance.getAwaitingPredicates()) {
+ if (predicate.getType() == Predicate.TYPE.JOB_COMPLETION) {
+ // Construct a request object
+ NotificationHandler handler = ReflectionUtils
+ .getInstanceByClassName(predicate.getClauseValue("handler").toString());
+ JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder(
+ handler, instance.getId());
+ requestBuilder.setInstance(instance);
+ InstanceRunner runner = new InstanceRunner(requestBuilder.build());
+ // Since an instance just finished of the same entity just finished
+ if (id.equals(instance.getId())) {
+ runner.incrementAllowedInstances();
+ }
+ runQueue.execute(runner);
+ instances.remove(instance);
+ }
+ }
+ }
+ }
+ if (instances != null && instances.isEmpty()) {
+ awaitedInstances.invalidate(id);
+ }
+ } catch (Exception e) {
+ throw new FalconException(e);
+ }
+ }
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+ runQueue.shutdownNow();
+ instancesToIgnore.invalidateAll();
+ }
+
+ private void notifyFailureEvent(JobScheduleNotificationRequest request) throws FalconException {
+ JobScheduledEvent event = new JobScheduledEvent(request.getCallbackId(), JobScheduledEvent.STATUS.FAILED);
+ request.getHandler().onEvent(event);
+ }
+
+ private class InstanceRunner implements Runnable {
+ private final ExecutionInstance instance;
+ private final JobScheduleNotificationRequest request;
+ private short priority;
+ private int allowedParallelInstances = 1;
+
+ public InstanceRunner(JobScheduleNotificationRequest request) {
+ this.request = request;
+ this.instance = request.getInstance();
+ this.priority = getPriority(instance.getEntity()).getPriority();
+ allowedParallelInstances = EntityUtil.getParallel(instance.getEntity());
+ }
+
+ public int incrementAllowedInstances() {
+ return ++allowedParallelInstances;
+ }
+
+ private EntityUtil.JOBPRIORITY getPriority(Entity entity) {
+ switch(entity.getEntityType()) {
+ case PROCESS :
+ return EntityUtil.getPriority((Process)entity);
+ default :
+ throw new UnsupportedOperationException("Scheduling of entities other "
+ + "than process is not supported yet.");
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ // If de-registered
+ if (instancesToIgnore.getIfPresent(instance.getId()) != null) {
+ LOG.debug("Instance {} has been deregistered. Ignoring.", instance.getId());
+ instancesToIgnore.invalidate(instance.getId());
+ return;
+ }
+ LOG.debug("Received request to run instance {}", instance.getId());
+ if (checkConditions()) {
+ // If instance not already scheduled.
+ String externalId = instance.getExternalID();
+ if (externalId == null) {
+ externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance);
+ LOG.info("Scheduled job {} for instance {}", externalId, instance.getId());
+ }
+ JobScheduledEvent event = new JobScheduledEvent(instance.getId(),
+ JobScheduledEvent.STATUS.SUCCESSFUL);
+ event.setExternalID(externalId);
+ event.setStartTime(new DateTime(DAGEngineFactory.getDAGEngine(instance.getCluster())
+ .info(externalId).getStartTime()));
+ request.getHandler().onEvent(event);
+ }
+ } catch (FalconException e) {
+ LOG.error("Error running the instance : " + instance.getId(), e);
+ try {
+ notifyFailureEvent(request);
+ } catch (FalconException fe) {
+ throw new RuntimeException("Unable to onEvent : " + request.getCallbackId(), fe);
+ }
+ }
+ }
+
+ public short getPriority() {
+ return priority;
+ }
+
+ private boolean checkConditions() throws FalconException {
+ try {
+ // TODO : If and when the no. of scheduling conditions increase, consider chaining condition checks.
+ // Run if all conditions are met.
+ if (instanceCheck() && dependencyCheck()) {
+ return true;
+ } else {
+ ID entityID = instance.getId().getEntityID();
+ // Instance is awaiting scheduling conditions to be met. Add predicate to that effect.
+ instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(request.getHandler(),
+ entityID));
+ updateAwaitedInstances(entityID);
+ LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}",
+ instance.getId(), entityID);
+ }
+ } catch (Exception e) {
+ LOG.error("Instance run failed with error : ", e);
+ throw new FalconException("Instance run failed", e);
+ }
+ return false;
+ }
+
+ private void updateAwaitedInstances(ID id) throws ExecutionException {
+ synchronized (id) {
+ List<ExecutionInstance> instances = awaitedInstances.get(id);
+ if (instances == null) {
+ // Order is FIFO.
+ instances = new LinkedList<>();
+ awaitedInstances.put(id, instances);
+ }
+ instances.add(instance);
+ }
+ }
+
+ private boolean dependencyCheck() throws FalconException, ExecutionException {
+ if (request.getDependencies() == null || request.getDependencies().isEmpty()) {
+ return true;
+ }
+
+ for (ExecutionInstance execInstance : request.getDependencies()) {
+ // Dependants should wait for this instance to complete. Add predicate to that effect.
+ instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(
+ request.getHandler(), execInstance.getId()));
+ updateAwaitedInstances(execInstance.getId());
+ }
+ return false;
+ }
+
+ // Ensure no. of instances running in parallel is per entity specification.
+ private boolean instanceCheck() throws StateStoreException {
+ return STATE_STORE.getExecutionInstances(instance.getEntity(), instance.getCluster(),
+ InstanceState.getRunningStates()).size() < allowedParallelInstances;
+ }
+ }
+
+ // A priority based comparator to be used by the {@link java.util.concurrent.PriorityBlockingQueue}
+ private static class PriorityComparator<T extends InstanceRunner> implements Comparator<T>, Serializable {
+ @Override
+ public int compare(T o1, T o2) {
+ return o1.getPriority() - o2.getPriority();
+ }
+ }
+
+ /**
+ * Builds {@link JobScheduleNotificationRequest}.
+ */
+ public static class JobScheduleRequestBuilder extends RequestBuilder<JobScheduleNotificationRequest> {
+ private List<ExecutionInstance> dependencies;
+ private ExecutionInstance instance;
+
+ public JobScheduleRequestBuilder(NotificationHandler handler, ID callbackID) {
+ super(handler, callbackID);
+ }
+
+ /**
+ * @param execInstance that needs to be scheduled
+ * @return
+ */
+ public JobScheduleRequestBuilder setInstance(ExecutionInstance execInstance) {
+ this.instance = execInstance;
+ return this;
+ }
+
+ /**
+ * Dependencies to wait for before scheduling.
+ * @param dependencies
+ */
+ public void setDependencies(List<ExecutionInstance> dependencies) {
+ this.dependencies = dependencies;
+ }
+
+ @Override
+ public JobScheduleNotificationRequest build() {
+ if (callbackId == null || instance == null) {
+ throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
+ + " callbackId, execInstance");
+ }
+ return new JobScheduleNotificationRequest(handler, callbackId, instance, dependencies);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
new file mode 100644
index 0000000..2628dc8
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/AlarmRequest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.joda.time.DateTime;
+
+import java.util.TimeZone;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.AlarmService}
+ * for time based notifications.
+ * The setter methods of the class support chaining similar to a builder class.
+ * TODO : Might need a separate builder too.
+ */
+public class AlarmRequest extends NotificationRequest {
+
+ private DateTime startTime;
+ private DateTime endTime;
+ private Frequency frequency;
+ private TimeZone timeZone;
+
+ /**
+ * Constructor.
+ * @param notifHandler
+ * @param callbackId
+ */
+ public AlarmRequest(NotificationHandler notifHandler, ID callbackId, DateTime start,
+ DateTime end, Frequency freq, TimeZone tz) {
+ this.handler = notifHandler;
+ this.callbackId = callbackId;
+ this.service = NotificationServicesRegistry.SERVICE.TIME;
+ this.startTime = start;
+ this.endTime = end;
+ this.frequency = freq;
+ this.timeZone = tz;
+ }
+
+ /**
+ * @return frequency of the timer
+ */
+ public Frequency getFrequency() {
+ return frequency;
+ }
+
+ /**
+ * @return start time of the timer
+ */
+ public DateTime getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * @return end time of the timer
+ */
+ public DateTime getEndTime() {
+ return endTime;
+ }
+
+ /**
+ * @return timezone of the request.
+ */
+ public TimeZone getTimeZone() {
+ return timeZone;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
new file mode 100644
index 0000000..8393de0
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Request intended for {@link import org.apache.falcon.notification.service.impl.DataAvailabilityService}
+ * for data notifications.
+ * The setter methods of the class support chaining similar to a builder class.
+ * TODO : Complete/modify this skeletal class
+ */
+public class DataNotificationRequest extends NotificationRequest {
+ private final Path dataLocation;
+ private String cluster;
+
+ /**
+ * @return data location to be watched.
+ */
+ public Path getDataLocation() {
+ return dataLocation;
+ }
+
+ /**
+ * Given a number of instances, should the service wait for exactly those many,
+ * at least those many or at most those many instances.
+ */
+ public enum INSTANCELIMIT {
+ EXACTLY_N,
+ AT_LEAST_N,
+ AT_MOST_N
+ }
+
+ /**
+ * Constructor.
+ * @param notifHandler
+ * @param callbackId
+ */
+ public DataNotificationRequest(NotificationHandler notifHandler, ID callbackId, Path location) {
+ this.handler = notifHandler;
+ this.callbackId = callbackId;
+ this.dataLocation = location;
+ this.service = NotificationServicesRegistry.SERVICE.DATA;
+ }
+
+ /**
+ * @return cluster name
+ */
+ public String getCluster() {
+ return cluster;
+ }
+
+ /**
+ * @param clusterName
+ * @return This instance
+ */
+ public DataNotificationRequest setCluster(String clusterName) {
+ this.cluster = clusterName;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
new file mode 100644
index 0000000..1d35476
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobCompletionNotificationRequest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.JobCompletionService}
+ * for job completion notifications.
+ * The setter methods of the class support chaining similar to a builder class.
+ */
+public class JobCompletionNotificationRequest extends NotificationRequest {
+
+ private String externalId;
+ private String cluster;
+ /**
+ * Constructor.
+ * @param notifHandler
+ * @param callbackId
+ */
+ public JobCompletionNotificationRequest(NotificationHandler notifHandler, ID callbackId, String clstr,
+ String jobId) {
+ this.handler = notifHandler;
+ this.service = NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
+ this.callbackId = callbackId;
+ this.cluster = clstr;
+ this.externalId = jobId;
+ }
+
+ /**
+ * @return - The external job id for which job completion notification is requested.
+ */
+ public String getExternalId() {
+ return externalId;
+ }
+
+ /**
+ * @return cluster name
+ */
+ public String getCluster() {
+ return cluster;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
new file mode 100644
index 0000000..80133bd
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/JobScheduleNotificationRequest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+import java.util.List;
+
+/**
+ * Request intended for {@link org.apache.falcon.notification.service.impl.SchedulerService}
+ * for job run notifications.
+ * The setter methods of the class support chaining similar to a builder class.
+ */
+public class JobScheduleNotificationRequest extends NotificationRequest {
+ private ExecutionInstance instance;
+ private List<ExecutionInstance> dependencies;
+
+ /**
+ * Constructor.
+ * @param notifHandler
+ * @param id
+ */
+ public JobScheduleNotificationRequest(NotificationHandler notifHandler, ID id, ExecutionInstance inst,
+ List<ExecutionInstance> deps) {
+ this.handler = notifHandler;
+ this.service = NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+ this.callbackId = id;
+ this.instance = inst;
+ this.dependencies = deps;
+ }
+
+ /**
+ * @return execution instance that will be scheduled.
+ */
+ public ExecutionInstance getInstance() {
+ return instance;
+ }
+
+ public List<ExecutionInstance> getDependencies() {
+ return dependencies;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
new file mode 100644
index 0000000..c89668d
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/NotificationRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service.request;
+
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.state.ID;
+
+/**
+ * An abstract class that all notification requests of services must extend.
+ * TODO : Complete/modify this skeleton class
+ */
+public abstract class NotificationRequest {
+ protected NotificationHandler handler;
+ protected ID callbackId;
+ protected NotificationServicesRegistry.SERVICE service;
+
+ /**
+ * @return - The service that this request is intended for
+ */
+ public NotificationServicesRegistry.SERVICE getService() {
+ return service;
+ }
+
+ /**
+ * @return - The entity that needs to be notified when this request is satisfied.
+ */
+ public ID getCallbackId() {
+ return callbackId;
+ }
+
+ /**
+ * @return - The notification handler.
+ */
+ public NotificationHandler getHandler() {
+ return handler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
new file mode 100644
index 0000000..fb4ce82
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.predicate;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.TimeElapsedEvent;
+import org.apache.falcon.state.ID;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents the gating condition for which an instance is waiting before it is scheduled.
+ * This will be serialized and stored in state store.
+ */
+public class Predicate implements Serializable {
+ /**
+ * Type of predicate, currently data and time are supported.
+ */
+ public enum TYPE {
+ DATA,
+ TIME,
+ JOB_COMPLETION
+ }
+
+ private final TYPE type;
+
+ // A key-value pair of clauses that need make this predicate.
+ private Map<String, Comparable> clauses = new HashMap<String, Comparable>();
+
+ // A generic "any" object that can be used when a particular key is allowed to have any value.
+ public static final Comparable<? extends Serializable> ANY = new Any();
+
+ /**
+ * @return type of predicate
+ */
+ public TYPE getType() {
+ return type;
+ }
+
+ /**
+ * @param key
+ * @return the value corresponding to the key
+ */
+ public Comparable getClauseValue(String key) {
+ return clauses.get(key);
+ }
+
+ /**
+ * Compares this predicate with the supplied predicate.
+ *
+ * @param suppliedPredicate
+ * @return true, if the clauses of the predicates match. false, otherwise.
+ */
+ public boolean evaluate(Predicate suppliedPredicate) {
+ if (type != suppliedPredicate.getType()) {
+ return false;
+ }
+ boolean eval = true;
+ // Iterate over each clause and ensure it matches the clauses of this predicate.
+ for (Map.Entry<String, Comparable> entry : suppliedPredicate.getClauses().entrySet()) {
+ eval = eval && matches(entry.getKey(), entry.getValue());
+ if (!eval) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Compares the two values of a key.
+ private boolean matches(String lhs, Comparable<? extends Serializable> rhs) {
+ if (clauses.containsKey(lhs) && clauses.get(lhs) != null
+ && rhs != null) {
+ if (clauses.get(lhs).equals(ANY) || rhs.equals(ANY)) {
+ return true;
+ } else {
+ return clauses.get(lhs).compareTo(rhs) == 0;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param type of predicate
+ */
+ public Predicate(TYPE type) {
+ this.type = type;
+ }
+
+ /**
+ * @return the name-value pairs that make up the clauses of this predicate.
+ */
+ public Map<String, Comparable> getClauses() {
+ return clauses;
+ }
+
+ /**
+ * @param lhs - The key in the key-value pair of a clause
+ * @param rhs - The value in the key-value pair of a clause
+ * @return This instance
+ */
+ public Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
+ clauses.put(lhs, rhs);
+ return this;
+ }
+
+ /**
+ * Creates a Predicate of Type TIME.
+ *
+ * @param start
+ * @param end
+ * @param instanceTime
+ * @return
+ */
+ public static Predicate createTimePredicate(long start, long end, long instanceTime) {
+ return new Predicate(TYPE.TIME)
+ .addClause("start", (start < 0) ? ANY : start)
+ .addClause("end", (end < 0) ? ANY : end)
+ .addClause("instanceTime", (instanceTime < 0) ? ANY : instanceTime);
+ }
+
+ /**
+ * Creates a predicate of type DATA.
+ *
+ * @param location
+ * @return
+ */
+ public static Predicate createDataPredicate(Location location) {
+ return new Predicate(TYPE.DATA)
+ .addClause("path", (location == null) ? ANY : location.getPath())
+ .addClause("type", (location == null) ? ANY : location.getType());
+ }
+
+ /**
+ * Creates a predicate of type JOB_COMPLETION.
+ *
+ * @param handler
+ * @param id
+ * @return
+ */
+ public static Predicate createJobCompletionPredicate(NotificationHandler handler, ID id) {
+ return new Predicate(TYPE.JOB_COMPLETION)
+ .addClause("instanceId", id.toString())
+ .addClause("handler", handler.getClass().getName());
+ }
+
+ /**
+ * Creates a predicate from an event based on the event source and values in the event.
+ *
+ * @param event
+ * @return
+ * @throws FalconException
+ */
+ public static Predicate getPredicate(Event event) throws FalconException {
+ if (event.getSource() == NotificationServicesRegistry.SERVICE.DATA) {
+ DataEvent dataEvent = (DataEvent) event;
+ if (dataEvent.getDataLocation() != null && dataEvent.getDataType() != null) {
+ Location loc = new Location();
+ loc.setPath(dataEvent.getDataLocation().toString());
+ loc.setType(dataEvent.getDataType());
+ return createDataPredicate(loc);
+ } else {
+ throw new FalconException("Event does not have enough data to create a predicate");
+ }
+ } else if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) {
+ TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
+ if (timeEvent.getStartTime() != null && timeEvent.getEndTime() != null) {
+ long instanceTime = (timeEvent.getInstanceTime() == null)? -1 : timeEvent.getInstanceTime().getMillis();
+ return Predicate.createTimePredicate(timeEvent.getStartTime().getMillis(),
+ timeEvent.getEndTime().getMillis(), instanceTime);
+ } else {
+ throw new FalconException("Event does not have enough data to create a predicate");
+ }
+
+ } else {
+ throw new FalconException("Unhandled event type " + event.getSource());
+ }
+ }
+
+ /**
+ * An "Any" class that returns '0' when compared to any other object.
+ */
+ private static class Any implements Comparable, Serializable {
+ @Override
+ public int compareTo(Object o) {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
new file mode 100644
index 0000000..15aea9a
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.InvalidStateTransitionException;
+
+/**
+ * Represents the state of a schedulable entity.
+ * Implements {@link org.apache.falcon.state.StateMachine} for an entity.
+ */
+public class EntityState implements StateMachine<EntityState.STATE, EntityState.EVENT> {
+ private Entity entity;
+ private STATE currentState;
+ private static final STATE INITIAL_STATE = STATE.SUBMITTED;
+
+ /**
+ * Enumerates all the valid states of a schedulable entity and the valid transitions from that state.
+ */
+ public enum STATE implements StateMachine<EntityState.STATE, EntityState.EVENT> {
+ SUBMITTED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case SCHEDULE:
+ return STATE.SCHEDULED;
+ case SUBMIT:
+ return this;
+ default:
+ throw new InvalidStateTransitionException("Submitted entities can only be scheduled.");
+ }
+ }
+ },
+ SCHEDULED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case SUSPEND:
+ return STATE.SUSPENDED;
+ case SCHEDULE:
+ return this;
+ default:
+ throw new InvalidStateTransitionException("Scheduled entities can only be suspended.");
+ }
+ }
+ },
+ SUSPENDED {
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ switch (event) {
+ case RESUME:
+ return STATE.SCHEDULED;
+ case SUSPEND:
+ return this;
+ default:
+ throw new InvalidStateTransitionException("Suspended entities can only be resumed.");
+ }
+ }
+ }
+ }
+
+ /**
+ * Enumerates all the valid events that can cause a state transition.
+ */
+ public enum EVENT {
+ SUBMIT,
+ SCHEDULE,
+ SUSPEND,
+ RESUME
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param e - Entity
+ */
+ public EntityState(Entity e) {
+ this.entity = e;
+ currentState = INITIAL_STATE;
+ }
+
+ /**
+ * @return - The entity
+ */
+ public Entity getEntity() {
+ return entity;
+ }
+
+ /**
+ * @param e - entity
+ * @return - This instance
+ */
+ public EntityState setEntity(Entity e) {
+ this.entity = e;
+ return this;
+ }
+
+ /**
+ * @return - Current state of the entity.
+ */
+ public STATE getCurrentState() {
+ return currentState;
+ }
+
+ /**
+ * @param state
+ * @return - This instance
+ */
+ public EntityState setCurrentState(STATE state) {
+ this.currentState = state;
+ return this;
+ }
+
+ @Override
+ public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
+ return currentState.nextTransition(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4175c54a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
new file mode 100644
index 0000000..44ec3fc
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityStateChangeHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+
+/**
+ * Any handler interested in handling state changes of entities must implement this interface.
+ */
+public interface EntityStateChangeHandler {
+
+ /**
+ * Invoked when an entity is submitted.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ void onSubmit(Entity entity) throws FalconException;
+
+ /**
+ * Invoked when an entity is scheduled.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ void onSchedule(Entity entity) throws FalconException;
+
+ /**
+ * Invoked when an entity is suspended.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ void onSuspend(Entity entity) throws FalconException;
+
+ /**
+ * Invoked when the an intity is resumed.
+ *
+ * @param entity
+ * @throws FalconException
+ */
+ void onResume(Entity entity) throws FalconException;
+}
[6/6] falcon git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/falcon
Posted by pa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/falcon
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/a0911bd8
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/a0911bd8
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/a0911bd8
Branch: refs/heads/master
Commit: a0911bd8298049f372a845c07049e9f1c4b165e2
Parents: 4175c54 9e6d5a6
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Tue Oct 20 17:39:18 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Tue Oct 20 17:39:18 2015 +0530
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +
.../regression/Entities/ProcessMerlin.java | 9 +
.../helpers/entity/AbstractEntityHelper.java | 10 +
.../regression/core/util/EntityLineageUtil.java | 63 ++++
.../regression/core/util/InstanceUtil.java | 40 ++-
.../regression/core/util/KerberosHelper.java | 9 +
.../falcon/regression/core/util/OozieUtil.java | 52 +++
.../falcon/regression/core/util/Util.java | 22 ++
.../ui/search/AbstractSearchPage.java | 69 +++-
.../regression/ui/search/ClusterWizardPage.java | 40 +--
.../regression/ui/search/EntityWizardPage.java | 94 ++++++
.../regression/ui/search/FeedWizardPage.java | 27 +-
.../falcon/regression/ui/search/LoginPage.java | 1 +
.../regression/ui/search/MirrorWizardPage.java | 13 +-
.../falcon/regression/ui/search/PageHeader.java | 30 +-
.../regression/ui/search/ProcessWizardPage.java | 37 +-
.../falcon/regression/ExternalFSTest.java | 2 +-
.../falcon/regression/FeedReplicationTest.java | 109 +++++-
.../regression/ProcessInstanceStatusTest.java | 39 +++
.../falcon/regression/ProcessUpdateTest.java | 112 +++++++
.../falcon/regression/TestngListener.java | 2 +-
.../falcon/regression/hive/dr/HiveDRTest.java | 118 ++++---
.../falcon/regression/hive/dr/HiveDbDRTest.java | 61 ++--
.../regression/hive/dr/RecipeExecLocation.java | 63 ++++
.../lineage/ListProcessInstancesTest.java | 27 +-
.../regression/searchUI/ClusterSetupTest.java | 48 ++-
.../regression/searchUI/FeedSetupTest.java | 61 ++--
.../regression/searchUI/HomePageTest.java | 4 +-
.../searchUI/MirrorSourceTargetOptionsTest.java | 2 +
.../falcon/regression/searchUI/MirrorTest.java | 76 ++++-
.../regression/searchUI/ProcessSetupTest.java | 65 ++--
.../regression/security/FalconClientTest.java | 7 +-
.../triage/PipelineInstanceDependencyTest.java | 335 +++++++++++++++++++
33 files changed, 1365 insertions(+), 285 deletions(-)
----------------------------------------------------------------------