You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/06/19 12:00:44 UTC

git commit: FALCON-263 API to get workflow parameters. Contributed by pavan kumar kolamuri

Repository: incubator-falcon
Updated Branches:
  refs/heads/master b68946a02 -> 1be98326b


FALCON-263 API to get workflow parameters. Contributed by pavan kumar kolamuri


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/1be98326
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/1be98326
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/1be98326

Branch: refs/heads/master
Commit: 1be98326bd6ce1658bf8a9db592bd9b6a768e8b2
Parents: b68946a
Author: Shwetha GS <sh...@inmobi.com>
Authored: Thu Jun 19 15:30:36 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Thu Jun 19 15:30:36 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../java/org/apache/falcon/cli/FalconCLI.java   | 15 ++++++--
 .../org/apache/falcon/client/FalconClient.java  | 23 +++++++++++-
 .../apache/falcon/resource/InstancesResult.java |  7 ++++
 .../workflow/engine/AbstractWorkflowEngine.java |  3 ++
 docs/src/site/twiki/FalconCLI.twiki             |  7 ++++
 .../workflow/engine/OozieWorkflowEngine.java    | 38 ++++++++++++++++----
 .../resource/AbstractInstanceManager.java       | 24 +++++++++++++
 .../resource/proxy/InstanceManagerProxy.java    | 21 +++++++++++
 .../apache/falcon/resource/InstanceManager.java | 14 ++++++++
 .../java/org/apache/falcon/cli/FalconCLIIT.java | 10 ++++++
 11 files changed, 153 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0b007e2..f72c1c4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
    FALCON-280 Validate the ACL in Feed entity with the user submitting the entity

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index c51c3c0..753c39f 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -93,6 +93,7 @@ public class FalconCLI {
     public static final String CURRENT_COLO = "current.colo";
     public static final String CLIENT_PROPERTIES = "/client.properties";
     public static final String LIFECYCLE_OPT = "lifecycle";
+    public static final String PARARMS_OPT = "params";
 
     // Graph Commands
     public static final String GRAPH_CMD = "graph";
@@ -241,6 +242,9 @@ public class FalconCLI {
             result = client.rerunInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles);
         } else if (optionsList.contains(LOG_OPT)) {
             result = client.getLogsOfInstances(type, entity, start, end, colo, runid, lifeCycles);
+        } else if (optionsList.contains(PARARMS_OPT)) {
+            // start time is the nominal time of instance
+            result = client.getParamsOfInstance(type, entity, start, colo, clusters, sourceClusters, lifeCycles);
         } else {
             throw new FalconCLIException("Invalid command");
         }
@@ -548,6 +552,12 @@ public class FalconCLI {
                 "Logs print the logs for process instances for a given process in "
                         + "the range start time and optional end time");
 
+        Option params = new Option(
+                PARARMS_OPT,
+                false,
+                "Displays the workflow parameters for a given instance of specified nominal time");
+
+
         OptionGroup group = new OptionGroup();
         group.addOption(running);
         group.addOption(status);
@@ -559,10 +569,12 @@ public class FalconCLI {
         group.addOption(rerun);
         group.addOption(logs);
         group.addOption(continues);
+        group.addOption(params);
 
         Option url = new Option(URL_OPTION, true, "Falcon URL");
         Option start = new Option(START_OPT, true,
-                "Start time is required for commands, status, kill, suspend, resume and re-run");
+                "Start time is required for commands, status, kill, suspend, resume and re-run"
+                        + "and it is nominal time while displaying workflow params");
         Option end = new Option(
                 END_OPT,
                 true,
@@ -592,7 +604,6 @@ public class FalconCLI {
                 "describes life cycle of entity , for feed it can be replication/retention "
                        + "and for process it can be execution");
 
-
         instanceOptions.addOption(url);
         instanceOptions.addOptionGroup(group);
         instanceOptions.addOption(start);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index beecc0f..ed0a0ba 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -201,7 +201,8 @@ public class FalconClient {
         RESUME("api/instance/resume/", HttpMethod.POST, MediaType.APPLICATION_JSON),
         RERUN("api/instance/rerun/", HttpMethod.POST, MediaType.APPLICATION_JSON),
         LOG("api/instance/logs/", HttpMethod.GET, MediaType.APPLICATION_JSON),
-        SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON);
+        SUMMARY("api/instance/summary/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        PARAMS("api/instance/params/", HttpMethod.GET, MediaType.APPLICATION_JSON);
 
         private String path;
         private String method;
@@ -424,6 +425,16 @@ public class FalconClient {
         return sendInstanceRequest(Instances.LOG, type, entity, start, end,
                 null, runId, colo, lifeCycles);
     }
+
+    public String getParamsOfInstance(String type, String entity,
+                                      String start, String colo,
+                                      String clusters, String sourceClusters,
+                                      List<LifeCycle> lifeCycles)
+        throws FalconCLIException, UnsupportedEncodingException {
+
+        return sendInstanceRequest(Instances.PARAMS, type, entity,
+                start, null, null, null, colo, lifeCycles);
+    }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public String getThreadDump() throws FalconCLIException {
@@ -611,6 +622,7 @@ public class FalconClient {
         }
 
     }
+
     //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 
     private void checkLifeCycleOption(List<LifeCycle> lifeCycles, String type) throws FalconCLIException {
@@ -742,6 +754,15 @@ public class FalconClient {
                 toAppend = instance.getLogFile() != null ? instance.getLogFile() : "-";
                 sb.append(toAppend).append("\n");
 
+                if (instance.getWfParams() != null) {
+                    Map<String, String> props = instance.getWfParams();
+                    sb.append("Workflow params").append("\n");
+                    for (Map.Entry<String, String> entry : props.entrySet()) {
+                        sb.append(entry.getKey()).append("=").append(entry.getValue()).append("\n");
+                    }
+                    sb.append("\n");
+                }
+
             }
         }
         sb.append("\nAdditional Information:\n");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index da0ccc5..c3c93f2 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -19,8 +19,10 @@
 package org.apache.falcon.resource;
 
 import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
 import javax.xml.bind.annotation.XmlRootElement;
 import java.util.Date;
+import java.util.Map;
 
 /**
  * Pojo for JAXB marshalling / unmarshalling.
@@ -99,6 +101,9 @@ public class InstancesResult extends APIResult {
         @XmlElement
         public InstanceAction[] actions;
 
+        @XmlElementWrapper(name="params")
+        public Map<String, String> wfParams;
+
         public Instance() {
         }
 
@@ -144,6 +149,8 @@ public class InstancesResult extends APIResult {
             return details;
         }
 
+        public Map<String, String> getWfParams() { return wfParams; }
+
 
         @Override
         public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index c28cb03..eedd81f 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -94,4 +94,7 @@ public abstract class AbstractWorkflowEngine {
     public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException;
 
     public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException;
+
+    public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end,
+                                                      List<LifeCycle> lifeCycles) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index dbb6981..2c5d9c5 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -172,6 +172,13 @@ This can be used with instance management options. Default values are replicatio
 Usage:
 $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -status -lifecycle <<lifecycletype>> -start "yyyy-MM-dd'T'HH:mm'Z'" -end "yyyy-MM-dd'T'HH:mm'Z'"
 
+---+++Params
+
+Displays the workflow params of a given instance. Where start time is considered as nominal time of that instance.
+
+Usage:
+$FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -start "yyyy-MM-dd'T'HH:mm'Z'"
+
 
 ---++ Graphs Options
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 7166b13..eb0c213 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -44,6 +44,7 @@ import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.OozieWorkflowBuilder;
 import org.apache.falcon.workflow.WorkflowBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.BundleJob;
@@ -60,6 +61,7 @@ import org.apache.oozie.client.rest.RestConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -481,8 +483,14 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return doSummaryJobAction(entity, start, end, null, lifeCycles);
     }
 
+    @Override
+    public InstancesResult getInstanceParams(Entity entity, Date start, Date end,
+                                             List<LifeCycle> lifeCycles) throws FalconException {
+        return doJobAction(JobAction.PARAMS, entity, start, end, null, lifeCycles);
+    }
+
     private static enum JobAction {
-        KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY
+        KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY, PARAMS
     }
 
     private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException {
@@ -536,6 +544,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                     instance.endTime = jobInfo.getEndTime();
                     instance.logFile = jobInfo.getConsoleUrl();
                     instance.sourceCluster = sourceCluster;
+                    if (action == JobAction.PARAMS) {
+                        instance.wfParams = getWFParams(jobInfo);
+                    }
                 }
                 instance.details = coordinatorAction.getMissingDependencies();
                 instances.add(instance);
@@ -624,6 +635,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return instancesSummaryResult;
     }
 
+    private Map<String, String> getWFParams(WorkflowJob jobInfo) {
+        Map<String, String> wfParams = new HashMap<String, String>();
+        Configuration conf = new Configuration(false);
+        conf.addResource(new ByteArrayInputStream(jobInfo.getConf().getBytes()));
+        for (Map.Entry<String, String> entry : conf) {
+            wfParams.put(entry.getKey(), entry.getValue());
+        }
+        return wfParams;
+    }
+
     private void updateInstanceSummary(CoordinatorJob coordJob, Map<String, Long> instancesSummary) {
         List<CoordinatorAction> actions = coordJob.getActions();
 
@@ -689,6 +710,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         case STATUS:
             break;
 
+        case PARAMS:
+            break;
+
         default:
             throw new IllegalArgumentException("Unhandled action " + action);
         }
@@ -795,10 +819,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 boolean retentionCoord  = isRetentionCoord(coord);
                 Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit());
                 TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
-                Date iterStart = EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, start);
+
                 Date iterEnd = ((nextMaterializedTime.before(end) || retentionCoord) ? nextMaterializedTime : end);
+                Calendar endCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
+                endCal.setTime(EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, iterEnd));
+                endCal.add(freq.getTimeUnit().getCalendarUnit(), -(Integer.valueOf((coord.getFrequency()))));
 
-                while (iterStart.before(iterEnd)) {
+                while (start.compareTo(endCal.getTime()) <= 0) {
                     if (retentionCoord) {
                         if (retentionInstancesCount >= maxRetentionInstancesCount) {
                             break;
@@ -806,13 +833,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                         retentionInstancesCount++;
                     }
 
-                    int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterEnd);
+                    int sequence = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, endCal.getTime());
                     String actionId = coord.getId() + "@" + sequence;
                     addCoordAction(client, actions, actionId);
-                    Calendar endCal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
-                    endCal.setTime(iterEnd);
                     endCal.add(freq.getTimeUnit().getCalendarUnit(), -(Integer.valueOf((coord.getFrequency()))));
-                    iterEnd = endCal.getTime();
                 }
             }
             actionsMap.put(cluster, actions);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 3b87469..0df81cd 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -157,6 +157,30 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
         }
     }
 
+    public InstancesResult getInstanceParams(String type,
+                                          String entity, String startTime,
+                                          String colo, List<LifeCycle> lifeCycles) {
+        checkColo(colo);
+        checkType(type);
+        try {
+            lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
+            if (lifeCycles.size() != 1) {
+                throw new FalconException("For displaying wf-params there can't be more than one lifecycle "
+                        + lifeCycles);
+            }
+            validateParams(type, entity, startTime, null);
+
+            Entity entityObject = EntityUtil.getEntity(type, entity);
+            Date start = EntityUtil.parseDateUTC(startTime);
+            Date end = getEndDate(start, null);
+            AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+            return wfEngine.getInstanceParams(entityObject, start, end, lifeCycles);
+        } catch (Throwable e) {
+            LOG.error("Failed to display params of an instance", e);
+            throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
+        }
+    }
+
     public InstancesResult killInstance(HttpServletRequest request,
                                         String type, String entity, String startStr,
                                         String endStr, String colo,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 42b4aeb..36ce6c9 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -133,6 +133,27 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
     }
 
     @GET
+    @Path("params/{type}/{entity}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "instance-params")
+    @Override
+    public InstancesResult getInstanceParams(
+            @Dimension("type") @PathParam("type") final String type,
+            @Dimension("entity") @PathParam("entity") final String entity,
+            @Dimension("start-time") @QueryParam("start") final String start,
+            @Dimension("colo") @QueryParam("colo") String colo,
+            @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+        return new InstanceProxy() {
+            @Override
+            protected InstancesResult doExecute(String colo) throws FalconException {
+                return getInstanceManager(colo).invoke("getInstanceParams",
+                        type, entity, start, colo, lifeCycles);
+            }
+        }.execute(colo, type, entity);
+    }
+
+
+    @GET
     @Path("logs/{type}/{entity}")
     @Produces(MediaType.APPLICATION_JSON)
     @Monitored(event = "instance-logs")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index bdf5e1b..cdf703d 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -90,6 +90,20 @@ public class InstanceManager extends AbstractInstanceManager {
         return super.getLogs(type, entity, startStr, endStr, colo, runId, lifeCycles);
     }
 
+    @GET
+    @Path("params/{type}/{entity}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Monitored(event = "instance-params")
+    @Override
+    public InstancesResult getInstanceParams(
+            @Dimension("type") @PathParam("type") String type,
+            @Dimension("entity") @PathParam("entity") String entity,
+            @Dimension("start-time") @QueryParam("start") String start,
+            @Dimension("colo") @QueryParam("colo") String colo,
+            @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
+        return super.getInstanceParams(type, entity, start, colo, lifeCycles);
+    }
+
 
     @POST
     @Path("kill/{type}/{entity}")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1be98326/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 9471e27..ea40411 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -383,6 +383,11 @@ public class FalconCLIIT {
                         + overlay.get("outputFeedName")
                         + " -start "+ SchemaHelper.getDateFormat().format(new Date())));
 
+        Assert.assertEquals(0,
+                executeWithURL("instance -params -type process -name "
+                        + overlay.get("processName")
+                        + " -start " + START_INSTANCE));
+
     }
 
     public void testInstanceRunningAndSummaryCommands() throws Exception {
@@ -418,6 +423,11 @@ public class FalconCLIIT {
                 executeWithURL("instance -summary -type feed -lifecycle eviction -name "
                         + overlay.get("outputFeedName")
                         + " -start "+ SchemaHelper.getDateFormat().format(new Date())));
+
+        Assert.assertEquals(0,
+                executeWithURL("instance -params -type process -name "
+                        + overlay.get("processName")
+                        + " -start " + START_INSTANCE));
     }