Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 22:31:39 UTC

[6/8] git commit: FALCON-488 - Introduce Workflow Context in Lineage Service. Contributed by Venkatesh Seetharam

FALCON-488 - Introduce Workflow Context in Lineage Service. Contributed by Venkatesh Seetharam
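
In short, this change retires the file-based LineageRecorder/LineageArgs plumbing: lineage is now fed from a typed WorkflowExecutionContext delivered through the WorkflowJobEndNotificationService. A minimal sketch of the new call path, condensed from the test changes in this patch (the argument values and the "service" variable are illustrative placeholders; a real invocation passes the full "-name value" set built in getTestMessageArgs() below):

    // Classes come from org.apache.falcon.workflow (see the imports in the diff).
    // The args array mirrors the "-name value" pairs that Oozie post-processing
    // used to hand to LineageRecorder; the values here are placeholders only.
    String[] args = {
        "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "sample-process",
        "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
        "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
        // ... remaining WorkflowExecutionArgs pairs as in getTestMessageArgs() ...
    };

    WorkflowExecutionContext context = WorkflowExecutionContext.create(
            args, WorkflowExecutionContext.Type.POST_PROCESSING);

    // MetadataMappingService registers itself as a WorkflowExecutionListener,
    // so a successful workflow lands here instead of in a -lineage.json file.
    service.onSuccess(context);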


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a0994437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a0994437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a0994437

Branch: refs/heads/master
Commit: a099443758045d625da5d2bb382898e5e13009ab
Parents: b9781d1
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 13:01:37 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 13:01:37 2014 -0700

----------------------------------------------------------------------
 .../EntityRelationshipGraphBuilder.java         |   6 +-
 .../InstanceRelationshipGraphBuilder.java       | 100 ++++++------
 .../org/apache/falcon/metadata/LineageArgs.java |  82 ----------
 .../apache/falcon/metadata/LineageRecorder.java | 154 -------------------
 .../falcon/metadata/MetadataMappingService.java |  59 +++----
 .../metadata/RelationshipGraphBuilder.java      |  10 --
 .../falcon/metadata/LineageRecorderTest.java    | 103 -------------
 .../metadata/MetadataMappingServiceTest.java    |  71 +++++----
 .../metadata/LineageMetadataResourceTest.java   |  76 +++++----
 9 files changed, 174 insertions(+), 487 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 2f46ff4..364d8f1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,10 +167,11 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) {
-        processVertex.setProperty(LineageArgs.USER_WORKFLOW_NAME.getOptionName(),
+        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
                 ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
         processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion());
-        processVertex.setProperty(LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), workflow.getEngine().value());
+        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
+                workflow.getEngine().value());
     }
 
     public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 7ee185f..34857a3 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -34,9 +34,10 @@ import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 
 import java.net.URISyntaxException;
-import java.util.Map;
 
 /**
  * Instance Metadata relationship mapping helper.
@@ -45,18 +46,17 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
     private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
 
-    private static final String PROCESS_INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
     private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // computed
 
     // process workflow properties from message
-    private static final String[] INSTANCE_WORKFLOW_PROPERTIES = {
-        LineageArgs.USER_WORKFLOW_NAME.getOptionName(),
-        LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(),
-        LineageArgs.WORKFLOW_ID.getOptionName(),
-        LineageArgs.RUN_ID.getOptionName(),
-        LineageArgs.STATUS.getOptionName(),
-        LineageArgs.WF_ENGINE_URL.getOptionName(),
-        LineageArgs.USER_SUBFLOW_ID.getOptionName(),
+    private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {
+        WorkflowExecutionArgs.USER_WORKFLOW_NAME,
+        WorkflowExecutionArgs.USER_WORKFLOW_ENGINE,
+        WorkflowExecutionArgs.WORKFLOW_ID,
+        WorkflowExecutionArgs.RUN_ID,
+        WorkflowExecutionArgs.STATUS,
+        WorkflowExecutionArgs.WF_ENGINE_URL,
+        WorkflowExecutionArgs.USER_SUBFLOW_ID,
     };
 
 
@@ -64,49 +64,51 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         super(graph, preserveHistory);
     }
 
-    public Vertex addProcessInstance(Map<String, String> lineageMetadata) throws FalconException {
-        String entityName = lineageMetadata.get(LineageArgs.ENTITY_NAME.getOptionName());
-        String processInstanceName = getProcessInstanceName(entityName,
-                lineageMetadata.get(LineageArgs.NOMINAL_TIME.getOptionName()));
-        LOG.info("Adding process instance: {}", processInstanceName);
+    public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException {
+        String processInstanceName = getProcessInstanceName(context);
+        LOG.info("Adding process instance: " + processInstanceName);
 
-        String timestamp = getTimestamp(lineageMetadata);
-        Vertex processInstance = addVertex(processInstanceName, RelationshipType.PROCESS_INSTANCE, timestamp);
-        addWorkflowInstanceProperties(processInstance, lineageMetadata);
+        Vertex processInstance = addVertex(processInstanceName,
+                RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsISO8601());
+        addWorkflowInstanceProperties(processInstance, context);
 
-        addInstanceToEntity(processInstance, entityName,
+        addInstanceToEntity(processInstance, context.getEntityName(),
                 RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
-        addInstanceToEntity(processInstance, lineageMetadata.get(LineageArgs.CLUSTER.getOptionName()),
+        addInstanceToEntity(processInstance, context.getClusterName(),
                 RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE);
-        addInstanceToEntity(processInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
+        addInstanceToEntity(processInstance, context.getWorkflowUser(),
                 RelationshipType.USER, RelationshipLabel.USER);
 
         if (isPreserveHistory()) {
-            Process process = ConfigurationStore.get().get(EntityType.PROCESS, entityName);
+            Process process = ConfigurationStore.get().get(EntityType.PROCESS, context.getEntityName());
             addDataClassification(process.getTags(), processInstance);
         }
 
         return processInstance;
     }
 
-    private String getTimestamp(Map<String, String> lineageMetadata) {
-        String timestamp = lineageMetadata.get(LineageArgs.TIMESTAMP.getOptionName());
-        return SchemaHelper.formatDateUTCToISO8601(timestamp, PROCESS_INSTANCE_FORMAT);
+    public String getProcessInstanceName(WorkflowExecutionContext context) {
+        return context.getEntityName() + "/" + context.getNominalTimeAsISO8601();
     }
 
     public void addWorkflowInstanceProperties(Vertex processInstance,
-                                              Map<String, String> lineageMetadata) {
-        for (String instanceWorkflowProperty : INSTANCE_WORKFLOW_PROPERTIES) {
-            addProperty(processInstance, lineageMetadata, instanceWorkflowProperty);
+                                              WorkflowExecutionContext context) {
+        for (WorkflowExecutionArgs instanceWorkflowProperty : INSTANCE_WORKFLOW_PROPERTIES) {
+            addProperty(processInstance, context, instanceWorkflowProperty);
         }
+
         processInstance.setProperty(RelationshipProperty.VERSION.getName(),
-                lineageMetadata.get(LineageArgs.USER_WORKFLOW_VERSION.getOptionName()));
+                context.getUserWorkflowVersion());
     }
 
-    public String getProcessInstanceName(String entityName,
-                                         String nominalTime) {
-        return entityName + "/"
-                + SchemaHelper.formatDateUTCToISO8601(nominalTime, PROCESS_INSTANCE_FORMAT);
+    private void addProperty(Vertex vertex, WorkflowExecutionContext context,
+                             WorkflowExecutionArgs optionName) {
+        String value = context.getValue(optionName);
+        if (value == null || value.length() == 0) {
+            return;
+        }
+
+        vertex.setProperty(optionName.getName(), value);
     }
 
     public void addInstanceToEntity(Vertex instanceVertex, String entityName,
@@ -122,37 +124,33 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
         addEdge(instanceVertex, entityVertex, edgeLabel.getName());
     }
 
-    public void addOutputFeedInstances(Map<String, String> lineageMetadata,
+    public void addOutputFeedInstances(WorkflowExecutionContext context,
                                        Vertex processInstance) throws FalconException {
-        String outputFeedNamesArg = lineageMetadata.get(LineageArgs.FEED_NAMES.getOptionName());
+        String outputFeedNamesArg = context.getOutputFeedNames();
         if ("NONE".equals(outputFeedNamesArg)) {
             return; // there are no output feeds for this process
         }
 
-        String[] outputFeedNames = outputFeedNamesArg.split(",");
-        String[] outputFeedInstancePaths =
-                lineageMetadata.get(LineageArgs.FEED_INSTANCE_PATHS.getOptionName()).split(",");
+        String[] outputFeedNames = context.getOutputFeedNamesList();
+        String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
 
         for (int index = 0; index < outputFeedNames.length; index++) {
             String feedName = outputFeedNames[index];
             String feedInstanceDataPath = outputFeedInstancePaths[index];
             addFeedInstance(processInstance, RelationshipLabel.PROCESS_FEED_EDGE,
-                    lineageMetadata, feedName, feedInstanceDataPath);
+                    context, feedName, feedInstanceDataPath);
         }
     }
 
-    public void addInputFeedInstances(Map<String, String> lineageMetadata,
+    public void addInputFeedInstances(WorkflowExecutionContext context,
                                       Vertex processInstance) throws FalconException {
-        String inputFeedNamesArg = lineageMetadata.get(LineageArgs.INPUT_FEED_NAMES.getOptionName());
+        String inputFeedNamesArg = context.getInputFeedNames();
         if ("NONE".equals(inputFeedNamesArg)) {
             return; // there are no input feeds for this process
         }
 
-        String[] inputFeedNames =
-                lineageMetadata.get(LineageArgs.INPUT_FEED_NAMES.getOptionName()).split("#");
-        // Each input feed is separated by #
-        String[] inputFeedInstancePaths =
-                lineageMetadata.get(LineageArgs.INPUT_FEED_PATHS.getOptionName()).split("#");
+        String[] inputFeedNames = context.getInputFeedNamesList();
+        String[] inputFeedInstancePaths = context.getInputFeedInstancePathsList();
 
         for (int index = 0; index < inputFeedNames.length; index++) {
             String inputFeedName = inputFeedNames[index];
@@ -162,21 +160,21 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
             for (String feedInstanceDataPath : feedInstancePaths) {
                 addFeedInstance(processInstance, RelationshipLabel.FEED_PROCESS_EDGE,
-                        lineageMetadata, inputFeedName, feedInstanceDataPath);
+                        context, inputFeedName, feedInstanceDataPath);
             }
         }
     }
 
     private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
-                                 Map<String, String> lineageMetadata, String feedName,
+                                 WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {
-        String clusterName = lineageMetadata.get(LineageArgs.CLUSTER.getOptionName());
+        String clusterName = context.getClusterName();
         LOG.info("Computing feed instance for : name=" + feedName + ", path= "
                 + feedInstanceDataPath + ", in cluster: " + clusterName);
         String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
         LOG.info("Adding feed instance: " + feedInstanceName);
         Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
-                getTimestamp(lineageMetadata));
+                context.getTimeStampAsISO8601());
 
         addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
 
@@ -184,7 +182,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
         addInstanceToEntity(feedInstance, clusterName,
                 RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE);
-        addInstanceToEntity(feedInstance, lineageMetadata.get(LineageArgs.WORKFLOW_USER.getOptionName()),
+        addInstanceToEntity(feedInstance, context.getWorkflowUser(),
                 RelationshipType.USER, RelationshipLabel.USER);
 
         if (isPreserveHistory()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java b/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
deleted file mode 100644
index e91f7ab..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-
-/**
- * Args for data Lineage.
- */
-@Deprecated // delete this class
-public enum LineageArgs {
-    // process instance
-    NOMINAL_TIME("nominalTime", "instance time"),
-    ENTITY_TYPE("entityType", "type of the entity"),
-    ENTITY_NAME("entityName", "name of the entity"),
-    TIMESTAMP("timeStamp", "current timestamp"),
-
-    // where
-    CLUSTER("cluster", "name of the current cluster"),
-    OPERATION("operation", "operation like generate, delete, replicate"),
-
-    // who
-    WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),
-
-    // workflow details
-    WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
-    RUN_ID("runId", "current run-id of the instance"),
-    STATUS("status", "status of the user workflow isnstance"),
-    WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
-    USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
-    USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
-    USER_WORKFLOW_NAME("userWorkflowName", "user workflow name"),
-    USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version"),
-
-    // what inputs
-    INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs"),
-    INPUT_FEED_PATHS("falconInputPaths", "comma separated input feed instance paths"),
-
-    // what outputs
-    FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
-    FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
-
-    // lineage data recorded
-    LOG_DIR("logDir", "log dir where lineage can be recorded");
-
-    private String name;
-    private String description;
-
-    LineageArgs(String name, String description) {
-        this.name = name;
-        this.description = description;
-    }
-
-    public Option getOption() {
-        return new Option(this.name, true, this.description);
-    }
-
-    public String getOptionName() {
-        return this.name;
-    }
-
-    public String getOptionValue(CommandLine cmd) {
-        return cmd.getOptionValue(this.name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
deleted file mode 100644
index 5ab1bf0..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Utility called in the post process of oozie workflow to record lineage information.
- */
-@Deprecated // delete this class
-public class LineageRecorder  extends Configured implements Tool {
-
-    private static final Logger LOG = LoggerFactory.getLogger(LineageRecorder.class);
-
-    public static void main(String[] args) throws Exception {
-        ToolRunner.run(new LineageRecorder(), args);
-    }
-
-    @Override
-    public int run(String[] arguments) throws Exception {
-        CommandLine command = getCommand(arguments);
-
-        LOG.info("Parsing lineage metadata from: {}", command);
-        Map<String, String> lineageMetadata = getLineageMetadata(command);
-        LOG.info("Lineage Metadata: {}", lineageMetadata);
-
-        String lineageFile = getFilePath(command.getOptionValue(LineageArgs.LOG_DIR.getOptionName()),
-                command.getOptionValue(LineageArgs.ENTITY_NAME.getOptionName())
-        );
-
-        LOG.info("Persisting lineage metadata to: {}", lineageFile);
-        persistLineageMetadata(lineageMetadata, lineageFile);
-
-        return 0;
-    }
-
-    protected static CommandLine getCommand(String[] arguments) throws ParseException {
-
-        Options options = new Options();
-
-        for (LineageArgs arg : LineageArgs.values()) {
-            addOption(options, arg);
-        }
-
-        return new GnuParser().parse(options, arguments);
-    }
-
-    private static void addOption(Options options, LineageArgs arg) {
-        addOption(options, arg, true);
-    }
-
-    private static void addOption(Options options, LineageArgs arg, boolean isRequired) {
-        Option option = arg.getOption();
-        option.setRequired(isRequired);
-        options.addOption(option);
-    }
-
-    protected Map<String, String> getLineageMetadata(CommandLine command) {
-        Map<String, String> lineageMetadata = new HashMap<String, String>();
-
-        for (LineageArgs arg : LineageArgs.values()) {
-            lineageMetadata.put(arg.getOptionName(), arg.getOptionValue(command));
-        }
-
-        return lineageMetadata;
-    }
-
-    public static String getFilePath(String logDir, String entityName) {
-        return logDir + entityName + "-lineage.json";
-    }
-
-    /**
-     * this method is invoked from with in the workflow.
-     *
-     * @param lineageMetadata metadata to persist
-     * @param lineageFile file to serialize the metadata
-     * @throws IOException
-     * @throws FalconException
-     */
-    protected void persistLineageMetadata(Map<String, String> lineageMetadata,
-                                          String lineageFile) throws IOException, FalconException {
-        OutputStream out = null;
-        Path file = new Path(lineageFile);
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(file.toUri(), getConf());
-            out = fs.create(file);
-
-            // making sure falcon can read this file
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(file, permission);
-
-            out.write(JSONValue.toJSONString(lineageMetadata).getBytes());
-        } finally {
-            if (out != null) {
-                try {
-                    out.close();
-                } catch (IOException ignore) {
-                    // ignore
-                }
-            }
-        }
-    }
-
-    public static Map<String, String> parseLineageMetadata(String lineageFile) throws FalconException {
-        try {
-            Path lineageDataPath = new Path(lineageFile); // file has 777 permissions
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(lineageDataPath.toUri());
-
-            BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath)));
-            return (Map<String, String>) JSONValue.parse(in);
-        } catch (IOException e) {
-            throw new FalconException("Error opening lineage file", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 5df4611..80e96ba 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -36,9 +36,13 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.service.FalconService;
+import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
 
 import java.util.Map;
 import java.util.Properties;
@@ -47,7 +51,8 @@ import java.util.Set;
 /**
  * Metadata relationship mapping service. Maps relationships into a graph database.
  */
-public class MetadataMappingService implements FalconService, ConfigurationChangeListener {
+public class MetadataMappingService
+        implements FalconService, ConfigurationChangeListener, WorkflowExecutionListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(MetadataMappingService.class);
 
@@ -92,6 +97,8 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
         instanceGraphBuilder = new InstanceRelationshipGraphBuilder(graph, preserveHistory);
 
         ConfigurationStore.get().registerListener(this);
+        Services.get().<WorkflowJobEndNotificationService>getService(
+                WorkflowJobEndNotificationService.NAME).registerListener(this);
     }
 
     protected Graph initializeGraphDB() {
@@ -179,6 +186,9 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
 
     @Override
     public void destroy() throws FalconException {
+        Services.get().<WorkflowJobEndNotificationService>getService(
+                WorkflowJobEndNotificationService.NAME).unregisterListener(this);
+
         LOG.info("Shutting down graph db");
         graph.shutdown();
     }
@@ -244,54 +254,47 @@ public class MetadataMappingService implements FalconService, ConfigurationChang
         // are already added to the graph
     }
 
-    /**
-     * Entity operations.
-     */
-    public enum EntityOperations {
-        GENERATE, REPLICATE, DELETE
-    }
-
-    public void onSuccessfulWorkflowCompletion(String entityName, String operation,
-                                               String logDir) throws FalconException {
-        String lineageFile = LineageRecorder.getFilePath(logDir, entityName);
-
-        LOG.info("Parsing lineage metadata from: {}", lineageFile);
-        Map<String, String> lineageMetadata = LineageRecorder.parseLineageMetadata(lineageFile);
-
-        EntityOperations entityOperation = EntityOperations.valueOf(operation);
+    @Override
+    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation();
 
-        LOG.info("Adding lineage for entity: {}, operation: {}", entityName, operation);
+        LOG.info("Adding lineage for context {}", context);
         switch (entityOperation) {
         case GENERATE:
-            onProcessInstanceAdded(lineageMetadata);
+            onProcessInstanceExecuted(context);
             getTransactionalGraph().commit();
             break;
 
         case REPLICATE:
-            onFeedInstanceReplicated(lineageMetadata);
+            onFeedInstanceReplicated(context);
             break;
 
         case DELETE:
-            onFeedInstanceEvicted(lineageMetadata);
+            onFeedInstanceEvicted(context);
             break;
 
         default:
         }
     }
 
-    private void onProcessInstanceAdded(Map<String, String> lineageMetadata) throws FalconException {
-        Vertex processInstance = instanceGraphBuilder.addProcessInstance(lineageMetadata);
-        instanceGraphBuilder.addOutputFeedInstances(lineageMetadata, processInstance);
-        instanceGraphBuilder.addInputFeedInstances(lineageMetadata, processInstance);
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        // do nothing since lineage is only recorded for successful workflow
+    }
+
+    private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException {
+        Vertex processInstance = instanceGraphBuilder.addProcessInstance(context);
+        instanceGraphBuilder.addOutputFeedInstances(context, processInstance);
+        instanceGraphBuilder.addInputFeedInstances(context, processInstance);
     }
 
-    private void onFeedInstanceReplicated(Map<String, String> lineageMetadata) {
-        LOG.info("Adding replicated feed instance: {}", lineageMetadata.get(LineageArgs.NOMINAL_TIME.getOptionName()));
+    private void onFeedInstanceReplicated(WorkflowExecutionContext context) {
+        LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601());
         // todo - tbd
     }
 
-    private void onFeedInstanceEvicted(Map<String, String> lineageMetadata) {
-        LOG.info("Adding evicted feed instance: {}", lineageMetadata.get(LineageArgs.NOMINAL_TIME.getOptionName()));
+    private void onFeedInstanceEvicted(WorkflowExecutionContext context) {
+        LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
         // todo - tbd
     }
 }
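
Taken together, the MetadataMappingService hunks above replace the ad-hoc onSuccessfulWorkflowCompletion(entityName, operation, logDir) entry point with the WorkflowExecutionListener contract. A rough outline of the resulting wiring, pieced together from the init(), destroy() and onSuccess() changes in this diff (not a complete listing):

    // In init(): subscribe for workflow-end callbacks.
    Services.get().<WorkflowJobEndNotificationService>getService(
            WorkflowJobEndNotificationService.NAME).registerListener(this);

    // In onSuccess(): dispatch on the operation carried by the context.
    switch (context.getOperation()) {
    case GENERATE:
        onProcessInstanceExecuted(context);   // process + input/output feed instances
        getTransactionalGraph().commit();
        break;
    case REPLICATE:
        onFeedInstanceReplicated(context);    // logging only; graph mapping is tbd
        break;
    case DELETE:
        onFeedInstanceEvicted(context);       // logging only; graph mapping is tbd
        break;
    default:
    }

    // In destroy(): unsubscribe before shutting the graph down.
    Services.get().<WorkflowJobEndNotificationService>getService(
            WorkflowJobEndNotificationService.NAME).unregisterListener(this);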

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
index 9ee0ea6..640df41 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 import java.util.Iterator;
-import java.util.Map;
 
 /**
  * Base class for Metadata relationship mapping helper.
@@ -192,13 +191,4 @@ public abstract class RelationshipGraphBuilder {
     protected String getCurrentTimeStamp() {
         return SchemaHelper.formatDateUTC(new Date());
     }
-
-    protected void addProperty(Vertex vertex, Map<String, String> lineageMetadata, String optionName) {
-        String value = lineageMetadata.get(optionName);
-        if (value == null || value.length() == 0) {
-            return;
-        }
-
-        vertex.setProperty(optionName, value);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java b/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java
deleted file mode 100644
index d686ab6..0000000
--- a/common/src/test/java/org/apache/falcon/metadata/LineageRecorderTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.Map;
-
-/**
- * Unit test for lineage recorder.
- */
-public class LineageRecorderTest {
-
-    private static final String LOGS_DIR = "target/log";
-    private static final String NOMINAL_TIME = "2014-01-01-01-00";
-    private static final String ENTITY_NAME = "test-process-entity";
-
-    private String[] args;
-
-    @BeforeMethod
-    public void setUp() throws Exception {
-        args = new String[]{
-            "-" + LineageArgs.ENTITY_NAME.getOptionName(), ENTITY_NAME,
-            "-" + LineageArgs.FEED_NAMES.getOptionName(), "out-click-logs,out-raw-logs",
-            "-" + LineageArgs.FEED_INSTANCE_PATHS.getOptionName(),
-            "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
-            "-" + LineageArgs.WORKFLOW_ID.getOptionName(), "workflow-01-00",
-            "-" + LineageArgs.WORKFLOW_USER.getOptionName(), "falcon",
-            "-" + LineageArgs.RUN_ID.getOptionName(), "1",
-            "-" + LineageArgs.NOMINAL_TIME.getOptionName(), NOMINAL_TIME,
-            "-" + LineageArgs.TIMESTAMP.getOptionName(), "2014-01-01-01-00",
-            "-" + LineageArgs.ENTITY_TYPE.getOptionName(), ("process"),
-            "-" + LineageArgs.OPERATION.getOptionName(), ("GENERATE"),
-            "-" + LineageArgs.STATUS.getOptionName(), ("SUCCEEDED"),
-            "-" + LineageArgs.CLUSTER.getOptionName(), "corp",
-            "-" + LineageArgs.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie/",
-            "-" + LineageArgs.LOG_DIR.getOptionName(), LOGS_DIR,
-            "-" + LineageArgs.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
-            "-" + LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), "oozie",
-            "-" + LineageArgs.INPUT_FEED_NAMES.getOptionName(), "in-click-logs,in-raw-logs",
-            "-" + LineageArgs.INPUT_FEED_PATHS.getOptionName(),
-            "/in-click-logs/10/05/05/00/20,/in-raw-logs/10/05/05/00/20",
-            "-" + LineageArgs.USER_WORKFLOW_NAME.getOptionName(), "test-workflow",
-            "-" + LineageArgs.USER_WORKFLOW_VERSION.getOptionName(), "1.0.0",
-        };
-    }
-
-    @AfterMethod
-    public void tearDown() throws Exception {
-
-    }
-
-    @Test
-    public void testMain() throws Exception {
-        LineageRecorder lineageRecorder = new LineageRecorder();
-        CommandLine command = LineageRecorder.getCommand(args);
-        Map<String, String> lineageMetadata = lineageRecorder.getLineageMetadata(command);
-
-        LineageRecorder.main(args);
-
-        String lineageFile = LineageRecorder.getFilePath(LOGS_DIR, ENTITY_NAME);
-        Path lineageDataPath = new Path(lineageFile);
-        FileSystem fs = lineageDataPath.getFileSystem(new Configuration());
-        Assert.assertTrue(fs.exists(lineageDataPath));
-
-        Map<String, String> recordedLineageMetadata =
-                LineageRecorder.parseLineageMetadata(lineageFile);
-
-        for (Map.Entry<String, String> entry : lineageMetadata.entrySet()) {
-            Assert.assertEquals(lineageMetadata.get(entry.getKey()),
-                    recordedLineageMetadata.get(entry.getKey()));
-        }
-    }
-
-    @Test
-    public void testGetFilePath() throws Exception {
-        String path = LOGS_DIR + ENTITY_NAME + "-lineage.json";
-        Assert.assertEquals(path, LineageRecorder.getFilePath(LOGS_DIR, ENTITY_NAME));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 52bb7ae..c94de10 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -36,7 +36,11 @@ import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -75,6 +79,8 @@ public class MetadataMappingServiceTest {
     public static final String OUTPUT_INSTANCE_PATHS =
         "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
 
+    public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
+
     private ConfigurationStore configStore;
     private MetadataMappingService service;
 
@@ -91,6 +97,7 @@ public class MetadataMappingServiceTest {
 
         configStore = ConfigurationStore.get();
 
+        Services.get().register(new WorkflowJobEndNotificationService());
         StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
         service = new MetadataMappingService();
         service.init();
@@ -218,9 +225,9 @@ public class MetadataMappingServiceTest {
         service.destroy();
         service.init();
 
-        LineageRecorder.main(getTestMessageArgs());
-
-        service.onSuccessfulWorkflowCompletion(PROCESS_ENTITY_NAME, OPERATION, LOGS_DIR);
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
 
         debug(service.getGraph());
         GraphUtils.dump(service.getGraph());
@@ -592,31 +599,39 @@ public class MetadataMappingServiceTest {
 
     private static String[] getTestMessageArgs() {
         return new String[]{
-            "-" + LineageArgs.NOMINAL_TIME.getOptionName(), NOMINAL_TIME,
-            "-" + LineageArgs.TIMESTAMP.getOptionName(), NOMINAL_TIME,
-
-            "-" + LineageArgs.ENTITY_NAME.getOptionName(), PROCESS_ENTITY_NAME,
-            "-" + LineageArgs.ENTITY_TYPE.getOptionName(), ("process"),
-            "-" + LineageArgs.CLUSTER.getOptionName(), CLUSTER_ENTITY_NAME,
-            "-" + LineageArgs.OPERATION.getOptionName(), OPERATION,
-
-            "-" + LineageArgs.INPUT_FEED_NAMES.getOptionName(), INPUT_FEED_NAMES,
-            "-" + LineageArgs.INPUT_FEED_PATHS.getOptionName(), INPUT_INSTANCE_PATHS,
-
-            "-" + LineageArgs.FEED_NAMES.getOptionName(), OUTPUT_FEED_NAMES,
-            "-" + LineageArgs.FEED_INSTANCE_PATHS.getOptionName(), OUTPUT_INSTANCE_PATHS,
-
-            "-" + LineageArgs.WORKFLOW_ID.getOptionName(), "workflow-01-00",
-            "-" + LineageArgs.WORKFLOW_USER.getOptionName(), FALCON_USER,
-            "-" + LineageArgs.RUN_ID.getOptionName(), "1",
-            "-" + LineageArgs.STATUS.getOptionName(), "SUCCEEDED",
-            "-" + LineageArgs.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie",
-            "-" + LineageArgs.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id",
-            "-" + LineageArgs.USER_WORKFLOW_NAME.getOptionName(), WORKFLOW_NAME,
-            "-" + LineageArgs.USER_WORKFLOW_VERSION.getOptionName(), WORKFLOW_VERSION,
-            "-" + LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), EngineType.PIG.name(),
-
-            "-" + LineageArgs.LOG_DIR.getOptionName(), LOGS_DIR,
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+
+            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+
+            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+
+
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a0994437/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index d664782..a82acda 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -34,14 +34,15 @@ import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.metadata.LineageArgs;
-import org.apache.falcon.metadata.LineageRecorder;
 import org.apache.falcon.metadata.MetadataMappingService;
 import org.apache.falcon.metadata.RelationshipLabel;
 import org.apache.falcon.metadata.RelationshipProperty;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
 import org.json.simple.JSONValue;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -95,10 +96,14 @@ public class LineageMetadataResourceTest {
 
         configStore = ConfigurationStore.get();
 
+        Services.get().register(new WorkflowJobEndNotificationService());
+        Assert.assertTrue(Services.get().isRegistered(WorkflowJobEndNotificationService.NAME));
+
         StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
         service = new MetadataMappingService();
         service.init();
         Services.get().register(service);
+        Assert.assertTrue(Services.get().isRegistered(MetadataMappingService.SERVICE_NAME));
 
         addClusterEntity();
         addFeedEntity();
@@ -110,8 +115,11 @@ public class LineageMetadataResourceTest {
     public void tearDown() throws Exception {
         cleanupGraphStore(service.getGraph());
         cleanupConfigurationStore(configStore);
+
         service.destroy();
+
         StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
+        Services.get().reset();
     }
 
     @Test (expectedExceptions = WebApplicationException.class)
@@ -397,6 +405,7 @@ public class LineageMetadataResourceTest {
             Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode());
             Assert.assertEquals(response.getEntity().toString(), "Lineage Metadata Service is not enabled.");
         } finally {
+            Services.get().register(new WorkflowJobEndNotificationService());
             Services.get().register(service);
         }
     }
@@ -517,37 +526,46 @@ public class LineageMetadataResourceTest {
     }
 
     public void addInstance() throws Exception {
-        LineageRecorder.main(getTestMessageArgs());
-        service.onSuccessfulWorkflowCompletion(PROCESS_ENTITY_NAME, OPERATION, LOGS_DIR);
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
     }
 
     private static String[] getTestMessageArgs() {
         return new String[]{
-            "-" + LineageArgs.NOMINAL_TIME.getOptionName(), NOMINAL_TIME,
-            "-" + LineageArgs.TIMESTAMP.getOptionName(), NOMINAL_TIME,
-
-            "-" + LineageArgs.ENTITY_NAME.getOptionName(), PROCESS_ENTITY_NAME,
-            "-" + LineageArgs.ENTITY_TYPE.getOptionName(), ("process"),
-            "-" + LineageArgs.CLUSTER.getOptionName(), CLUSTER_ENTITY_NAME,
-            "-" + LineageArgs.OPERATION.getOptionName(), OPERATION,
-
-            "-" + LineageArgs.INPUT_FEED_NAMES.getOptionName(), INPUT_FEED_NAMES,
-            "-" + LineageArgs.INPUT_FEED_PATHS.getOptionName(), INPUT_INSTANCE_PATHS,
-
-            "-" + LineageArgs.FEED_NAMES.getOptionName(), OUTPUT_FEED_NAMES,
-            "-" + LineageArgs.FEED_INSTANCE_PATHS.getOptionName(), OUTPUT_INSTANCE_PATHS,
-
-            "-" + LineageArgs.WORKFLOW_ID.getOptionName(), "workflow-01-00",
-            "-" + LineageArgs.WORKFLOW_USER.getOptionName(), FALCON_USER,
-            "-" + LineageArgs.RUN_ID.getOptionName(), "1",
-            "-" + LineageArgs.STATUS.getOptionName(), "SUCCEEDED",
-            "-" + LineageArgs.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie",
-            "-" + LineageArgs.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id",
-            "-" + LineageArgs.USER_WORKFLOW_NAME.getOptionName(), WORKFLOW_NAME,
-            "-" + LineageArgs.USER_WORKFLOW_VERSION.getOptionName(), WORKFLOW_VERSION,
-            "-" + LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), EngineType.PIG.name(),
-
-            "-" + LineageArgs.LOG_DIR.getOptionName(), LOGS_DIR,
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+
+            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+
+            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+
+
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), "blah",
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), "blah",
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
         };
     }