You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/06/04 08:14:03 UTC
[1/2] falcon git commit: FALCON-1039 Add instance dependency API in
falcon. Contributed by Ajay Yadava
Repository: falcon
Updated Branches:
refs/heads/master 42f175a12 -> 9fd86b786
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 678235e..a721666 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -553,9 +553,7 @@ public abstract class AbstractEntityManager {
try {
Entity entityObj = EntityUtil.getEntity(type, entityName);
- Set<Entity> dependents = EntityGraph.get().getDependents(entityObj);
- Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]);
- return new EntityList(dependentEntities, entityObj);
+ return EntityUtil.getEntityDependencies(entityObj);
} catch (Exception e) {
LOG.error("Unable to get dependencies for entityName {} ({})", entityName, type, e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 42907c8..72f9fe4 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -19,16 +19,24 @@
package org.apache.falcon.resource;
import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.*;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Pair;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.logging.LogProvider;
import org.apache.falcon.resource.InstancesResult.Instance;
+import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +45,17 @@ import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
/**
* A base class for managing Entity's Instance operations.
@@ -160,6 +178,63 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
}
+
+ public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName,
+ String instanceTimeString, String colo) {
+ checkColo(colo);
+ EntityType type = checkType(entityType);
+ Set<SchedulableEntityInstance> result = new HashSet<>();
+
+ try {
+ Date instanceTime = EntityUtil.parseDateUTC(instanceTimeString);
+ for (String clusterName : DeploymentUtil.getCurrentClusters()) {
+ Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
+ switch (type) {
+
+ case PROCESS:
+ Process process = EntityUtil.getEntity(EntityType.PROCESS, entityName);
+ org.apache.falcon.entity.v0.process.Cluster pCluster = ProcessHelper.getCluster(process,
+ clusterName);
+ if (pCluster != null) {
+ Set<SchedulableEntityInstance> inputFeeds = ProcessHelper.getInputFeedInstances(process,
+ instanceTime, cluster, true);
+ Set<SchedulableEntityInstance> outputFeeds = ProcessHelper.getOutputFeedInstances(process,
+ instanceTime, cluster);
+ result.addAll(inputFeeds);
+ result.addAll(outputFeeds);
+ }
+ break;
+
+ case FEED:
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, entityName);
+ org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, clusterName);
+ if (fCluster != null) {
+ Set<SchedulableEntityInstance> consumers = FeedHelper.getConsumerInstances(feed, instanceTime,
+ cluster);
+ SchedulableEntityInstance producer = FeedHelper.getProducerInstance(feed, instanceTime,
+ cluster);
+ result.addAll(consumers);
+ if (producer != null) {
+ result.add(producer);
+ }
+ }
+ break;
+
+ default:
+ throw FalconWebException.newInstanceException("Instance dependency isn't supported for type: "
+ + entityType, Response.Status.BAD_REQUEST);
+ }
+ }
+
+ } catch (Throwable throwable) {
+ throw FalconWebException.newInstanceException(throwable, Response.Status.BAD_REQUEST);
+ }
+
+ InstanceDependencyResult res = new InstanceDependencyResult(APIResult.Status.SUCCEEDED, "Success!");
+ res.setDependencies(result.toArray(new SchedulableEntityInstance[0]));
+ return res;
+ }
+
public InstancesSummaryResult getSummary(String type, String entity, String startStr, String endStr,
String colo, List<LifeCycle> lifeCycles) {
checkColo(colo);
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 68219fb..1a8396c 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -27,18 +27,28 @@ import org.apache.falcon.monitors.Monitored;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractInstanceManager;
import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* A proxy implementation of the entity instance operations.
@@ -338,6 +348,29 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}
}.execute(colo, type, entity);
}
+
+
+ @GET
+ @Path("dependencies/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-dependency")
+ public InstanceDependencyResult instanceDependencies(
+ @Dimension("type") @PathParam("type") final String entityType,
+ @Dimension("entityName") @PathParam("entity") final String entityName,
+ @Dimension("instanceTime") @QueryParam("instanceTime") final String instanceTimeStr,
+ @Dimension("colo") @QueryParam("colo") String colo) {
+
+ return new InstanceProxy<InstanceDependencyResult>(InstanceDependencyResult.class) {
+
+ @Override
+ protected InstanceDependencyResult doExecute(String colo) throws FalconException {
+ return getInstanceManager(colo).invoke("instanceDependencies",
+ entityType, entityName, instanceTimeStr, colo);
+ }
+
+ }.execute(colo, entityType, entityName);
+ }
+
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
private abstract class InstanceProxy<T extends APIResult> {
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index dc533a2..c2ac5b2 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -23,7 +23,13 @@ import org.apache.falcon.monitors.Dimension;
import org.apache.falcon.monitors.Monitored;
import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.List;
@@ -230,4 +236,16 @@ public class InstanceManager extends AbstractInstanceManager {
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ @GET
+ @Path("dependencies/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-dependency")
+ public InstanceDependencyResult instanceDependencies(
+ @Dimension("type") @PathParam("type") String entityType,
+ @Dimension("entityName") @PathParam("entity") String entityName,
+ @Dimension("instanceTime") @QueryParam("instanceTime") String instanceTimeStr,
+ @Dimension("colo") @QueryParam("colo") String colo) {
+ return super.getInstanceDependencies(entityType, entityName, instanceTimeStr, colo);
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index fa04add..ed6f44e 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -23,7 +23,14 @@ import org.apache.falcon.monitors.Dimension;
import org.apache.falcon.monitors.Monitored;
import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 797b595..90acb59 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -287,6 +287,10 @@ public class FalconCLIIT {
Assert.assertEquals(executeWithURL("entity -schedule -type feed -name " + overlay.get("outputFeedName")), 0);
OozieTestUtils.waitForProcessWFtoStart(context);
+ //Test the dependency command
+ Assert.assertEquals(executeWithURL("instance -dependency -type feed -name " + overlay.get("inputFeedName")
+ + " -instanceTime 2010-01-01T00:00Z"), 0);
+
Assert.assertEquals(executeWithURL("instance -status -type feed -name "
+ overlay.get("outputFeedName")
+ " -start " + START_INSTANCE), 0);
[2/2] falcon git commit: FALCON-1039 Add instance dependency API in
falcon. Contributed by Ajay Yadava
Posted by aj...@apache.org.
FALCON-1039 Add instance dependency API in falcon. Contributed by Ajay Yadava
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9fd86b78
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9fd86b78
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9fd86b78
Branch: refs/heads/master
Commit: 9fd86b786195ac03fc25d31c6f35062c4014f10a
Parents: 42f175a
Author: Ajay Yadava <aj...@gmail.com>
Authored: Thu Jun 4 11:42:59 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Thu Jun 4 11:43:32 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/falcon/ResponseHelper.java | 14 +
.../java/org/apache/falcon/cli/FalconCLI.java | 21 +-
.../org/apache/falcon/client/FalconClient.java | 18 +
.../org/apache/falcon/resource/EntityList.java | 6 +-
.../resource/InstanceDependencyResult.java | 86 +++++
.../resource/SchedulableEntityInstance.java | 155 ++++++++
.../org/apache/falcon/entity/EntityUtil.java | 142 ++++++-
.../org/apache/falcon/entity/FeedHelper.java | 287 +++++++++++++-
.../org/apache/falcon/entity/ProcessHelper.java | 108 ++++++
.../apache/falcon/entity/FeedHelperTest.java | 370 ++++++++++++++++++-
.../apache/falcon/entity/ProcessHelperTest.java | 207 +++++++++++
docs/src/site/twiki/FalconCLI.twiki | 35 ++
.../site/twiki/restapi/InstanceDependency.twiki | 49 +++
docs/src/site/twiki/restapi/ResourceList.twiki | 1 +
.../falcon/resource/AbstractEntityManager.java | 4 +-
.../resource/AbstractInstanceManager.java | 79 +++-
.../resource/proxy/InstanceManagerProxy.java | 37 +-
.../apache/falcon/resource/InstanceManager.java | 20 +-
.../resource/SchedulableEntityManager.java | 9 +-
.../java/org/apache/falcon/cli/FalconCLIIT.java | 4 +
21 files changed, 1624 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c84f85..7ce2ba3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1039 Add instance dependency API in falcon(Ajay Yadava)
IMPROVEMENTS
FALCON-1060 Handle transaction failures in Lineage(Pavan Kumar Kolamuri via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/ResponseHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java
index 2261ceb..78598ba 100644
--- a/client/src/main/java/org/apache/falcon/ResponseHelper.java
+++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.falcon.resource.EntitySummaryResult;
@@ -277,4 +278,17 @@ public final class ResponseHelper {
sb.append("\nRequest Id: ").append(feedLookupResult.getRequestId());
return sb.toString();
}
+
+ public static String getString(InstanceDependencyResult dependencyResult) {
+ StringBuilder sb = new StringBuilder();
+ String results = dependencyResult.toString();
+ if (StringUtils.isEmpty(results)) {
+ sb.append("No dependencies found!");
+ } else {
+ sb.append(results);
+ }
+ sb.append("\n\nResponse: ").append(dependencyResult.getMessage());
+ sb.append("\nRequest Id: ").append(dependencyResult.getRequestId());
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index a5e3728..f169917 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -18,7 +18,6 @@
package org.apache.falcon.cli;
-import org.apache.falcon.ResponseHelper;
import com.sun.jersey.api.client.ClientHandlerException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -28,12 +27,14 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.LifeCycle;
+import org.apache.falcon.ResponseHelper;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
import java.io.IOException;
@@ -102,6 +103,7 @@ public class FalconCLI {
public static final String FORCE_RERUN_FLAG = "force";
public static final String INSTANCE_CMD = "instance";
+ public static final String INSTANCE_TIME_OPT = "instanceTime";
public static final String START_OPT = "start";
public static final String END_OPT = "end";
public static final String RUNNING_OPT = "running";
@@ -229,6 +231,7 @@ public class FalconCLI {
String result;
String type = commandLine.getOptionValue(ENTITY_TYPE_OPT);
String entity = commandLine.getOptionValue(ENTITY_NAME_OPT);
+ String instanceTime = commandLine.getOptionValue(INSTANCE_TIME_OPT);
String start = commandLine.getOptionValue(START_OPT);
String end = commandLine.getOptionValue(END_OPT);
String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
@@ -250,7 +253,12 @@ public class FalconCLI {
validateInstanceCommands(optionsList, entity, type, colo);
- if (optionsList.contains(RUNNING_OPT)) {
+ if (optionsList.contains(DEPENDENCY_OPT)) {
+ validateNotEmpty(instanceTime, INSTANCE_TIME_OPT);
+ InstanceDependencyResult response = client.getInstanceDependencies(type, entity, instanceTime, colo);
+ result = ResponseHelper.getString(response);
+
+ } else if (optionsList.contains(RUNNING_OPT)) {
validateOrderBy(orderBy, instanceAction);
validateFilterBy(filterBy, instanceAction);
result =
@@ -785,6 +793,11 @@ public class FalconCLI {
false,
"Displays feed listing and their status between a start and end time range.");
+ Option dependency = new Option(
+ DEPENDENCY_OPT,
+ false,
+ "Displays dependent instances for a specified instance.");
+
OptionGroup group = new OptionGroup();
group.addOption(running);
group.addOption(list);
@@ -798,6 +811,7 @@ public class FalconCLI {
group.addOption(logs);
group.addOption(params);
group.addOption(listing);
+ group.addOption(dependency);
Option url = new Option(URL_OPTION, true, "Falcon URL");
Option start = new Option(START_OPT, true,
@@ -843,6 +857,8 @@ public class FalconCLI {
Option forceRerun = new Option(FORCE_RERUN_FLAG, false,
"Flag to forcefully rerun entire workflow of an instance");
+ Option instanceTime = new Option(INSTANCE_TIME_OPT, true, "Time for an instance");
+
instanceOptions.addOption(url);
instanceOptions.addOptionGroup(group);
instanceOptions.addOption(start);
@@ -861,6 +877,7 @@ public class FalconCLI {
instanceOptions.addOption(sortOrder);
instanceOptions.addOption(numResults);
instanceOptions.addOption(forceRerun);
+ instanceOptions.addOption(instanceTime);
return instanceOptions;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 786e0a0..20c32e4 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -37,6 +37,7 @@ import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.InstanceDependencyResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.falcon.resource.LineageGraphResult;
@@ -242,6 +243,7 @@ public class FalconClient {
LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON),
SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON),
PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ DEPENDENCY("api/instance/dependencies/", HttpMethod.GET, MediaType.APPLICATION_JSON),
LISTING("api/instance/listing/", HttpMethod.GET, MediaType.APPLICATION_JSON);
private String path;
@@ -805,6 +807,22 @@ public class FalconClient {
return clientResponse;
}
+ public InstanceDependencyResult getInstanceDependencies(String entityType, String entityName, String instanceTime,
+ String colo) throws FalconCLIException {
+ checkType(entityType);
+ Instances api = Instances.DEPENDENCY;
+
+ WebResource resource = service.path(api.path).path(entityType).path(entityName);
+ resource = resource.queryParam("instanceTime", instanceTime);
+ resource = resource.queryParam("colo", colo);
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(api.mimeType)
+ .method(api.method, ClientResponse.class);
+ checkIfSuccessful(clientResponse);
+ return clientResponse.getEntity(InstanceDependencyResult.class);
+ }
+
//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index ee33234..6e132f0 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -36,6 +36,8 @@ import java.util.List;
@XmlAccessorType(XmlAccessType.FIELD)
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class EntityList {
+ public static final String INPUT_TAG = "Input";
+ public static final String OUTPUT_TAG = "Output";
@XmlElement
private int totalResults;
@@ -184,14 +186,14 @@ public class EntityList {
if (process.getInputs() != null) {
for (Input i : process.getInputs().getInputs()) {
if (i.getFeed().equals(entityNameToMatch)) {
- tagList.add("Input");
+ tagList.add(INPUT_TAG);
}
}
}
if (process.getOutputs() != null) {
for (Output o : process.getOutputs().getOutputs()) {
if (o.getFeed().equals(entityNameToMatch)) {
- tagList.add("Output");
+ tagList.add(OUTPUT_TAG);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java b/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java
new file mode 100644
index 0000000..0751f12
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/InstanceDependencyResult.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Entity list used for marshalling / unmarshalling with REST calls.
+ */
+@XmlRootElement(name = "dependents")
+@XmlAccessorType(XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class InstanceDependencyResult extends APIResult {
+
+ @XmlElement(name = "dependencies")
+ private SchedulableEntityInstance[] dependencies;
+
+ //For JAXB
+ private InstanceDependencyResult() {
+ super();
+ }
+
+ public InstanceDependencyResult(Status status, String message) {
+ super(status, message);
+ }
+
+ public SchedulableEntityInstance[] getDependencies() {
+ return dependencies;
+ }
+
+ public void setDependencies(SchedulableEntityInstance[] dependencies) {
+ this.dependencies = dependencies;
+ }
+
+
+ @Override
+ public Object[] getCollection() {
+ return getDependencies();
+ }
+
+ @Override
+ public void setCollection(Object[] items) {
+ if (items == null) {
+ setDependencies(new SchedulableEntityInstance[0]);
+ } else {
+ SchedulableEntityInstance[] newInstances = new SchedulableEntityInstance[items.length];
+ for (int index = 0; index < items.length; index++) {
+ newInstances[index] = (SchedulableEntityInstance)items[index];
+ }
+ setDependencies(newInstances);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ if (dependencies != null) {
+ for (SchedulableEntityInstance element : dependencies) {
+ buffer.append(element.toString());
+ buffer.append("\n");
+ }
+ }
+ return buffer.toString();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
new file mode 100644
index 0000000..2a7ecdb
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+
+import java.util.Date;
+
+/**
+ * Instance of a Schedulable Entity (Feed/Process).
+ */
+public class SchedulableEntityInstance {
+
+ public static final String INPUT = "Input";
+ public static final String OUTPUT = "Output";
+
+ private String entityName;
+
+ private String cluster;
+
+ private Date instanceTime;
+
+ private EntityType entityType;
+
+ private String tag;
+
+ //for JAXB
+ private SchedulableEntityInstance() {
+
+ }
+
+ public SchedulableEntityInstance(String entityName, String cluster, Date instanceTime, EntityType type) {
+ this.entityName = entityName;
+ this.cluster = cluster;
+ this.entityType = type;
+ if (instanceTime != null) {
+ this.instanceTime = new Date(instanceTime.getTime());
+ }
+ }
+
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ public String getEntityName() {
+ return entityName;
+ }
+
+ public void setEntityName(String entityName) {
+ this.entityName = entityName;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public EntityType getEntityType() {
+ return entityType;
+ }
+
+ public void setEntityType(EntityType entityType) {
+ this.entityType = entityType;
+ }
+
+ public Date getInstanceTime() {
+ return new Date(instanceTime.getTime());
+ }
+
+ public void setInstanceTime(Date instanceTime) {
+ this.instanceTime = new Date(instanceTime.getTime());
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("name: " + entityName
+ + ", type: " + entityType
+ + ", cluster: " + cluster
+ + ", instanceTime: " + SchemaHelper.formatDateUTC(instanceTime));
+ sb.append(", tag: " + ((tag != null) ? tag : ""));
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SchedulableEntityInstance that = (SchedulableEntityInstance) o;
+
+ if (instanceTime == null ? that.instanceTime != null : !instanceTime.equals(that.instanceTime)) {
+ return false;
+ }
+
+ if (!entityType.equals(that.entityType)) {
+ return false;
+ }
+
+ if (!StringUtils.equals(entityName, that.entityName)) {
+ return false;
+ }
+
+ if (!StringUtils.equals(cluster, that.cluster)) {
+ return false;
+ }
+
+ if (!StringUtils.equals(tag, that.tag)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = instanceTime.hashCode();
+ result = 31 * result + entityName.hashCode();
+ result = 31 * result + entityType.hashCode();
+ result = 31 * result + cluster.hashCode();
+ if (tag != null) {
+ result = 31 * result + tag.hashCode();
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 26d3da2..7ebf39e 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -28,6 +28,7 @@ import org.apache.falcon.Tag;
import org.apache.falcon.entity.WorkflowNameBuilder.WorkflowName;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityGraph;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
@@ -35,9 +36,13 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.*;
+import org.apache.falcon.entity.v0.process.LateInput;
+import org.apache.falcon.entity.v0.process.LateProcess;
+import org.apache.falcon.entity.v0.process.PolicyType;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Retry;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.resource.EntityList;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.hadoop.fs.FileStatus;
@@ -53,7 +58,19 @@ import java.lang.reflect.Method;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
/**
* Helper to get entity object.
@@ -65,6 +82,7 @@ public final class EntityUtil {
private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
+ private static final long ONE_MS = 1;
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -244,8 +262,8 @@ public final class EntityUtil {
return feed.getTimezone();
}
- public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date now) {
- if (startTime.after(now)) {
+ public static Date getNextStartTime(Date startTime, Frequency frequency, TimeZone timezone, Date referenceTime) {
+ if (startTime.after(referenceTime)) {
return startTime;
}
@@ -255,16 +273,16 @@ public final class EntityUtil {
int count = 0;
switch (frequency.getTimeUnit()) {
case months:
- count = (int) ((now.getTime() - startTime.getTime()) / MONTH_IN_MS);
+ count = (int) ((referenceTime.getTime() - startTime.getTime()) / MONTH_IN_MS);
break;
case days:
- count = (int) ((now.getTime() - startTime.getTime()) / DAY_IN_MS);
+ count = (int) ((referenceTime.getTime() - startTime.getTime()) / DAY_IN_MS);
break;
case hours:
- count = (int) ((now.getTime() - startTime.getTime()) / HOUR_IN_MS);
+ count = (int) ((referenceTime.getTime() - startTime.getTime()) / HOUR_IN_MS);
break;
case minutes:
- count = (int) ((now.getTime() - startTime.getTime()) / MINUTE_IN_MS);
+ count = (int) ((referenceTime.getTime() - startTime.getTime()) / MINUTE_IN_MS);
break;
default:
}
@@ -273,7 +291,7 @@ public final class EntityUtil {
if (count > 2) {
startCal.add(frequency.getTimeUnit().getCalendarUnit(), ((count - 2) / freq) * freq);
}
- while (startCal.getTime().before(now)) {
+ while (startCal.getTime().before(referenceTime)) {
startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
}
return startCal.getTime();
@@ -747,6 +765,12 @@ public final class EntityUtil {
return pipelines;
}
+ public static EntityList getEntityDependencies(Entity entity) throws FalconException {
+ Set<Entity> dependents = EntityGraph.get().getDependents(entity);
+ Entity[] dependentEntities = dependents.toArray(new Entity[dependents.size()]);
+ return new EntityList(dependentEntities, entity);
+ }
+
public static Pair<Date, Date> getEntityStartEndDates(Entity entityObject) {
Set<String> clusters = EntityUtil.getClustersDefined(entityObject);
Pair<Date, String> clusterMinStartDate = null;
@@ -761,4 +785,104 @@ public final class EntityUtil {
}
return new Pair<Date, Date>(clusterMinStartDate.first, clusterMaxEndDate.first);
}
+
+ /**
+ * Returns the previous instance(before or on) for a given referenceTime
+ *
+ * Example: For a feed in "UTC" with startDate "2014-01-01 00:00" and frequency of "days(1)" a referenceTime
+ * of "2015-01-01 00:00" will return "2015-01-01 00:00".
+ *
+ * Similarly for the above feed if we give a reference Time of "2015-01-01 04:00" will also result in
+ * "2015-01-01 00:00"
+ *
+ * @param startTime start time of the entity
+ * @param frequency frequency of the entity
+ * @param tz timezone of the entity
+ * @param referenceTime time before which the instanceTime is desired
+ * @return instance(before or on) the referenceTime
+ */
+ public static Date getPreviousInstanceTime(Date startTime, Frequency frequency, TimeZone tz, Date referenceTime) {
+ if (tz == null) {
+ tz = TimeZone.getTimeZone("UTC");
+ }
+ Calendar insCal = Calendar.getInstance(tz);
+ insCal.setTime(startTime);
+
+ int instanceCount = getInstanceSequence(startTime, frequency, tz, referenceTime);
+ final int freq = frequency.getFrequencyAsInt() * instanceCount;
+ insCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
+
+ while (insCal.getTime().after(referenceTime)) {
+ insCal.add(frequency.getTimeUnit().getCalendarUnit(), -1);
+ }
+ return insCal.getTime();
+ }
+
+ /**
+ * Find the times at which the given entity will run in a given time range.
+ * <p/>
+ * Both start and end Date are inclusive.
+ *
+ * @param entity feed or process entity whose instance times are to be found
+ * @param clusterName name of the cluster
+ * @param startRange start time for the input range
+ * @param endRange end time for the input range
+ * @return List of instance times at which the entity will run in the given time range
+ */
+ public static List<Date> getEntityInstanceTimes(Entity entity, String clusterName, Date startRange, Date endRange) {
+ Date start = null;
+ switch (entity.getEntityType()) {
+
+ case FEED:
+ Feed feed = (Feed) entity;
+ start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
+ return getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(),
+ startRange, endRange);
+
+ case PROCESS:
+ Process process = (Process) entity;
+ start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart();
+ return getInstanceTimes(start, process.getFrequency(),
+ process.getTimezone(), startRange, endRange);
+
+ default:
+ throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
+ }
+ }
+
+
+ /**
+ * Find instance times given first instance start time and frequency till a given end time.
+ *
+ * It finds the first valid instance time in the given time range, it then uses frequency to find next instances
+ * in the given time range.
+ *
+ * @param startTime startTime of the entity (time of first instance ever of the given entity)
+ * @param frequency frequency of the entity
+ * @param timeZone timeZone of the entity
+ * @param startRange start time for the input range of interest.
+ * @param endRange end time for the input range of interest.
+ * @return list of instance run times of the given entity in the given time range.
+ */
+ public static List<Date> getInstanceTimes(Date startTime, Frequency frequency, TimeZone timeZone,
+ Date startRange, Date endRange) {
+ List<Date> result = new LinkedList<>();
+ if (timeZone == null) {
+ timeZone = TimeZone.getTimeZone("UTC");
+ }
+
+ while(true){
+ Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, startRange);
+ if (nextStartTime.before(startRange) || nextStartTime.after(endRange)){
+ break;
+ }
+
+ result.add(nextStartTime);
+ // this is required because getNextStartTime returns greater than or equal to referenceTime
+ startRange = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later
+ }
+ return result;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index acb8598..9f4eb61 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -19,6 +19,7 @@
package org.apache.falcon.entity;
import org.apache.commons.lang3.StringUtils;
+
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
@@ -33,16 +34,33 @@ import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.FeedInstanceResult;
+import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.util.BuildProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
import java.util.regex.Matcher;
/**
@@ -50,6 +68,9 @@ import java.util.regex.Matcher;
*/
public final class FeedHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(FeedHelper.class);
+ private static final int ONE_MS = 1;
+
public static final String FORMAT = "yyyyMMddHHmm";
private FeedHelper() {}
@@ -273,7 +294,7 @@ public final class FeedHelper {
public static Properties getClusterProperties(org.apache.falcon.entity.v0.cluster.Cluster cluster) {
Properties properties = new Properties();
- Map<String, String> clusterVars = new HashMap<String, String>();
+ Map<String, String> clusterVars = new HashMap<>();
clusterVars.put("colo", cluster.getColo());
clusterVars.put("name", cluster.getName());
if (cluster.getProperties() != null) {
@@ -354,7 +375,7 @@ public final class FeedHelper {
* Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
* @param instancePath - actual data path
* @param templatePath - template path from feed definition
- * @param timeZone
+ * @param timeZone timeZone
* @return date corresponding to the path
*/
//consider just the first occurrence of the pattern
@@ -364,7 +385,7 @@ public final class FeedHelper {
Calendar cal = Calendar.getInstance(timeZone);
int lastEnd = 0;
- Set<FeedDataPath.VARS> matchedVars = new HashSet<FeedDataPath.VARS>();
+ Set<FeedDataPath.VARS> matchedVars = new HashSet<>();
while (matcher.find()) {
FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group());
String pad = templatePath.substring(lastEnd, matcher.start());
@@ -415,6 +436,264 @@ public final class FeedHelper {
}
+ private static void validateFeedInstance(Feed feed, Date instanceTime,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) {
+
+ // validate the cluster
+ Cluster feedCluster = getCluster(feed, cluster.getName());
+ if (feedCluster == null) {
+ throw new IllegalArgumentException("Cluster :" + cluster.getName() + " is not a valid cluster for feed:"
+ + feed.getName());
+ }
+
+ // validate that instanceTime is in validity range
+ if (feedCluster.getValidity().getStart().after(instanceTime)
+ || feedCluster.getValidity().getEnd().before(instanceTime)) {
+ throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not in validity range for"
+ + " Feed: " + feed.getName() + " on cluster:" + cluster.getName());
+ }
+
+ // validate instanceTime on basis of startTime and frequency
+ Date nextInstance = EntityUtil.getNextStartTime(feedCluster.getValidity().getStart(), feed.getFrequency(),
+ feed.getTimezone(), instanceTime);
+ if (!nextInstance.equals(instanceTime)) {
+ throw new IllegalArgumentException("instanceTime: " + instanceTime + " is not a valid instance for the "
+ + " feed: " + feed.getName() + " on cluster: " + cluster.getName()
+ + " on the basis of startDate and frequency");
+ }
+ }
+
+ /**
+ * Given a feed Instance finds the generating process instance.
+ *
+ * [process, cluster, instanceTime]
+ *
+ * If the feed is replicated, then it returns null.
+ *
+ * @param feed output feed
+ * @param feedInstanceTime instance time of the feed
+ * @return returns the instance of the process which produces the given feed
+ */
+ public static SchedulableEntityInstance getProducerInstance(Feed feed, Date feedInstanceTime,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+ //validate the inputs
+ validateFeedInstance(feed, feedInstanceTime, cluster);
+
+ Process process = getProducerProcess(feed);
+ if (process != null) {
+ try {
+ Date processInstanceTime = getProducerInstanceTime(feed, feedInstanceTime, process, cluster);
+ SchedulableEntityInstance producer = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ processInstanceTime, EntityType.PROCESS);
+ producer.setTag(SchedulableEntityInstance.OUTPUT);
+ return producer;
+ } catch (FalconException e) {
+ LOG.error("Error in trying to get producer process: {}'s instance time for feed: {}'s instance: } "
+ + " on cluster:{}", process.getName(), feed.getName(), feedInstanceTime, cluster.getName());
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Given a feed find it's generating process.
+ *
+ * If no generating process is found it returns null.
+ * @param feed output feed
+ * @return Process which produces the given feed.
+ */
+ public static Process getProducerProcess(Feed feed) throws FalconException {
+
+ EntityList dependencies = EntityUtil.getEntityDependencies(feed);
+
+ for (EntityList.EntityElement e : dependencies.getElements()) {
+ if (e.tag.contains(EntityList.OUTPUT_TAG)) {
+ return EntityUtil.getEntity(EntityType.PROCESS, e.name);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Find the producerInstance which will generate the given feedInstance.
+ *
+ * @param feed output feed
+ * @param feedInstanceTime instance time of the output feed
+ * @param producer producer process
+ * @return time of the producer instance which will produce the given feed instance.
+ */
+ private static Date getProducerInstanceTime(Feed feed, Date feedInstanceTime, Process producer,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+ String clusterName = cluster.getName();
+ Cluster feedCluster = getCluster(feed, clusterName);
+ org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(producer, clusterName);
+ Date producerStartDate = processCluster.getValidity().getStart();
+
+ // read the process definition and find the relative time difference between process and output feed
+ // if output process instance time is now then output FeedInstance time is x
+ String outputInstance = null;
+ for (Output op : producer.getOutputs().getOutputs()) {
+ if (StringUtils.equals(feed.getName(), op.getFeed())) {
+ outputInstance = op.getInstance();
+ }
+ }
+
+ ExpressionHelper.setReferenceDate(producerStartDate);
+ ExpressionHelper evaluator = ExpressionHelper.get();
+ // producerInstance = feedInstanceTime + (difference between producer process and feed)
+ // the feedInstance before or equal to this time is the required one
+ Date relativeFeedInstance = evaluator.evaluate(outputInstance, Date.class);
+ Date feedInstanceActual = EntityUtil.getPreviousInstanceTime(feedCluster.getValidity().getStart(),
+ feed.getFrequency(), feed.getTimezone(), relativeFeedInstance);
+ Long producerInstanceTime = feedInstanceTime.getTime() + (producerStartDate.getTime()
+ - feedInstanceActual.getTime());
+ Date producerInstance = new Date(producerInstanceTime);
+
+ //validate that the producerInstance is in the validity range on the provided cluster
+ if (producerInstance.before(processCluster.getValidity().getStart())
+ || producerInstance.after(processCluster.getValidity().getEnd())) {
+ throw new IllegalArgumentException("Instance time provided: " + feedInstanceTime
+ + " for feed " + feed.getName()
+ + " is outside the range of instances produced by the producer process: " + producer.getName()
+ + " in it's validity range on provided cluster: " + cluster.getName());
+ }
+ return producerInstance;
+ }
+
+
+ public static Set<SchedulableEntityInstance> getConsumerInstances(Feed feed, Date feedInstanceTime,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+ Set<SchedulableEntityInstance> result = new HashSet<>();
+ // validate that the feed has this cluster & validate that the instanceTime is a valid instanceTime
+ validateFeedInstance(feed, feedInstanceTime, cluster);
+
+ Set<Process> consumers = getConsumerProcesses(feed);
+ for (Process p : consumers) {
+ Set<Date> consumerInstanceTimes = getConsumerProcessInstanceTimes(feed, feedInstanceTime, p, cluster);
+ for (Date date : consumerInstanceTimes) {
+ SchedulableEntityInstance in = new SchedulableEntityInstance(p.getName(), cluster.getName(), date,
+ EntityType.PROCESS);
+ in.setTag(SchedulableEntityInstance.INPUT);
+ result.add(in);
+ }
+ }
+ return result;
+ }
+
+
+ /**
+ * Returns the consumer processes for a given feed if any, null otherwise.
+ *
+ * @param feed input feed
+ * @return the set of processes which use the given feed as input, empty set if no consumers.
+ */
+ public static Set<Process> getConsumerProcesses(Feed feed) throws FalconException {
+ Set<Process> result = new HashSet<>();
+ EntityList dependencies = EntityUtil.getEntityDependencies(feed);
+
+ for (EntityList.EntityElement e : dependencies.getElements()) {
+ if (e.tag.contains(EntityList.INPUT_TAG)) {
+ Process consumer = EntityUtil.getEntity(EntityType.PROCESS, e.name);
+ result.add(consumer);
+ }
+ }
+ return result;
+ }
+
+ // return all instances of a process which will consume the given feed instance
+ private static Set<Date> getConsumerProcessInstanceTimes(Feed feed, Date feedInstancetime, Process consumer,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+
+ Set<Date> result = new HashSet<>();
+ // find relevant cluster for the process
+ org.apache.falcon.entity.v0.process.Cluster processCluster =
+ ProcessHelper.getCluster(consumer, cluster.getName());
+ if (processCluster == null) {
+ throw new IllegalArgumentException("Cluster is not valid for process");
+ }
+ Date processStartDate = processCluster.getValidity().getStart();
+ Cluster feedCluster = getCluster(feed, cluster.getName());
+ Date feedStartDate = feedCluster.getValidity().getStart();
+
+ // find all corresponding Inputs as a process may refer same feed multiple times
+ List<Input> inputFeeds = new ArrayList<>();
+ if (consumer.getInputs() != null && consumer.getInputs().getInputs() != null) {
+ for (Input input : consumer.getInputs().getInputs()) {
+ if (StringUtils.equals(input.getFeed(), feed.getName())) {
+ inputFeeds.add(input);
+ }
+ }
+ }
+
+ // for each input corresponding to given feed, find corresponding consumer instances
+ for (Input in : inputFeeds) {
+ /* Algorithm for finding a consumer instance for an input feed instance
+ Step 1. Find one instance which will consume the given feed instance.
+ a. take process start date and find last input feed instance time. In this step take care of
+ frequencies being out of sync.
+ b. using the above find the time difference between the process instance and feed instance.
+ c. Adding the above time difference to given feed instance for which we want to find the consumer
+ instances we will get one consumer process instance.
+ Step 2. Keep checking for next instances of process till they consume the given feed Instance.
+ Step 3. Similarly check for all previous instances of process till they consume the given feed instance.
+ */
+
+ // Step 1.a & 1.b
+ ExpressionHelper.setReferenceDate(processStartDate);
+ ExpressionHelper evaluator = ExpressionHelper.get();
+ Date startRelative = evaluator.evaluate(in.getStart(), Date.class);
+ Date startTimeActual = EntityUtil.getNextStartTime(feedStartDate,
+ feed.getFrequency(), feed.getTimezone(), startRelative);
+ Long offset = processStartDate.getTime() - startTimeActual.getTime();
+
+ // Step 1.c
+ Date processInstanceStartRelative = new Date(feedInstancetime.getTime() + offset);
+ Date processInstanceStartActual = EntityUtil.getPreviousInstanceTime(processStartDate,
+ consumer.getFrequency(), consumer.getTimezone(), processInstanceStartRelative);
+
+
+ // Step 2.
+ Date currentInstance = processInstanceStartActual;
+ while (true) {
+ Date nextConsumerInstance = EntityUtil.getNextStartTime(processStartDate,
+ consumer.getFrequency(), consumer.getTimezone(), currentInstance);
+
+ ExpressionHelper.setReferenceDate(nextConsumerInstance);
+ evaluator = ExpressionHelper.get();
+ Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime();
+ Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
+ if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) {
+ result.add(nextConsumerInstance);
+ } else {
+ break;
+ }
+ currentInstance = new Date(nextConsumerInstance.getTime() + ONE_MS);
+ }
+
+ // Step 3.
+ currentInstance = processInstanceStartActual;
+ while (true) {
+ Date nextConsumerInstance = EntityUtil.getPreviousInstanceTime(processStartDate,
+ consumer.getFrequency(), consumer.getTimezone(), currentInstance);
+
+ ExpressionHelper.setReferenceDate(nextConsumerInstance);
+ evaluator = ExpressionHelper.get();
+ Long rangeStart = evaluator.evaluate(in.getStart(), Date.class).getTime();
+ Long rangeEnd = evaluator.evaluate(in.getEnd(), Date.class).getTime();
+ if (rangeStart <= feedInstancetime.getTime() && feedInstancetime.getTime() < rangeEnd) {
+ result.add(nextConsumerInstance);
+ } else {
+ break;
+ }
+ currentInstance = new Date(nextConsumerInstance.getTime() - ONE_MS);
+ }
+ }
+ return result;
+ }
+
public static FeedInstanceResult getFeedInstanceListing(Entity entityObject,
Date start, Date end) throws FalconException {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entityObject);
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 29aefa0..fe78bc8 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -20,12 +20,20 @@ package org.apache.falcon.entity;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
/**
* Helper methods for accessing process members.
@@ -34,6 +42,7 @@ public final class ProcessHelper {
private ProcessHelper() {}
+
public static Cluster getCluster(Process process, String clusterName) {
for (Cluster cluster : process.getClusters().getClusters()) {
if (cluster.getName().equals(clusterName)) {
@@ -77,4 +86,103 @@ public final class ProcessHelper {
return storageType;
}
+
+ private static void validateProcessInstance(Process process, Date instanceTime,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) {
+ //validate the cluster
+ Cluster processCluster = getCluster(process, cluster.getName());
+ if (processCluster == null) {
+ throw new IllegalArgumentException("Cluster provided: " + cluster.getName()
+ + " is not a valid cluster for the process: " + process.getName());
+ }
+
+ // check if instanceTime is in validity range
+ if (instanceTime.before(processCluster.getValidity().getStart())
+ || instanceTime.after(processCluster.getValidity().getEnd())) {
+ throw new IllegalArgumentException("Instance time provided: " + instanceTime
+ + " is not in validity range of process: " + process.getName()
+ + "on cluster: " + cluster.getName());
+ }
+
+ // check instanceTime is valid on the basis of startTime and frequency
+ Date nextInstance = EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
+ process.getFrequency(), process.getTimezone(), instanceTime);
+ if (!nextInstance.equals(instanceTime)) {
+ throw new IllegalArgumentException("Instance time provided: " + instanceTime
+ + " for process: " + process.getName() + " is not a valid instance time on cluster: "
+ + cluster.getName() + " on the basis of startDate and frequency");
+ }
+ }
+
+ /**
+ * Given a process instance, returns the feed instances which are used as input for this process instance.
+ *
+ * @param process given process
+ * @param instanceTime nominal time of the process instance
+ * @param cluster - cluster for the process instance
+ * @param allowOptionalFeeds switch to indicate whether optional feeds should be considered in input feeds.
+ * @return Set of input feed instances which are consumed by the given process instance.
+ * @throws org.apache.falcon.FalconException
+ */
+ public static Set<SchedulableEntityInstance> getInputFeedInstances(Process process, Date instanceTime,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster, boolean allowOptionalFeeds) throws FalconException {
+
+ // validate the inputs
+ validateProcessInstance(process, instanceTime, cluster);
+
+ Set<SchedulableEntityInstance> result = new HashSet<>();
+ if (process.getInputs() != null) {
+ ConfigurationStore store = ConfigurationStore.get();
+ for (Input i : process.getInputs().getInputs()) {
+ if (i.isOptional() && !allowOptionalFeeds) {
+ continue;
+ }
+ Feed feed = store.get(EntityType.FEED, i.getFeed());
+ // inputStart is process instance time + (now - startTime)
+ ExpressionHelper evaluator = ExpressionHelper.get();
+ ExpressionHelper.setReferenceDate(instanceTime);
+ Date inputInstanceStartDate = evaluator.evaluate(i.getStart(), Date.class);
+ Date inputInstanceEndDate = evaluator.evaluate(i.getEnd(), Date.class);
+ List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
+ inputInstanceStartDate, inputInstanceEndDate);
+ SchedulableEntityInstance instance;
+ for (Date time : instanceTimes) {
+ instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), time, EntityType.FEED);
+ instance.setTag(SchedulableEntityInstance.INPUT);
+ result.add(instance);
+ }
+ }
+ }
+ return result;
+ }
+
+ public static Set<SchedulableEntityInstance> getOutputFeedInstances(Process process, Date instanceTime,
+ org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
+ Set<SchedulableEntityInstance> result = new HashSet<>();
+
+ // validate the inputs
+ validateProcessInstance(process, instanceTime, cluster);
+
+ if (process.getOutputs() != null && process.getOutputs().getOutputs() != null) {
+
+ ExpressionHelper.setReferenceDate(instanceTime);
+ ExpressionHelper evaluator = ExpressionHelper.get();
+ SchedulableEntityInstance candidate;
+ ConfigurationStore store = ConfigurationStore.get();
+ for (Output output : process.getOutputs().getOutputs()) {
+
+ Date outputInstance = evaluator.evaluate(output.getInstance(), Date.class);
+ // find the feed
+ Feed feed = store.get(EntityType.FEED, output.getFeed());
+ org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, cluster.getName());
+ outputInstance = EntityUtil.getNextStartTime(fCluster.getValidity().getStart(), feed.getFrequency(),
+ feed.getTimezone(), outputInstance);
+ candidate = new SchedulableEntityInstance(output.getFeed(), cluster.getName(), outputInstance,
+ EntityType.FEED);
+ candidate.setTag(SchedulableEntityInstance.OUTPUT);
+ result.add(candidate);
+ }
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 266d029..f70edfb 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -18,24 +18,58 @@
package org.apache.falcon.entity;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Properties;
import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.Validity;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
import java.util.TimeZone;
/**
* Test for feed helper methods.
*/
-public class FeedHelperTest {
- public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+public class FeedHelperTest extends AbstractTestBase {
+ private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+ private ConfigurationStore store;
+
+ @BeforeClass
+ public void init() throws Exception {
+ initConfigStore();
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ cleanupStore();
+ store = getStore();
+ }
@Test
public void testPartitionExpression() {
@@ -107,4 +141,334 @@ public class FeedHelperTest {
locations.getLocations());
Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2);
}
+
+ @Test
+ public void testGetProducerProcessWithOffset() throws FalconException, ParseException {
+ //create a feed, submit it, test that ProducerProcess is null
+
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+ // create it's producer process submit it, test it's ProducerProcess
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Outputs outputs = new Outputs();
+ Output outFeed = new Output();
+ outFeed.setName("outputFeed");
+ outFeed.setFeed(feed.getName());
+ outFeed.setInstance("today(0,0)");
+ outputs.getOutputs().add(outFeed);
+ process.setOutputs(outputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+ SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"),
+ cluster);
+ SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+ expected.setTag(SchedulableEntityInstance.OUTPUT);
+ Assert.assertEquals(result, expected);
+ }
+
+ @Test
+ public void testGetProducerProcessForNow() throws FalconException, ParseException {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+ // create it's producer process submit it, test it's ProducerProcess
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Outputs outputs = new Outputs();
+ Output outFeed = new Output();
+ outFeed.setName("outputFeed");
+ outFeed.setFeed(feed.getName());
+ outFeed.setInstance("now(0,0)");
+ outputs.getOutputs().add(outFeed);
+ process.setOutputs(outputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+ SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"),
+ cluster);
+ SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+ expected.setTag(SchedulableEntityInstance.OUTPUT);
+ Assert.assertEquals(result, expected);
+ }
+
+ @Test
+ public void testGetProducerWithNowNegativeOffset() throws FalconException, ParseException {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+ // create it's producer process submit it, test it's ProducerProcess
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Outputs outputs = new Outputs();
+ Output outFeed = new Output();
+ outFeed.setName("outputFeed");
+ outFeed.setFeed(feed.getName());
+ outFeed.setInstance("now(-4,0)");
+ outputs.getOutputs().add(outFeed);
+ process.setOutputs(outputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+ SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"),
+ cluster);
+ SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+ expected.setTag(SchedulableEntityInstance.OUTPUT);
+ Assert.assertEquals(result, expected);
+ }
+
+
+ @Test
+ public void testGetProducerWithNowPositiveOffset() throws FalconException, ParseException {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Assert.assertNull(FeedHelper.getProducerProcess(feed));
+
+ // create it's producer process submit it, test it's ProducerProcess
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Outputs outputs = new Outputs();
+ Output outFeed = new Output();
+ outFeed.setName("outputFeed");
+ outFeed.setFeed(feed.getName());
+ outFeed.setInstance("now(4,0)");
+ outputs.getOutputs().add(outFeed);
+ process.setOutputs(outputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+ SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"),
+ cluster);
+ SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+ expected.setTag(SchedulableEntityInstance.OUTPUT);
+ Assert.assertEquals(result, expected);
+ }
+
+ @Test
+ public void testGetProducerProcessInstance() throws FalconException, ParseException {
+ //create a feed, submit it, test that ProducerProcess is null
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 00:00 UTC", "2016-02-28 10:00 UTC");
+
+ // create it's producer process submit it, test it's ProducerProcess
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Outputs outputs = new Outputs();
+ Output outFeed = new Output();
+ outFeed.setName("outputFeed");
+ outFeed.setFeed(feed.getName());
+ outFeed.setInstance("today(0,0)");
+ outputs.getOutputs().add(outFeed);
+ process.setOutputs(outputs);
+ store.publish(EntityType.PROCESS, process);
+ Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName());
+ SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 00:00 UTC"),
+ cluster);
+ SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS);
+ expected.setTag(SchedulableEntityInstance.OUTPUT);
+ Assert.assertEquals(result, expected);
+ }
+
+ @Test
+ public void testGetConsumerProcesses() throws FalconException, ParseException {
+ //create a feed, submit it, test that ConsumerProcesses is blank list
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+
+ //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Inputs inputs = new Inputs();
+ Input inFeed = new Input();
+ inFeed.setName("outputFeed");
+ inFeed.setFeed(feed.getName());
+ inFeed.setStart("today(0,0)");
+ inFeed.setEnd("today(0,0)");
+ inputs.getInputs().add(inFeed);
+ process.setInputs(inputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Set<Process> result = FeedHelper.getConsumerProcesses(feed);
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertTrue(result.contains(process));
+ }
+
+ @Test
+ public void testGetConsumerProcessInstances() throws Exception {
+ //create a feed, submit it, test that ConsumerProcesses is blank list
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+ //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Inputs inputs = new Inputs();
+ Input inFeed = new Input();
+ inFeed.setName("inputFeed");
+ inFeed.setFeed(feed.getName());
+ inFeed.setStart("now(-4, 30)");
+ inFeed.setEnd("now(4, 30)");
+ inputs.getInputs().add(inFeed);
+ process.setInputs(inputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+ getDate("2012-02-28 09:00 UTC"), cluster);
+ Assert.assertEquals(result.size(), 1);
+
+ Set<SchedulableEntityInstance> expected = new HashSet<>();
+ SchedulableEntityInstance ins = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ getDate("2012-02-28 10:00 UTC"), EntityType.PROCESS);
+ ins.setTag(SchedulableEntityInstance.INPUT);
+ expected.add(ins);
+ Assert.assertEquals(result, expected);
+
+ }
+
+ @Test
+ public void testGetMultipleConsumerInstances() throws Exception {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+ //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+ Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Inputs inputs = new Inputs();
+ Input inFeed = new Input();
+ inFeed.setName("inputFeed");
+ inFeed.setFeed(feed.getName());
+ inFeed.setStart("now(-4, 30)");
+ inFeed.setEnd("now(4, 30)");
+ inputs.getInputs().add(inFeed);
+ process.setInputs(inputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+ getDate("2012-02-28 09:00 UTC"), cluster);
+ Assert.assertEquals(result.size(), 8);
+
+ Set<SchedulableEntityInstance> expected = new HashSet<>();
+ String[] consumers = { "2012-02-28 05:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 07:00 UTC",
+ "2012-02-28 08:00 UTC", "2012-02-28 09:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 11:00 UTC",
+ "2012-02-28 12:00 UTC", };
+ for (String d : consumers) {
+ SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(),
+ getDate(d), EntityType.PROCESS);
+ i.setTag(SchedulableEntityInstance.INPUT);
+ expected.add(i);
+ }
+ Assert.assertEquals(result, expected);
+ }
+
+ @Test
+ public void testGetConsumerWithNow() throws Exception {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+ //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+ Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Inputs inputs = new Inputs();
+ Input inFeed = new Input();
+ inFeed.setName("inputFeed");
+ inFeed.setFeed(feed.getName());
+ inFeed.setStart("today(0, 0)");
+ inFeed.setEnd("now(0, 0)");
+ inputs.getInputs().add(inFeed);
+ process.setInputs(inputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+ getDate("2012-02-28 00:00 UTC"), cluster);
+ Assert.assertEquals(result.size(), 23);
+ }
+
+ @Test
+ public void testGetConsumerWithLatest() throws Exception {
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC");
+
+ //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses
+ Process process = prepareProcess(cluster, "hours(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Inputs inputs = new Inputs();
+ Input inFeed = new Input();
+ inFeed.setName("inputFeed");
+ inFeed.setFeed(feed.getName());
+ inFeed.setStart("today(0, 0)");
+ inFeed.setEnd("latest(0)");
+ inputs.getInputs().add(inFeed);
+ process.setInputs(inputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed,
+ getDate("2012-02-28 00:00 UTC"), cluster);
+ System.out.println("result.size() = " + result.size());
+ Assert.assertEquals(result.size(), 23);
+ }
+
+ private Validity getFeedValidity(String start, String end) throws ParseException {
+ Validity validity = new Validity();
+ validity.setStart(getDate(start));
+ validity.setEnd(getDate(end));
+ return validity;
+ }
+
+ private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws
+ ParseException {
+
+ org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity();
+ validity.setStart(getDate(start));
+ validity.setEnd(getDate(end));
+ return validity;
+ }
+
+ private Date getDate(String dateString) throws ParseException {
+ DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+ return format.parse(dateString);
+ }
+
+ private Cluster publishCluster() throws FalconException {
+ Cluster cluster = new Cluster();
+ cluster.setName("feedCluster");
+ cluster.setColo("colo");
+ store.publish(EntityType.CLUSTER, cluster);
+ return cluster;
+
+ }
+
+ private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+ throws FalconException, ParseException {
+
+ Feed feed = new Feed();
+ feed.setName("feed");
+ Frequency f = new Frequency(frequency);
+ feed.setFrequency(f);
+ feed.setTimezone(UTC);
+ Clusters fClusters = new Clusters();
+ org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+ fCluster.setName(cluster.getName());
+ fCluster.setValidity(getFeedValidity(start, end));
+ fClusters.getClusters().add(fCluster);
+ feed.setClusters(fClusters);
+ store.publish(EntityType.FEED, feed);
+
+ return feed;
+ }
+
+ private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException {
+ Process process = new Process();
+ process.setName("process");
+ process.setTimezone(UTC);
+ org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters();
+ org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster();
+ pCluster.setName(cluster.getName());
+ org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end);
+ pCluster.setValidity(validity);
+ pClusters.getClusters().add(pCluster);
+ process.setClusters(pClusters);
+ Frequency f = new Frequency(frequency);
+ process.setFrequency(f);
+ return process;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
new file mode 100644
index 0000000..0d396ae
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimeZone;
+
+
+/**
+ * Tests for ProcessHelper methods.
+ */
+public class ProcessHelperTest extends AbstractTestBase {
+ private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+ private ConfigurationStore store;
+
+ @BeforeClass
+ public void init() throws Exception {
+ initConfigStore();
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ cleanupStore();
+ store = ConfigurationStore.get();
+ }
+
+ @Test
+ public void testGetInputFeedInstances() throws FalconException, ParseException {
+ // create a process with input feeds
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC");
+
+ // find the input Feed instances time
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Inputs inputs = new Inputs();
+ Input input = getInput("inputFeed", feed.getName(), "today(0,-30)", "today(2,30)", false);
+ inputs.getInputs().add(input);
+ process.setInputs(inputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Date processInstanceDate = getDate("2012-02-28 10:00 UTC");
+ Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process,
+ processInstanceDate, cluster, false);
+ Assert.assertEquals(inputFeedInstances.size(), 3);
+
+ Set<SchedulableEntityInstance> expectedInputFeedInstances = new HashSet<>();
+ SchedulableEntityInstance instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
+ getDate("2012-02-28 00:00 UTC"), EntityType.FEED);
+ instance.setTag(SchedulableEntityInstance.INPUT);
+ expectedInputFeedInstances.add(instance);
+ instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 01:00 UTC"),
+ EntityType.FEED);
+ instance.setTag(SchedulableEntityInstance.INPUT);
+ expectedInputFeedInstances.add(instance);
+ instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), getDate("2012-02-28 02:00 UTC"),
+ EntityType.FEED);
+ instance.setTag(SchedulableEntityInstance.INPUT);
+ expectedInputFeedInstances.add(instance);
+
+ //Validate with expected result
+ Assert.assertTrue(inputFeedInstances.equals(expectedInputFeedInstances));
+ }
+
+ @Test
+ public void testGetOutputFeedInstances() throws FalconException, ParseException {
+ // create a process with input feeds
+ Cluster cluster = publishCluster();
+ Feed feed = publishFeed(cluster, "days(1)", "2012-02-27 11:00 UTC", "2016-02-28 11:00 UTC");
+ Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
+ Outputs outputs = new Outputs();
+ outputs.getOutputs().add(getOutput("outputFeed", feed.getName(), "now(0,0)"));
+ process.setOutputs(outputs);
+ store.publish(EntityType.PROCESS, process);
+
+ Set<SchedulableEntityInstance> result = ProcessHelper.getOutputFeedInstances(process,
+ getDate("2012-02-28 10:00 UTC"), cluster);
+
+ Set<SchedulableEntityInstance> expected = new HashSet<>();
+ SchedulableEntityInstance ins = new SchedulableEntityInstance(feed.getName(), cluster.getName(),
+ getDate("2012-02-28 11:00 UTC"), EntityType.FEED);
+ ins.setTag(SchedulableEntityInstance.OUTPUT);
+ expected.add(ins);
+
+ Assert.assertEquals(result, expected);
+
+ }
+
+ private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws
+ ParseException {
+
+ org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity();
+ validity.setStart(getDate(start));
+ validity.setEnd(getDate(end));
+ return validity;
+ }
+
+ private Date getDate(String dateString) throws ParseException {
+ return new SimpleDateFormat("yyyy-MM-dd HH:mm Z").parse(dateString);
+ }
+
+ private org.apache.falcon.entity.v0.feed.Validity getFeedValidity(String start, String end) throws ParseException {
+ org.apache.falcon.entity.v0.feed.Validity validity = new org.apache.falcon.entity.v0.feed.Validity();
+ validity.setStart(getDate(start));
+ validity.setEnd(getDate(end));
+ return validity;
+ }
+
+ private Input getInput(String name, String feedName, String start, String end, boolean isOptional) {
+ Input inFeed = new Input();
+ inFeed.setName(name);
+ inFeed.setFeed(feedName);
+ inFeed.setStart(start);
+ inFeed.setEnd(end);
+ inFeed.setOptional(isOptional);
+ return inFeed;
+ }
+
+ private Output getOutput(String name, String feedName, String instance) {
+ Output output = new Output();
+ output.setInstance(instance);
+ output.setFeed(feedName);
+ output.setName(name);
+ return output;
+ }
+
+ private Cluster publishCluster() throws FalconException {
+ Cluster cluster = new Cluster();
+ cluster.setName("feedCluster");
+ cluster.setColo("colo");
+ store.publish(EntityType.CLUSTER, cluster);
+ return cluster;
+
+ }
+
+ private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+ throws FalconException, ParseException {
+
+ Feed feed = new Feed();
+ feed.setName("feed");
+ Frequency f = new Frequency(frequency);
+ feed.setFrequency(f);
+ feed.setTimezone(UTC);
+ Clusters fClusters = new Clusters();
+ org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+ fCluster.setName(cluster.getName());
+ fCluster.setValidity(getFeedValidity(start, end));
+ fClusters.getClusters().add(fCluster);
+ feed.setClusters(fClusters);
+ store.publish(EntityType.FEED, feed);
+
+ return feed;
+ }
+
+ private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException {
+ Process process = new Process();
+ process.setName("process");
+ process.setTimezone(UTC);
+ org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters();
+ org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster();
+ pCluster.setName(cluster.getName());
+ org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end);
+ pCluster.setValidity(validity);
+ pClusters.getClusters().add(pCluster);
+ process.setClusters(pClusters);
+ Frequency f = new Frequency(frequency);
+ process.setFrequency(f);
+ return process;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index e447915..50dce84 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -280,6 +280,41 @@ Usage:
$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -start "yyyy-MM-dd'T'HH:mm'Z'"
+
+---+++Dependency
+Display the dependent instances which are dependent on the given instance. For example for a given process instance it will
+list all the input feed instances(if any) and the output feed instances(if any).
+
+An example use case of this command is as follows:
+Suppose you find out that the data in a feed instance was incorrect and you need to figure out which all process instances
+consumed this feed instance so that you can reprocess them after correcting the feed instance. You can give the feed instance
+and it will tell you which process instance produced this feed and which all process instances consumed this feed.
+
+NOTE:
+1. instanceTime must be a valid instanceTime e.g. instanceTime of a feed should be in it's validity range on applicable clusters,
+ and it should be in the range of instances produced by the producer process(if any)
+
+2. For processes with inputs like latest() which vary with time the results are not guaranteed to be correct.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -instanceTime "yyyy-MM-dd'T'HH:mm'Z'"
+
+For example:
+$FALCON_HOME/bin/falcon instance -dependency -type feed -name out -instanceTime 2014-12-15T00:00Z
+name: producer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:00Z, tag: Output
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:03Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:04Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:02Z, tag: Input
+name: consumer, type: PROCESS, cluster: local, instanceTime: 2014-12-15T00:05Z, tag: Input
+
+
+Response: default/Success!
+
+Request Id: default/1125035965@qtp-503156953-7 - 447be0ad-1d38-4dce-b438-20f3de69b172
+
+
+<a href="./Restapi/InstanceDependency.html">Optional params described here.</a>
+
---++ Metadata Lineage Options
---+++Lineage
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/restapi/InstanceDependency.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceDependency.twiki b/docs/src/site/twiki/restapi/InstanceDependency.twiki
new file mode 100644
index 0000000..dc452de
--- /dev/null
+++ b/docs/src/site/twiki/restapi/InstanceDependency.twiki
@@ -0,0 +1,49 @@
+---++ GET /api/instance/dependency/:entity-type/:entity-name
+ * <a href="#Description">Description</a>
+ * <a href="#Parameters">Parameters</a>
+ * <a href="#Results">Results</a>
+ * <a href="#Examples">Examples</a>
+
+---++ Description
+Get dependent instances for a particular instance.
+
+---++ Parameters
+ * :entity-type Valid options are feed or process.
+ * :entity-name Name of the entity
+ * instanceTime <mandatory param> time of the given instance
+ * colo <optional param> name of the colo
+
+
+---++ Results
+Dependent instances for the specified instance
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/dependency/feed/myFeed?colo=*&instanceTime=2012-04-03T07:00Z
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ 'status': 'SUCCEEDED',
+ 'message': 'default/Success!\n',
+ 'dependencies': [
+ {
+ 'cluster': 'local',
+ 'entityName': 'consumer-process',
+ 'entityType': 'PROCESS',
+ 'instanceTime': '2014-12-18T00:00Z',
+ 'tags': 'Input'
+ },
+ {
+ 'cluster': 'local',
+ 'entityName': 'producer-process',
+ 'entityType': 'PROCESS',
+ 'instanceTime': '2014-12-18T00:00Z',
+ 'tags': 'Output'
+ }
+ ],
+ 'requestId': 'default/1405883107@qtp-1501726962-6-0c2e690f-546b-47b0-a5ee-0365d4522a31\n'
+}
+</verbatim>
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/9fd86b78/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index 060e0af..49dddb7 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -67,6 +67,7 @@ See also: [[../Security.twiki][Security in Falcon]]
| POST | [[InstanceRerun][api/instance/rerun/:entity-type/:entity-name]] | Rerun a given instance |
| GET | [[InstanceLogs][api/instance/logs/:entity-type/:entity-name]] | Get logs of a given instance |
| GET | [[InstanceSummary][api/instance/summary/:entity-type/:entity-name]] | Return summary of instances for an entity |
+| GET | [[InstanceDependency][api/instance/dependency/:entity-type/:entity-name]] | Return dependent instances for a given instance |
---++ REST Call on Metadata Lineage Resource