You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/06/04 08:14:04 UTC

[2/2] falcon git commit: FALCON-1039 Add instance dependency API in falcon. Contributed by Ajay Yadava

FALCON-1039 Add instance dependency API in falcon. Contributed by Ajay Yadava


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9fd86b78
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9fd86b78
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9fd86b78

Branch: refs/heads/master
Commit: 9fd86b786195ac03fc25d31c6f35062c4014f10a
Parents: 42f175a
Author: Ajay Yadava <aj...@gmail.com>
Authored: Thu Jun 4 11:42:59 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Thu Jun 4 11:43:32 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/falcon/ResponseHelper.java  |  14 +
 .../java/org/apache/falcon/cli/FalconCLI.java   |  21 +-
 .../org/apache/falcon/client/FalconClient.java  |  18 +
 .../org/apache/falcon/resource/EntityList.java  |   6 +-
 .../resource/InstanceDependencyResult.java      |  86 +++++
 .../resource/SchedulableEntityInstance.java     | 155 ++++++++
 .../org/apache/falcon/entity/EntityUtil.java    | 142 ++++++-
 .../org/apache/falcon/entity/FeedHelper.java    | 287 +++++++++++++-
 .../org/apache/falcon/entity/ProcessHelper.java | 108 ++++++
 .../apache/falcon/entity/FeedHelperTest.java    | 370 ++++++++++++++++++-
 .../apache/falcon/entity/ProcessHelperTest.java | 207 +++++++++++
 docs/src/site/twiki/FalconCLI.twiki             |  35 ++
 .../site/twiki/restapi/InstanceDependency.twiki |  49 +++
 docs/src/site/twiki/restapi/ResourceList.twiki  |   1 +
 .../falcon/resource/AbstractEntityManager.java  |   4 +-
 .../resource/AbstractInstanceManager.java       |  79 +++-
 .../resource/proxy/InstanceManagerProxy.java    |  37 +-
 .../apache/falcon/resource/InstanceManager.java |  20 +-
 .../resource/SchedulableEntityManager.java      |   9 +-
 .../java/org/apache/falcon/cli/FalconCLIIT.java |   4 +
 21 files changed, 1624 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c84f85..7ce2ba3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1039 Add instance dependency API in falcon(Ajay Yadava)
 
   IMPROVEMENTS
     FALCON-1060 Handle transaction failures in Lineage(Pavan Kumar Kolamuri via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/ResponseHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java
index 2261ceb..78598ba 100644
--- a/client/src/main/java/org/apache/falcon/ResponseHelper.java
+++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.resource.EntitySummaryResult;
@@ -277,4 +278,17 @@ public final class ResponseHelper {
         sb.append("\nRequest Id: ").append(feedLookupResult.getRequestId());
         return sb.toString();
     }
+
+    public static String getString(InstanceDependencyResult dependencyResult) {
+        StringBuilder sb = new StringBuilder();
+        String results = dependencyResult.toString();
+        if (StringUtils.isEmpty(results)) {
+            sb.append("No dependencies found!");
+        } else {
+            sb.append(results);
+        }
+        sb.append("\n\nResponse: ").append(dependencyResult.getMessage());
+        sb.append("\nRequest Id: ").append(dependencyResult.getRequestId());
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index a5e3728..f169917 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -18,7 +18,6 @@
 
 package org.apache.falcon.cli;
 
-import org.apache.falcon.ResponseHelper;
 import com.sun.jersey.api.client.ClientHandlerException;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -28,12 +27,14 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.LifeCycle;
+import org.apache.falcon.ResponseHelper;
 import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.client.FalconClient;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 
 import java.io.IOException;
@@ -102,6 +103,7 @@ public class FalconCLI {
     public static final String FORCE_RERUN_FLAG = "force";
 
     public static final String INSTANCE_CMD = "instance";
+    public static final String INSTANCE_TIME_OPT = "instanceTime";
     public static final String START_OPT = "start";
     public static final String END_OPT = "end";
     public static final String RUNNING_OPT = "running";
@@ -229,6 +231,7 @@ public class FalconCLI {
         String result;
         String type = commandLine.getOptionValue(ENTITY_TYPE_OPT);
         String entity = commandLine.getOptionValue(ENTITY_NAME_OPT);
+        String instanceTime = commandLine.getOptionValue(INSTANCE_TIME_OPT);
         String start = commandLine.getOptionValue(START_OPT);
         String end = commandLine.getOptionValue(END_OPT);
         String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
@@ -250,7 +253,12 @@ public class FalconCLI {
         validateInstanceCommands(optionsList, entity, type, colo);
 
 
-        if (optionsList.contains(RUNNING_OPT)) {
+        if (optionsList.contains(DEPENDENCY_OPT)) {
+            validateNotEmpty(instanceTime, INSTANCE_TIME_OPT);
+            InstanceDependencyResult response = client.getInstanceDependencies(type, entity, instanceTime, colo);
+            result = ResponseHelper.getString(response);
+
+        } else if (optionsList.contains(RUNNING_OPT)) {
             validateOrderBy(orderBy, instanceAction);
             validateFilterBy(filterBy, instanceAction);
             result =
@@ -785,6 +793,11 @@ public class FalconCLI {
                 false,
                 "Displays feed listing and their status between a start and end time range.");
 
+        Option dependency = new Option(
+                DEPENDENCY_OPT,
+                false,
+                "Displays dependent instances for a specified instance.");
+
         OptionGroup group = new OptionGroup();
         group.addOption(running);
         group.addOption(list);
@@ -798,6 +811,7 @@ public class FalconCLI {
         group.addOption(logs);
         group.addOption(params);
         group.addOption(listing);
+        group.addOption(dependency);
 
         Option url = new Option(URL_OPTION, true, "Falcon URL");
         Option start = new Option(START_OPT, true,
@@ -843,6 +857,8 @@ public class FalconCLI {
         Option forceRerun = new Option(FORCE_RERUN_FLAG, false,
                 "Flag to forcefully rerun entire workflow of an instance");
 
+        Option instanceTime = new Option(INSTANCE_TIME_OPT, true, "Time for an instance");
+
         instanceOptions.addOption(url);
         instanceOptions.addOptionGroup(group);
         instanceOptions.addOption(start);
@@ -861,6 +877,7 @@ public class FalconCLI {
         instanceOptions.addOption(sortOrder);
         instanceOptions.addOption(numResults);
         instanceOptions.addOption(forceRerun);
+        instanceOptions.addOption(instanceTime);
 
         return instanceOptions;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 786e0a0..20c32e4 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -37,6 +37,7 @@ import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.EntitySummaryResult;
 import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.resource.LineageGraphResult;
@@ -242,6 +243,7 @@ public class FalconClient {
         LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        DEPENDENCY("api/instance/dependencies/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON);
 
         private String path;
@@ -805,6 +807,22 @@ public class FalconClient {
         return clientResponse;
     }
 
+    public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, String instanceTime,
+                                                            String colo) throws FalconCLIException {
+        checkType(entityType);
+        Instances api = Instances.DEPENDENCY;
+
+        WebResource resource = service.path(api.path).path(entityType).path(entityName);
+        resource = resource.queryParam("instanceTime", instanceTime);
+        resource = resource.queryParam("colo", colo);
+        ClientResponse clientResponse = resource
+                    .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                    .accept(api.mimeType)
+                    .method(api.method, ClientResponse.class);
+        checkIfSuccessful(clientResponse);
+        return clientResponse.getEntity(InstanceDependencyResult.class);
+    }
+
     //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 
     private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index ee33234..6e132f0 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -36,6 +36,8 @@ import java.util.List;
 @XmlAccessorType(XmlAccessType.FIELD)
 @edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
 public class EntityList {
+    public static final String INPUT_TAG = "Input";
+    public static final String OUTPUT_TAG = "Output";
 
     @XmlElement
     private int totalResults;
@@ -184,14 +186,14 @@ public class EntityList {
             if (process.getInputs() != null) {
                 for (Input i : process.getInputs().getInputs()) {
                     if (i.getFeed().equals(entityNameToMatch)) {
-                        tagList.add("Input");
+                        tagList.add(INPUT_TAG);
                     }
                 }
             }
             if (process.getOutputs() != null) {
                 for (Output o : process.getOutputs().getOutputs()) {
                     if (o.getFeed().equals(entityNameToMatch)) {
-                        tagList.add("Output");
+                        tagList.add(OUTPUT_TAG);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java b/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java
new file mode 100644
index 0000000..0751f12
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Result of an instance dependency query, used for marshalling / unmarshalling with REST calls.
+ */
+@XmlRootElement(name = "dependents")
+@XmlAccessorType(XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class InstanceDependencyResult extends APIResult {
+
+    @XmlElement(name = "dependencies")
+    private SchedulableEntityInstance[] dependencies;
+
+    //For JAXB
+    private InstanceDependencyResult() {
+        super();
+    }
+
+    public InstanceDependencyResult(Status status, String message) {
+        super(status, message);
+    }
+
+    public SchedulableEntityInstance[] getDependencies() {
+        return dependencies;
+    }
+
+    public void setDependencies(SchedulableEntityInstance[] dependencies) {
+        this.dependencies = dependencies;
+    }
+
+
+    @Override
+    public Object[] getCollection() {
+        return getDependencies();
+    }
+
+    @Override
+    public void setCollection(Object[] items) {
+        if (items == null) {
+            setDependencies(new SchedulableEntityInstance[0]);
+        } else {
+            SchedulableEntityInstance[] newInstances = new SchedulableEntityInstance[items.length];
+            for (int index = 0; index < items.length; index++) {
+                newInstances[index] = (SchedulableEntityInstance)items[index];
+            }
+            setDependencies(newInstances);
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        if (dependencies != null) {
+            for (SchedulableEntityInstance element : dependencies) {
+                buffer.append(element.toString());
+                buffer.append("\n");
+            }
+        }
+        return buffer.toString();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
new file mode 100644
index 0000000..2a7ecdb
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+
+import java.util.Date;
+
+/**
+ * Instance of a Schedulable Entity (Feed/Process).
+ */
+public class SchedulableEntityInstance {
+
+    public static final String INPUT = "Input";
+    public static final String OUTPUT = "Output";
+
+    private String entityName;
+
+    private String cluster;
+
+    private Date instanceTime;
+
+    private EntityType entityType;
+
+    private String tag;
+
+    //for JAXB
+    private SchedulableEntityInstance() {
+
+    }
+
+    public SchedulableEntityInstance(String entityName, String cluster, Date instanceTime, EntityType type) {
+        this.entityName = entityName;
+        this.cluster = cluster;
+        this.entityType = type;
+        if (instanceTime != null) {
+            this.instanceTime = new Date(instanceTime.getTime());
+        }
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    public EntityType getEntityType() {
+        return entityType;
+    }
+
+    public void setEntityType(EntityType entityType) {
+        this.entityType = entityType;
+    }
+
+    public Date getInstanceTime() {
+        return new Date(instanceTime.getTime());
+    }
+
+    public void setInstanceTime(Date instanceTime) {
+        this.instanceTime = new Date(instanceTime.getTime());
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("name: " + entityName
+                + ", type: " + entityType
+                + ", cluster: " + cluster
+                + ", instanceTime: " + SchemaHelper.formatDateUTC(instanceTime));
+        sb.append(", tag: " + ((tag != null) ? tag : ""));
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SchedulableEntityInstance that = (SchedulableEntityInstance) o;
+
+        if (instanceTime == null ? that.instanceTime != null : !instanceTime.equals(that.instanceTime)) {
+            return false;
+        }
+
+        if (!entityType.equals(that.entityType)) {
+            return false;
+        }
+
+        if (!StringUtils.equals(entityName, that.entityName)) {
+            return false;
+        }
+
+        if (!StringUtils.equals(cluster, that.cluster)) {
+            return false;
+        }
+
+        if (!StringUtils.equals(tag, that.tag)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = instanceTime.hashCode();
+        result = 31 * result + entityName.hashCode();
+        result = 31 * result + entityType.hashCode();
+        result = 31 * result + cluster.hashCode();
+        if (tag != null) {
+            result = 31 * result + tag.hashCode();
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 26d3da2..7ebf39e 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -28,6 +28,7 @@ import org.apache.falcon.Tag;
 import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -35,9 +36,13 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.*;
+import org.apache.falcon.entity.v0.process.LateInput;
+import org.apache.falcon.entity.v0.process.LateProcess;
+import org.apache.falcon.entity.v0.process.PolicyType;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Retry;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,7 +58,19 @@ import java.lang.reflect.Method;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
 
 /**
  * Helper to get entity object.
@@ -65,6 +82,7 @@ public final class EntityUtil {
     private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
     private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
     private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
+    private static final long ONE_MS = 1;
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
@@ -244,8 +262,8 @@ public final class EntityUtil {
         return feed.getTimezone();
     }
 
-    public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date now) {
-        if (startTime.after(now)) {
+    public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date referenceTime) {
+        if (startTime.after(referenceTime)) {
             return startTime;
         }
 
@@ -255,16 +273,16 @@ public final class EntityUtil {
         int count = 0;
         switch (frequency.getTimeUnit()) {
         case months:
-            count = (int) ((now.getTime() - startTime.getTime()) / MONTH_IN_MS);
+            count = (int) ((referenceTime.getTime() - startTime.getTime()) / MONTH_IN_MS);
             break;
         case days:
-            count = (int) ((now.getTime() - startTime.getTime()) / DAY_IN_MS);
+            count = (int) ((referenceTime.getTime() - startTime.getTime()) / DAY_IN_MS);
             break;
         case hours:
-            count = (int) ((now.getTime() - startTime.getTime()) / HOUR_IN_MS);
+            count = (int) ((referenceTime.getTime() - startTime.getTime()) / HOUR_IN_MS);
             break;
         case minutes:
-            count = (int) ((now.getTime() - startTime.getTime()) / MINUTE_IN_MS);
+            count = (int) ((referenceTime.getTime() - startTime.getTime()) / MINUTE_IN_MS);
             break;
         default:
         }
@@ -273,7 +291,7 @@ public final class EntityUtil {
         if (count > 2) {
             startCal.add(frequency.getTimeUnit().getCalendarUnit(), ((count - 2) / freq) * freq);
         }
-        while (startCal.getTime().before(now)) {
+        while (startCal.getTime().before(referenceTime)) {
             startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
         }
         return startCal.getTime();
@@ -747,6 +765,12 @@ public final class EntityUtil {
         return pipelines;
     }
 
+    public static EntityList getEntityDependencies(Entity entity) throws FalconException {
+        Set<Entity> dependents = EntityGraph.get().getDependents(entity);
+        Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]);
+        return new EntityList(dependentEntities, entity);
+    }
+
     public static Pair<Date, Date> getEntityStartEndDates(Entity entityObject) {
         Set<String> clusters = EntityUtil.getClustersDefined(entityObject);
         Pair<Date, String> clusterMinStartDate = null;
@@ -761,4 +785,104 @@ public final class EntityUtil {
         }
         return new Pair<Date, Date>(clusterMinStartDate.first, clusterMaxEndDate.first);
     }
+
+    /**
+     * Returns the previous instance (before or on) for a given referenceTime.
+     *
+     * Example: For a feed in "UTC" with startDate "2014-01-01 00:00" and a frequency of "days(1)", a referenceTime
+     * of "2015-01-01 00:00" will return "2015-01-01 00:00".
+     *
+     * Similarly, for the above feed, a referenceTime of "2015-01-01 04:00" will also result in
+     * "2015-01-01 00:00".
+     *
+     * @param startTime start time of the entity
+     * @param frequency frequency of the entity
+     * @param tz timezone of the entity
+     * @param referenceTime time before which the instanceTime is desired
+     * @return  instance(before or on) the referenceTime
+     */
+    public static Date getPreviousInstanceTime(Date startTime, Frequency frequency, TimeZone tz, Date referenceTime) {
+        if (tz == null) {
+            tz = TimeZone.getTimeZone("UTC");
+        }
+        Calendar insCal = Calendar.getInstance(tz);
+        insCal.setTime(startTime);
+
+        int instanceCount = getInstanceSequence(startTime, frequency, tz, referenceTime);
+        final int freq = frequency.getFrequencyAsInt() * instanceCount;
+        insCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
+
+        while (insCal.getTime().after(referenceTime)) {
+            insCal.add(frequency.getTimeUnit().getCalendarUnit(), -1);
+        }
+        return insCal.getTime();
+    }
+
+    /**
+     * Find the times at which the given entity will run in a given time range.
+     * <p/>
+     * Both start and end Date are inclusive.
+     *
+     * @param entity      feed or process entity whose instance times are to be found
+     * @param clusterName name of the cluster
+     * @param startRange  start time for the input range
+     * @param endRange    end time for the input range
+     * @return List of instance times at which the entity will run in the given time range
+     */
+    public static List<Date> getEntityInstanceTimes(Entity entity, String clusterName, Date startRange, Date endRange) {
+        Date start = null;
+        switch (entity.getEntityType()) {
+
+        case FEED:
+            Feed feed = (Feed) entity;
+            start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
+            return getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(),
+                    startRange, endRange);
+
+        case PROCESS:
+            Process process = (Process) entity;
+            start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart();
+            return getInstanceTimes(start, process.getFrequency(),
+                    process.getTimezone(), startRange, endRange);
+
+        default:
+            throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
+        }
+    }
+
+
+    /**
+     * Find instance times given first instance start time and frequency till a given end time.
+     *
+     * It finds the first valid instance time in the given time range, it then uses frequency to find next instances
+     * in the given time range.
+     *
+     * @param startTime startTime of the entity (time of first instance ever of the given entity)
+     * @param frequency frequency of the entity
+     * @param timeZone  timeZone of the entity
+     * @param startRange start time for the input range of interest.
+     * @param endRange end time for the input range of interest.
+     * @return list of instance run times of the given entity in the given time range.
+     */
+    public static List<Date> getInstanceTimes(Date startTime, Frequency frequency, TimeZone timeZone,
+                                              Date startRange, Date endRange) {
+        List<Date> result = new LinkedList<>();
+        if (timeZone == null) {
+            timeZone = TimeZone.getTimeZone("UTC");
+        }
+
+        while(true){
+            Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, startRange);
+            if (nextStartTime.before(startRange) || nextStartTime.after(endRange)){
+                break;
+            }
+
+            result.add(nextStartTime);
+            // this is required because getNextStartTime returns greater than or equal to referenceTime
+            startRange = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later
+        }
+        return result;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index acb8598..9f4eb61 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.entity;
 
 import org.apache.commons.lang3.StringUtils;
+
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
@@ -33,16 +34,33 @@ import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.SchedulableEntityInstance;
 import org.apache.falcon.util.BuildProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
 import java.util.regex.Matcher;
 
 /**
@@ -50,6 +68,9 @@ import java.util.regex.Matcher;
  */
 public final class FeedHelper {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FeedHelper.class);
+    private static final int ONE_MS = 1;
+
     public static final String FORMAT = "yyyyMMddHHmm";
 
     private FeedHelper() {}
@@ -273,7 +294,7 @@ public final class FeedHelper {
 
     public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
         Properties properties = new Properties();
-        Map<String, String> clusterVars = new HashMap<String, String>();
+        Map<String, String> clusterVars = new HashMap<>();
         clusterVars.put("colo", cluster.getColo());
         clusterVars.put("name", cluster.getName());
         if (cluster.getProperties() != null) {
@@ -354,7 +375,7 @@ public final class FeedHelper {
      *  Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
      * @param instancePath - actual data path
      * @param templatePath - template path from feed definition
-     * @param timeZone
+     * @param timeZone timeZone
      * @return date corresponding to the path
      */
     //consider just the first occurrence of the pattern
@@ -364,7 +385,7 @@ public final class FeedHelper {
         Calendar cal = Calendar.getInstance(timeZone);
         int lastEnd = 0;
 
-        Set<FeedDataPath.VARS> matchedVars = new HashSet<FeedDataPath.VARS>();
+        Set<FeedDataPath.VARS> matchedVars = new HashSet<>();
         while (matcher.find()) {
             FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group());
             String pad = templatePath.substring(lastEnd, matcher.start());
@@ -415,6 +436,264 @@ public final class FeedHelper {
 
     }
 
+    /**
+     * Validates that the given instance time is a valid instance of the feed on the given cluster.
+     *
+     * @param feed         feed to validate against
+     * @param instanceTime candidate instance time
+     * @param cluster      cluster on which the instance is expected
+     * @throws IllegalArgumentException if the cluster is not a cluster of the feed, the time falls
+     *         outside the feed's validity range on that cluster, or the time is not aligned with
+     *         the feed's start time and frequency
+     */
+    private static void validateFeedInstance(Feed feed, Date instanceTime,
+                                             org.apache.falcon.entity.v0.cluster.Cluster cluster) {
+
+        // validate the cluster
+        Cluster feedCluster = getCluster(feed, cluster.getName());
+        if (feedCluster == null) {
+            throw new IllegalArgumentException("Cluster :" + cluster.getName() + " is not a valid cluster for feed:"
+                    + feed.getName());
+        }
+
+        // validate that instanceTime is in validity range
+        if (feedCluster.getValidity().getStart().after(instanceTime)
+                || feedCluster.getValidity().getEnd().before(instanceTime)) {
+            throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for"
+                    + " Feed: " + feed.getName() + " on cluster:" + cluster.getName());
+        }
+
+        // validate instanceTime on basis of startTime and frequency: a valid instance time maps to itself
+        // when rounded up to the next schedulable instance
+        Date nextInstance = EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(),
+                feed.getTimezone(), instanceTime);
+        if (!nextInstance.equals(instanceTime)) {
+            throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not a valid instance for the "
+                    + " feed: " + feed.getName() + " on cluster: " + cluster.getName()
+                    + " on the basis of startDate and frequency");
+        }
+    }
+
+    /**
+     * Given a feed instance, finds the generating (producer) process instance.
+     *
+     * [process, cluster, instanceTime]
+     *
+     * If the feed is replicated, then it returns null.
+     *
+     * @param feed output feed
+     * @param feedInstanceTime instance time of the feed
+     * @param cluster cluster on which the feed instance resides
+     * @return the instance of the process which produces the given feed instance, null if none
+     */
+    public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime,
+                                        org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+        //validate the inputs
+        validateFeedInstance(feed, feedInstanceTime, cluster);
+
+        Process process = getProducerProcess(feed);
+        if (process != null) {
+            try {
+                Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster);
+                SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(),
+                        cluster.getName(), processInstanceTime, EntityType.PROCESS);
+                producer.setTag(SchedulableEntityInstance.OUTPUT);
+                return producer;
+            } catch (FalconException e) {
+                // best effort: if the producer's instance time can't be computed, report no producer.
+                // NOTE: placeholder fixed from "}" to "{}" so feedInstanceTime is actually logged.
+                LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: {}"
+                        + " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Given a feed find it's generating process.
+     *
+     * If no generating process is found it returns null.
+     * @param feed output feed
+     * @return Process which produces the given feed.
+     */
+    public static Process getProducerProcess(Feed feed) throws FalconException {
+        // the producer is the dependency that lists this feed as an output
+        for (EntityList.EntityElement element : EntityUtil.getEntityDependencies(feed).getElements()) {
+            if (element.tag.contains(EntityList.OUTPUT_TAG)) {
+                return EntityUtil.getEntity(EntityType.PROCESS, element.name);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Find the producerInstance which will generate the given feedInstance.
+     *
+     * @param feed output feed
+     * @param feedInstanceTime instance time of the output feed
+     * @param producer producer process
+     * @param cluster cluster on which the feed instance resides
+     * @return time of the producer instance which will produce the given feed instance.
+     * @throws IllegalArgumentException if the feed is not an output of the producer, or the computed
+     *         producer instance falls outside the producer's validity range on the cluster
+     */
+    private static Date getProducerInstanceTime(Feed feed, Date feedInstanceTime, Process producer,
+                                       org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+        String clusterName = cluster.getName();
+        Cluster feedCluster = getCluster(feed, clusterName);
+        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(producer, clusterName);
+        Date producerStartDate = processCluster.getValidity().getStart();
+
+        // read the process definition and find the relative time difference between process and output feed
+        // if output process instance time is now then output FeedInstance time is x
+        String outputInstance = null;
+        for (Output op : producer.getOutputs().getOutputs()) {
+            if (StringUtils.equals(feed.getName(), op.getFeed())) {
+                outputInstance = op.getInstance();
+            }
+        }
+        if (outputInstance == null) {
+            // defensive guard: evaluate(null, ...) below would otherwise fail obscurely
+            throw new IllegalArgumentException("Feed: " + feed.getName()
+                    + " is not an output of the process: " + producer.getName());
+        }
+
+        ExpressionHelper.setReferenceDate(producerStartDate);
+        ExpressionHelper evaluator = ExpressionHelper.get();
+        // producerInstance = feedInstanceTime + (difference between producer process and feed)
+        // the feedInstance before or equal to this time is the required one
+        Date relativeFeedInstance = evaluator.evaluate(outputInstance, Date.class);
+        Date feedInstanceActual = EntityUtil.getPreviousInstanceTime(feedCluster.getValidity().getStart(),
+                feed.getFrequency(), feed.getTimezone(), relativeFeedInstance);
+        Long producerInstanceTime = feedInstanceTime.getTime() + (producerStartDate.getTime()
+                - feedInstanceActual.getTime());
+        Date producerInstance = new Date(producerInstanceTime);
+
+        //validate that the producerInstance is in the validity range on the provided cluster
+        if (producerInstance.before(processCluster.getValidity().getStart())
+                || producerInstance.after(processCluster.getValidity().getEnd())) {
+            throw new IllegalArgumentException("Instance time provided: " + feedInstanceTime
+                    + " for feed " + feed.getName()
+                    + " is outside the range of instances produced by the producer process: " + producer.getName()
+                    + " in it's validity range on provided cluster: " + cluster.getName());
+        }
+        return producerInstance;
+    }
+
+
+    /**
+     * Given a feed instance, finds all the process instances which consume it as input.
+     *
+     * @param feed input feed
+     * @param feedInstanceTime instance time of the feed
+     * @param cluster cluster on which the feed instance resides
+     * @return set of consuming process instances tagged as INPUT, empty set if there are no consumers
+     */
+    public static Set<SchedulableEntityInstance> getConsumerInstances(Feed feed, Date feedInstanceTime,
+                  org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+        // validate that the feed has this cluster & validate that the instanceTime is a valid instanceTime
+        validateFeedInstance(feed, feedInstanceTime, cluster);
+
+        Set<Process> consumers = getConsumerProcesses(feed);
+        for (Process p : consumers) {
+            Set<Date> consumerInstanceTimes = getConsumerProcessInstanceTimes(feed, feedInstanceTime, p, cluster);
+            for (Date date : consumerInstanceTimes) {
+                SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date,
+                        EntityType.PROCESS);
+                in.setTag(SchedulableEntityInstance.INPUT);
+                result.add(in);
+            }
+        }
+        return result;
+    }
+
+
+    /**
+     * Returns the consumer processes for a given feed, an empty set otherwise.
+     *
+     * @param feed input feed
+     * @return the set of processes which use the given feed as input, empty set if no consumers.
+     */
+    public static Set<Process> getConsumerProcesses(Feed feed) throws FalconException {
+        Set<Process> result = new HashSet<>();
+        EntityList dependencies = EntityUtil.getEntityDependencies(feed);
+
+        // consumers are the dependencies that list this feed as an input
+        for (EntityList.EntityElement e : dependencies.getElements()) {
+            if (e.tag.contains(EntityList.INPUT_TAG)) {
+                Process consumer = EntityUtil.getEntity(EntityType.PROCESS, e.name);
+                result.add(consumer);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Returns all instances of the given process which will consume the given feed instance.
+     *
+     * @param feed feed consumed by the process
+     * @param feedInstancetime instance time of the feed
+     * @param consumer consuming process
+     * @param cluster cluster on which the instances reside
+     * @return set of nominal times of the consumer process instances whose input window covers
+     *         the given feed instance
+     */
+    private static Set<Date> getConsumerProcessInstanceTimes(Feed feed, Date feedInstancetime, Process consumer,
+              org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+        Set<Date> result = new HashSet<>();
+        // find relevant cluster for the process
+        org.apache.falcon.entity.v0.process.Cluster processCluster =
+                ProcessHelper.getCluster(consumer, cluster.getName());
+        if (processCluster == null) {
+            throw new IllegalArgumentException("Cluster is not valid for process");
+        }
+        Date processStartDate = processCluster.getValidity().getStart();
+        Cluster feedCluster = getCluster(feed, cluster.getName());
+        Date feedStartDate = feedCluster.getValidity().getStart();
+
+        // find all corresponding Inputs as a process may refer same feed multiple times
+        List<Input> inputFeeds = new ArrayList<>();
+        if (consumer.getInputs() != null && consumer.getInputs().getInputs() != null) {
+            for (Input input : consumer.getInputs().getInputs()) {
+                if (StringUtils.equals(input.getFeed(), feed.getName())) {
+                    inputFeeds.add(input);
+                }
+            }
+        }
+
+        // for each input corresponding to given feed, find corresponding consumer instances
+        for (Input in : inputFeeds) {
+            /* Algorithm for finding a consumer instance for an input feed instance
+            Step 1. Find one instance which will consume the given feed instance.
+                    a. take process start date and find last input feed instance time. In this step take care of
+                        frequencies being out of sync.
+                    b. using the above find the time difference between the process instance and feed instance.
+                    c. Adding the above time difference to given feed instance for which we want to find the consumer
+                        instances we will get one consumer process instance.
+            Step 2. Keep checking for next instances of process till they consume the given feed Instance.
+            Step 3. Similarly check for all previous instances of process till they consume the given feed instance.
+            NOTE(review): steps 2 and 3 stop at the first non-consuming instance in each direction, which
+            assumes the consuming instances form a contiguous run in time -- confirm for sliding windows.
+            */
+
+            // Step 1.a & 1.b
+            ExpressionHelper.setReferenceDate(processStartDate);
+            ExpressionHelper evaluator = ExpressionHelper.get();
+            Date startRelative = evaluator.evaluate(in.getStart(), Date.class);
+            Date startTimeActual = EntityUtil.getNextStartTime(feedStartDate,
+                    feed.getFrequency(), feed.getTimezone(), startRelative);
+            Long offset = processStartDate.getTime() - startTimeActual.getTime();
+
+            // Step 1.c
+            Date processInstanceStartRelative = new Date(feedInstancetime.getTime() + offset);
+            Date processInstanceStartActual = EntityUtil.getPreviousInstanceTime(processStartDate,
+                    consumer.getFrequency(), consumer.getTimezone(), processInstanceStartRelative);
+
+
+            // Step 2. scan forward: add each instance whose [start, end) input window covers the feed
+            // instance; break as soon as one does not
+            Date currentInstance = processInstanceStartActual;
+            while (true) {
+                Date nextConsumerInstance = EntityUtil.getNextStartTime(processStartDate,
+                        consumer.getFrequency(), consumer.getTimezone(), currentInstance);
+
+                ExpressionHelper.setReferenceDate(nextConsumerInstance);
+                evaluator = ExpressionHelper.get();
+                Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime();
+                Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
+                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) {
+                    result.add(nextConsumerInstance);
+                } else {
+                    break;
+                }
+                currentInstance = new Date(nextConsumerInstance.getTime() + ONE_MS);
+            }
+
+            // Step 3. scan backward from the same starting instance (the starting instance itself is
+            // re-evaluated here; result is a Set, so any duplicate is absorbed)
+            currentInstance = processInstanceStartActual;
+            while (true) {
+                Date nextConsumerInstance = EntityUtil.getPreviousInstanceTime(processStartDate,
+                        consumer.getFrequency(), consumer.getTimezone(), currentInstance);
+
+                ExpressionHelper.setReferenceDate(nextConsumerInstance);
+                evaluator = ExpressionHelper.get();
+                Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime();
+                Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
+                if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) {
+                    result.add(nextConsumerInstance);
+                } else {
+                    break;
+                }
+                currentInstance = new Date(nextConsumerInstance.getTime() - ONE_MS);
+            }
+        }
+        return result;
+    }
+
     public static FeedInstanceResult getFeedInstanceListing(Entity entityObject,
                                                             Date start, Date end) throws FalconException {
         Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 29aefa0..fe78bc8 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -20,12 +20,20 @@ package org.apache.falcon.entity;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Helper methods for accessing process members.
@@ -34,6 +42,7 @@ public final class ProcessHelper {
 
     private ProcessHelper() {}
 
+
     public static Cluster getCluster(Process process, String clusterName) {
         for (Cluster cluster : process.getClusters().getClusters()) {
             if (cluster.getName().equals(clusterName)) {
@@ -77,4 +86,103 @@ public final class ProcessHelper {
 
         return storageType;
     }
+
+    /**
+     * Validates that the given instance time is a valid instance of the process on the given cluster.
+     *
+     * @param process      process to validate against
+     * @param instanceTime candidate instance time
+     * @param cluster      cluster on which the instance is expected
+     * @throws IllegalArgumentException if the cluster is not a cluster of the process, the time falls
+     *         outside the process's validity range on that cluster, or the time is not aligned with
+     *         the process's start time and frequency
+     */
+    private static void validateProcessInstance(Process process, Date instanceTime,
+                                                org.apache.falcon.entity.v0.cluster.Cluster cluster) {
+        //validate the cluster
+        Cluster processCluster = getCluster(process, cluster.getName());
+        if (processCluster == null) {
+            throw new IllegalArgumentException("Cluster provided: " + cluster.getName()
+                    + " is not a valid cluster for the process: " + process.getName());
+        }
+
+        // check if instanceTime is in validity range
+        if (instanceTime.before(processCluster.getValidity().getStart())
+                || instanceTime.after(processCluster.getValidity().getEnd())) {
+            // leading space added before "on cluster" so the message doesn't run words together
+            throw new IllegalArgumentException("Instance time provided: " + instanceTime
+                    + " is not in validity range of process: " + process.getName()
+                    + " on cluster: " + cluster.getName());
+        }
+
+        // check instanceTime is valid on the basis of startTime and frequency
+        Date nextInstance = EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
+                process.getFrequency(), process.getTimezone(), instanceTime);
+        if (!nextInstance.equals(instanceTime)) {
+            throw new IllegalArgumentException("Instance time provided: " + instanceTime
+                    + " for process: " + process.getName() + " is not a valid instance time on cluster: "
+                    + cluster.getName() + " on the basis of startDate and frequency");
+        }
+    }
+
+    /**
+     * Given a process instance, returns the feed instances which are used as input for this process instance.
+     *
+     * @param process            given process
+     * @param instanceTime       nominal time of the process instance
+     * @param cluster            cluster for the process instance
+     * @param allowOptionalFeeds switch to indicate whether optional feeds should be considered in input feeds.
+     * @return Set of input feed instances which are consumed by the given process instance.
+     * @throws org.apache.falcon.FalconException if instance times of a referenced feed cannot be computed
+     */
+    public static Set<SchedulableEntityInstance> getInputFeedInstances(Process process, Date instanceTime,
+               org.apache.falcon.entity.v0.cluster.Cluster cluster, boolean allowOptionalFeeds) throws FalconException {
+
+        // validate the inputs
+        validateProcessInstance(process, instanceTime, cluster);
+
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+        if (process.getInputs() != null) {
+            ConfigurationStore store = ConfigurationStore.get();
+            for (Input i : process.getInputs().getInputs()) {
+                // optional inputs are skipped unless the caller asked for them
+                if (i.isOptional() && !allowOptionalFeeds) {
+                    continue;
+                }
+                Feed feed = store.get(EntityType.FEED, i.getFeed());
+                // inputStart is process instance time + (now - startTime)
+                // NOTE: EL expressions in the input's start/end are evaluated relative to instanceTime
+                ExpressionHelper evaluator = ExpressionHelper.get();
+                ExpressionHelper.setReferenceDate(instanceTime);
+                Date inputInstanceStartDate = evaluator.evaluate(i.getStart(), Date.class);
+                Date inputInstanceEndDate = evaluator.evaluate(i.getEnd(), Date.class);
+                List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
+                        inputInstanceStartDate, inputInstanceEndDate);
+                SchedulableEntityInstance instance;
+                for (Date time : instanceTimes) {
+                    instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), time, EntityType.FEED);
+                    instance.setTag(SchedulableEntityInstance.INPUT);
+                    result.add(instance);
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Given a process instance, returns the feed instances which are produced by this process instance.
+     *
+     * @param process      given process
+     * @param instanceTime nominal time of the process instance
+     * @param cluster      cluster for the process instance
+     * @return Set of output feed instances produced by the given process instance, empty set if the
+     *         process declares no outputs
+     */
+    public static Set<SchedulableEntityInstance> getOutputFeedInstances(Process process, Date instanceTime,
+                                        org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+
+        // validate the inputs
+        validateProcessInstance(process, instanceTime, cluster);
+
+        if (process.getOutputs() != null && process.getOutputs().getOutputs() != null) {
+
+            // output instance EL expressions are evaluated relative to the process instance time
+            ExpressionHelper.setReferenceDate(instanceTime);
+            ExpressionHelper evaluator = ExpressionHelper.get();
+            SchedulableEntityInstance candidate;
+            ConfigurationStore store = ConfigurationStore.get();
+            for (Output output : process.getOutputs().getOutputs()) {
+
+                Date outputInstance = evaluator.evaluate(output.getInstance(), Date.class);
+                // find the feed
+                Feed feed = store.get(EntityType.FEED, output.getFeed());
+                org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, cluster.getName());
+                // snap the evaluated time to the feed's instance grid on this cluster
+                outputInstance = EntityUtil.getNextStartTime(fCluster.getValidity().getStart(), feed.getFrequency(),
+                        feed.getTimezone(), outputInstance);
+                candidate = new SchedulableEntityInstance(output.getFeed(), cluster.getName(), outputInstance,
+                        EntityType.FEED);
+                candidate.setTag(SchedulableEntityInstance.OUTPUT);
+                result.add(candidate);
+            }
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 266d029..f70edfb 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -18,24 +18,58 @@
 
 package org.apache.falcon.entity;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Properties;
 import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.resource.SchedulableEntityInstance;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.TimeZone;
 
 /**
  * Test for feed helper methods.
  */
-public class FeedHelperTest {
-    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+public class FeedHelperTest extends AbstractTestBase {
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+    private ConfigurationStore store;
+
+    // one-time setup: initialize the configuration store shared by all tests in this class
+    @BeforeClass
+    public void init() throws Exception {
+        initConfigStore();
+    }
+
+    // per-test setup: clear previously published entities so each test starts from an empty store
+    @BeforeMethod
+    public void setUp() throws Exception {
+        cleanupStore();
+        store = getStore();
+    }
 
     @Test
     public void testPartitionExpression() {
@@ -107,4 +141,334 @@ public class FeedHelperTest {
                 locations.getLocations());
         Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2);
     }
+
+    /**
+     * Producer lookup with output instance "today(0,0)": verifies getProducerProcess is null
+     * for a feed with no producer, then registers a daily producer process and checks that
+     * feed instance 2013-02-27 10:00 maps to producer process instance 2013-02-28 10:00.
+     */
+    @Test
+    public void testGetProducerProcessWithOffset() throws FalconException, ParseException {
+        //create a feed, submit it, test that ProducerProcess is null
+
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+        // create its producer process, submit it, and verify it is reported as the producer
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("today(0,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"),
+                cluster);
+        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        Assert.assertEquals(result, expected);
+    }
+
+    /**
+     * Producer lookup with output instance "now(0,0)": feed instance 2013-02-28 10:00 maps
+     * to the producer process instance with the same nominal time.
+     */
+    @Test
+    public void testGetProducerProcessForNow() throws FalconException, ParseException {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+        // create its producer process, submit it, and verify it is reported as the producer
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("now(0,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"),
+                cluster);
+        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        Assert.assertEquals(result, expected);
+    }
+
+    /**
+     * Producer lookup with output instance "now(-4,0)" (negative hour offset): feed instance
+     * 2013-02-27 10:00 maps to producer process instance 2013-02-28 10:00.
+     */
+    @Test
+    public void testGetProducerWithNowNegativeOffset() throws FalconException, ParseException {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+        // create its producer process, submit it, and verify it is reported as the producer
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("now(-4,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"),
+                cluster);
+        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        Assert.assertEquals(result, expected);
+    }
+
+
+    /**
+     * Producer lookup with output instance "now(4,0)" (positive hour offset): feed instance
+     * 2013-02-28 10:00 maps to producer process instance 2013-02-28 10:00.
+     */
+    @Test
+    public void testGetProducerWithNowPositiveOffset() throws FalconException, ParseException {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+        // create its producer process, submit it, and verify it is reported as the producer
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("now(4,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"),
+                cluster);
+        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        Assert.assertEquals(result, expected);
+    }
+
+    /**
+     * Producer lookup when the feed's start time (00:00) differs from the process's (10:00):
+     * feed instance 2013-02-28 00:00 maps to producer process instance 2013-02-28 10:00.
+     */
+    @Test
+    public void testGetProducerProcessInstance() throws FalconException, ParseException {
+        //create a feed anchored at midnight and submit it
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 00:00 UTC", "2016-02-28 10:00 UTC");
+
+        // create its producer process, submit it, and verify it is reported as the producer
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Outputs outputs = new Outputs();
+        Output outFeed = new Output();
+        outFeed.setName("outputFeed");
+        outFeed.setFeed(feed.getName());
+        outFeed.setInstance("today(0,0)");
+        outputs.getOutputs().add(outFeed);
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+        Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+        SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 00:00 UTC"),
+                cluster);
+        SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+        expected.setTag(SchedulableEntityInstance.OUTPUT);
+        Assert.assertEquals(result, expected);
+    }
+
+    /**
+     * A process consuming the feed as an input must be returned by getConsumerProcesses.
+     */
+    @Test
+    public void testGetConsumerProcesses() throws FalconException, ParseException {
+        //create a feed and submit it
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");  // was "outputFeed" — misleading name for an input port
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("today(0,0)");
+        inFeed.setEnd("today(0,0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<Process> result = FeedHelper.getConsumerProcesses(feed);
+        Assert.assertEquals(result.size(), 1);
+        Assert.assertTrue(result.contains(process));
+    }
+
+    /**
+     * Hourly feed consumed by a daily process over the window now(-4,30)..now(4,30):
+     * feed instance 2012-02-28 09:00 falls inside the window of exactly one process
+     * instance (2012-02-28 10:00), which is returned tagged as INPUT.
+     */
+    @Test
+    public void testGetConsumerProcessInstances() throws Exception {
+        //create an hourly feed and submit it
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(-4, 30)");
+        inFeed.setEnd("now(4, 30)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 09:00 UTC"), cluster);
+        Assert.assertEquals(result.size(), 1);
+
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        SchedulableEntityInstance ins = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                getDate("2012-02-28 10:00 UTC"), EntityType.PROCESS);
+        ins.setTag(SchedulableEntityInstance.INPUT);
+        expected.add(ins);
+        Assert.assertEquals(result, expected);
+
+    }
+
+    /**
+     * Hourly feed consumed by an hourly process over the window now(-4,30)..now(4,30):
+     * feed instance 2012-02-28 09:00 is inside the window of eight process instances
+     * (05:00 through 12:00 inclusive), all tagged as INPUT.
+     */
+    @Test
+    public void testGetMultipleConsumerInstances() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("now(-4, 30)");
+        inFeed.setEnd("now(4, 30)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 09:00 UTC"), cluster);
+        Assert.assertEquals(result.size(), 8);
+
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        String[] consumers = { "2012-02-28 05:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 07:00 UTC",
+            "2012-02-28 08:00 UTC", "2012-02-28 09:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 11:00 UTC",
+            "2012-02-28 12:00 UTC", };
+        for (String d : consumers) {
+            SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+                    getDate(d), EntityType.PROCESS);
+            i.setTag(SchedulableEntityInstance.INPUT);
+            expected.add(i);
+        }
+        Assert.assertEquals(result, expected);
+    }
+
+    /**
+     * Consumer lookup for an input window today(0,0)..now(0,0): for feed instance
+     * 2012-02-28 00:00 the helper reports 23 consuming process instances.
+     */
+    @Test
+    public void testGetConsumerWithNow() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("today(0, 0)");
+        inFeed.setEnd("now(0, 0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 00:00 UTC"), cluster);
+        Assert.assertEquals(result.size(), 23);
+    }
+
+    /**
+     * Consumer lookup for an input window ending in latest(0): for feed instance
+     * 2012-02-28 00:00 the helper reports 23 consuming process instances. Note that
+     * latest() varies with time, so dependency results for it are best-effort.
+     */
+    @Test
+    public void testGetConsumerWithLatest() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+        //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+        Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input inFeed = new Input();
+        inFeed.setName("inputFeed");
+        inFeed.setFeed(feed.getName());
+        inFeed.setStart("today(0, 0)");
+        inFeed.setEnd("latest(0)");
+        inputs.getInputs().add(inFeed);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+                getDate("2012-02-28 00:00 UTC"), cluster);
+        // Debug println removed; assert the expected consumer count directly.
+        Assert.assertEquals(result.size(), 23);
+    }
+
+    /** Builds a feed validity window from "yyyy-MM-dd HH:mm Z" start/end strings. */
+    private Validity getFeedValidity(String start, String end) throws ParseException {
+        Validity validity = new Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    /** Builds a process validity window from "yyyy-MM-dd HH:mm Z" start/end strings. */
+    private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws
+            ParseException {
+
+        org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    /** Parses a "yyyy-MM-dd HH:mm Z" timestamp; a fresh SimpleDateFormat per call avoids thread-safety issues. */
+    private Date getDate(String dateString) throws ParseException {
+        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+        return format.parse(dateString);
+    }
+
+    /** Creates and publishes a minimal cluster entity named "feedCluster". */
+    private Cluster publishCluster() throws FalconException {
+        Cluster cluster = new Cluster();
+        cluster.setName("feedCluster");
+        cluster.setColo("colo");
+        store.publish(EntityType.CLUSTER, cluster);
+        return cluster;
+
+    }
+
+    /**
+     * Creates and publishes a UTC feed named "feed" on the given cluster with the
+     * supplied frequency (e.g. "hours(1)") and validity window.
+     * NOTE(review): duplicated in ProcessHelperTest — consider hoisting into AbstractTestBase.
+     */
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+
+        Feed feed = new Feed();
+        feed.setName("feed");
+        Frequency f = new Frequency(frequency);
+        feed.setFrequency(f);
+        feed.setTimezone(UTC);
+        Clusters fClusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setName(cluster.getName());
+        fCluster.setValidity(getFeedValidity(start, end));
+        fClusters.getClusters().add(fCluster);
+        feed.setClusters(fClusters);
+        store.publish(EntityType.FEED, feed);
+
+        return feed;
+    }
+
+    /**
+     * Builds (but does not publish) a UTC process named "process" on the given cluster;
+     * callers attach inputs/outputs and then publish it themselves.
+     */
+    private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException {
+        Process process = new Process();
+        process.setName("process");
+        process.setTimezone(UTC);
+        org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters();
+        org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster();
+        pCluster.setName(cluster.getName());
+        org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end);
+        pCluster.setValidity(validity);
+        pClusters.getClusters().add(pCluster);
+        process.setClusters(pClusters);
+        Frequency f = new Frequency(frequency);
+        process.setFrequency(f);
+        return process;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
new file mode 100644
index 0000000..0d396ae
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimeZone;
+
+
+/**
+ * Tests for ProcessHelper methods.
+ */
+public class ProcessHelperTest extends AbstractTestBase {
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+    private ConfigurationStore store;
+
+    // One-time setup: initialize the configuration store backing all tests in this class.
+    @BeforeClass
+    public void init() throws Exception {
+        initConfigStore();
+    }
+
+    // Per-test setup: wipe previously published entities so each test starts from a clean store.
+    @BeforeMethod
+    public void setUp() throws Exception {
+        cleanupStore();
+        store = getStore();  // base-class accessor, consistent with FeedHelperTest
+    }
+
+    /**
+     * Input-feed resolution for a daily process consuming an hourly feed over the
+     * window today(0,-30)..today(2,30): process instance 2012-02-28 10:00 resolves
+     * to the three feed instances 00:00, 01:00 and 02:00, all tagged as INPUT.
+     */
+    @Test
+    public void testGetInputFeedInstances() throws FalconException, ParseException {
+        // create a process with input feeds
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+
+        // find the input Feed instances time
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Inputs inputs = new Inputs();
+        Input input = getInput("inputFeed", feed.getName(), "today(0,-30)", "today(2,30)", false);
+        inputs.getInputs().add(input);
+        process.setInputs(inputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Date processInstanceDate = getDate("2012-02-28 10:00 UTC");
+        Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process,
+                processInstanceDate, cluster, false);
+        Assert.assertEquals(inputFeedInstances.size(), 3);
+
+        Set<SchedulableEntityInstance> expectedInputFeedInstances = new HashSet<>();
+        SchedulableEntityInstance instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
+                getDate("2012-02-28 00:00 UTC"), EntityType.FEED);
+        instance.setTag(SchedulableEntityInstance.INPUT);
+        expectedInputFeedInstances.add(instance);
+        instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 01:00 UTC"),
+                EntityType.FEED);
+        instance.setTag(SchedulableEntityInstance.INPUT);
+        expectedInputFeedInstances.add(instance);
+        instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 02:00 UTC"),
+                EntityType.FEED);
+        instance.setTag(SchedulableEntityInstance.INPUT);
+        expectedInputFeedInstances.add(instance);
+
+        //Validate with expected result
+        Assert.assertTrue(inputFeedInstances.equals(expectedInputFeedInstances));
+    }
+
+    /**
+     * Output-feed resolution with instance "now(0,0)": process instance 2012-02-28 10:00
+     * writing a daily feed anchored at 11:00 resolves to feed instance 2012-02-28 11:00,
+     * tagged as OUTPUT.
+     */
+    @Test
+    public void testGetOutputFeedInstances() throws FalconException, ParseException {
+        // create a process with an output feed
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "days(1)", "2012-02-27 11:00 UTC", "2016-02-28 11:00 UTC");
+        Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+        Outputs outputs = new Outputs();
+        outputs.getOutputs().add(getOutput("outputFeed", feed.getName(), "now(0,0)"));
+        process.setOutputs(outputs);
+        store.publish(EntityType.PROCESS, process);
+
+        Set<SchedulableEntityInstance> result = ProcessHelper.getOutputFeedInstances(process,
+                getDate("2012-02-28 10:00 UTC"), cluster);
+
+        Set<SchedulableEntityInstance> expected = new HashSet<>();
+        SchedulableEntityInstance ins = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
+                getDate("2012-02-28 11:00 UTC"), EntityType.FEED);
+        ins.setTag(SchedulableEntityInstance.OUTPUT);
+        expected.add(ins);
+
+        Assert.assertEquals(result, expected);
+
+    }
+
+    /** Builds a process validity window from "yyyy-MM-dd HH:mm Z" start/end strings. */
+    private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws
+            ParseException {
+
+        org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    /** Parses a "yyyy-MM-dd HH:mm Z" timestamp; a fresh SimpleDateFormat per call avoids thread-safety issues. */
+    private Date getDate(String dateString) throws ParseException {
+        return new SimpleDateFormat("yyyy-MM-dd HH:mm Z").parse(dateString);
+    }
+
+    /** Builds a feed validity window from "yyyy-MM-dd HH:mm Z" start/end strings. */
+    private org.apache.falcon.entity.v0.feed.Validity getFeedValidity(String start, String end) throws ParseException {
+        org.apache.falcon.entity.v0.feed.Validity validity = new org.apache.falcon.entity.v0.feed.Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    /** Builds a process Input referencing the given feed over the [start, end] EL window. */
+    private Input getInput(String name, String feedName, String start, String end, boolean isOptional) {
+        Input inFeed = new Input();
+        inFeed.setName(name);
+        inFeed.setFeed(feedName);
+        inFeed.setStart(start);
+        inFeed.setEnd(end);
+        inFeed.setOptional(isOptional);
+        return inFeed;
+    }
+
+    /** Builds a process Output referencing the given feed at the given EL instance. */
+    private Output getOutput(String name, String feedName, String instance) {
+        Output output = new Output();
+        output.setInstance(instance);
+        output.setFeed(feedName);
+        output.setName(name);
+        return output;
+    }
+
+    /** Creates and publishes a minimal cluster entity named "feedCluster". */
+    private Cluster publishCluster() throws FalconException {
+        Cluster cluster = new Cluster();
+        cluster.setName("feedCluster");
+        cluster.setColo("colo");
+        store.publish(EntityType.CLUSTER, cluster);
+        return cluster;
+
+    }
+
+    /**
+     * Creates and publishes a UTC feed named "feed" on the given cluster with the
+     * supplied frequency (e.g. "days(1)") and validity window.
+     * NOTE(review): duplicated in FeedHelperTest — consider hoisting into AbstractTestBase.
+     */
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+
+        Feed feed = new Feed();
+        feed.setName("feed");
+        Frequency f = new Frequency(frequency);
+        feed.setFrequency(f);
+        feed.setTimezone(UTC);
+        Clusters fClusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setName(cluster.getName());
+        fCluster.setValidity(getFeedValidity(start, end));
+        fClusters.getClusters().add(fCluster);
+        feed.setClusters(fClusters);
+        store.publish(EntityType.FEED, feed);
+
+        return feed;
+    }
+
+    /**
+     * Builds (but does not publish) a UTC process named "process" on the given cluster;
+     * callers attach inputs/outputs and then publish it themselves.
+     */
+    private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException {
+        Process process = new Process();
+        process.setName("process");
+        process.setTimezone(UTC);
+        org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters();
+        org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster();
+        pCluster.setName(cluster.getName());
+        org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end);
+        pCluster.setValidity(validity);
+        pClusters.getClusters().add(pCluster);
+        process.setClusters(pClusters);
+        Frequency f = new Frequency(frequency);
+        process.setFrequency(f);
+        return process;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index e447915..50dce84 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -280,6 +280,41 @@ Usage:
 $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -start "yyyy-MM-dd'T'HH:mm'Z'"
 
 
+
+---+++Dependency
+Display the instances that the given instance depends on and the instances that depend on it. For example, for a given
+process instance it will list all the input feed instances (if any) and the output feed instances (if any).
+
+An example use case of this command is as follows:
+Suppose you find out that the data in a feed instance was incorrect and you need to figure out which all process instances
+consumed this feed instance so that you can reprocess them after correcting the feed instance. You can give the feed instance
+and it will tell you which process instance produced this feed and which all process instances consumed this feed.
+
+NOTE:
+1. instanceTime must be a valid instance time, e.g. the instanceTime of a feed should fall within its validity range on the
+ applicable clusters, and within the range of instances produced by the producer process (if any).
+
+2. For processes with inputs like latest(), which vary with time, the results are not guaranteed to be correct.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -dependency -instanceTime "yyyy-MM-dd'T'HH:mm'Z'"
+
+For example:
+$FALCON_HOME/bin/falcon instance -dependency -type feed -name out -instanceTime 2014-12-15T00:00Z
+name: producer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:00Z, tag: Output
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:03Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:04Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:02Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:05Z, tag: Input
+
+
+Response: default/Success!
+
+Request Id: default/1125035965@qtp-503156953-7 - 447be0ad-1d38-4dce-b438-20f3de69b172
+
+
+<a href="./Restapi/InstanceDependency.html">Optional params described here.</a>
+
 ---++ Metadata Lineage Options
 
 ---+++Lineage

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/restapi/InstanceDependency.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceDependency.twiki b/docs/src/site/twiki/restapi/InstanceDependency.twiki
new file mode 100644
index 0000000..dc452de
--- /dev/null
+++ b/docs/src/site/twiki/restapi/InstanceDependency.twiki
@@ -0,0 +1,49 @@
+---++ GET /api/instance/dependency/:entity-type/:entity-name
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+Get dependent instances for a particular instance.
+
+---++ Parameters
+   * :entity-type Valid options are feed or process.
+   * :entity-name Name of the entity
+   * instanceTime <mandatory param> time of the given instance
+   * colo <optional param> name of the colo
+
+
+---++ Results
+Dependent instances for the specified instance
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/dependency/feed/myFeed?colo=*&instanceTime=2012-04-03T07:00Z
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    'status': 'SUCCEEDED',
+    'message': 'default/Success!\n',
+    'dependencies': [
+        {
+            'cluster': 'local',
+            'entityName': 'consumer-process',
+            'entityType': 'PROCESS',
+            'instanceTime': '2014-12-18T00:00Z',
+            'tags': 'Input'
+        },
+        {
+            'cluster': 'local',
+            'entityName': 'producer-process',
+            'entityType': 'PROCESS',
+            'instanceTime': '2014-12-18T00:00Z',
+            'tags': 'Output'
+        }
+    ],
+    'requestId': 'default/1405883107@qtp-1501726962-6-0c2e690f-546b-47b0-a5ee-0365d4522a31\n'
+}
+</verbatim>
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index 060e0af..49dddb7 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -67,6 +67,7 @@ See also: [[../Security.twiki][Security in Falcon]]
 | POST        | [[InstanceRerun][api/instance/rerun/:entity-type/:entity-name]]             | Rerun a given instance       |
 | GET         | [[InstanceLogs][api/instance/logs/:entity-type/:entity-name]]               | Get logs of a given instance |
 | GET         | [[InstanceSummary][api/instance/summary/:entity-type/:entity-name]]         | Return summary of instances for an entity |
+| GET         | [[InstanceDependency][api/instance/dependency/:entity-type/:entity-name]]   | Return dependent instances for a given instance |
 
 ---++ REST Call on Metadata Lineage Resource