Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/08 22:31:37 UTC

[4/8] git commit: FALCON-486 - Introduce Workflow Context in Post Processing. Contributed by Venkatesh Seetharam

FALCON-486 - Introduce Workflow Context in Post Processing. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c6000363
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c6000363
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c6000363

Branch: refs/heads/master
Commit: c6000363a19e11745d9f3f7abc14e03f08be7f88
Parents: 23762e5
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Aug 8 12:58:23 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Aug 8 12:58:23 2014 -0700

----------------------------------------------------------------------
 .../org/apache/falcon/metadata/LineageArgs.java |   1 +
 .../apache/falcon/metadata/LineageRecorder.java |   1 +
 .../falcon/messaging/JMSMessageProducer.java    |  57 ++---
 .../org/apache/falcon/logging/JobLogMover.java  | 170 ++++++++++++++
 .../org/apache/falcon/logging/LogMover.java     | 229 -------------------
 .../falcon/workflow/FalconPostProcessing.java   | 227 +++---------------
 .../workflow/FalconPostProcessingTest.java      |  78 +++----
 .../org/apache/falcon/process/PigProcessIT.java |   2 +-
 .../org/apache/falcon/util/OozieTestUtils.java  |  15 +-
 9 files changed, 268 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java b/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
index 7adf5bb..e91f7ab 100644
--- a/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
+++ b/common/src/main/java/org/apache/falcon/metadata/LineageArgs.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 /**
  * Args for data Lineage.
  */
+@Deprecated // delete this class
 public enum LineageArgs {
     // process instance
     NOMINAL_TIME("nominalTime", "instance time"),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
index 8a946ad..5ab1bf0 100644
--- a/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/LineageRecorder.java
@@ -46,6 +46,7 @@ import java.util.Map;
 /**
  * Utility called in the post process of oozie workflow to record lineage information.
  */
+@Deprecated // delete this class
 public class LineageRecorder  extends Configured implements Tool {
 
     private static final Logger LOG = LoggerFactory.getLogger(LineageRecorder.class);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 6c9859c..aeadf5c 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -63,35 +63,6 @@ public class JMSMessageProducer {
 
     private static final long DEFAULT_TTL = 3 * 24 * 60 * 60 * 1000;
 
-    private static final WorkflowExecutionArgs[] FALCON_FILTER = {
-        WorkflowExecutionArgs.NOMINAL_TIME,
-        WorkflowExecutionArgs.ENTITY_NAME,
-        WorkflowExecutionArgs.OPERATION,
-        WorkflowExecutionArgs.LOG_DIR,
-        WorkflowExecutionArgs.STATUS,
-        WorkflowExecutionArgs.CONTEXT_FILE,
-        WorkflowExecutionArgs.TIMESTAMP,
-    };
-
-    private static final WorkflowExecutionArgs[] USER_FILTER = {
-        WorkflowExecutionArgs.CLUSTER_NAME,
-        WorkflowExecutionArgs.ENTITY_NAME,
-        WorkflowExecutionArgs.ENTITY_TYPE,
-        WorkflowExecutionArgs.NOMINAL_TIME,
-        WorkflowExecutionArgs.OPERATION,
-
-        WorkflowExecutionArgs.FEED_NAMES,
-        WorkflowExecutionArgs.FEED_INSTANCE_PATHS,
-
-        WorkflowExecutionArgs.WORKFLOW_ID,
-        WorkflowExecutionArgs.WORKFLOW_USER,
-        WorkflowExecutionArgs.RUN_ID,
-        WorkflowExecutionArgs.STATUS,
-        WorkflowExecutionArgs.TIMESTAMP,
-        WorkflowExecutionArgs.LOG_FILE,
-    };
-
-
     private final WorkflowExecutionContext context;
     private final MessageType messageType;
 
@@ -170,9 +141,26 @@ public class JMSMessageProducer {
     * Accepts a message to be sent to a JMS topic; creates a new
     * topic based on the topic name if it does not exist, otherwise the
     * existing topic with the same name is used to send the message.
+     * Sends all arguments.
+     *
+     * @return error code
+     * @throws JMSException
      */
     public int sendMessage() throws JMSException {
-        List<Map<String, String>> messageList = buildMessageList();
+        return sendMessage(WorkflowExecutionArgs.values());
+    }
+
+    /**
+     * Accepts a message to be sent to a JMS topic; creates a new
+     * topic based on the topic name if it does not exist, otherwise the
+     * existing topic with the same name is used to send the message.
+     *
+     * @param filteredArgs args sent in the message.
+     * @return error code
+     * @throws JMSException
+     */
+    public int sendMessage(WorkflowExecutionArgs[] filteredArgs) throws JMSException {
+        List<Map<String, String>> messageList = buildMessageList(filteredArgs);
 
         if (messageList.isEmpty()) {
             LOG.warn("No operation on output feed");
@@ -198,10 +186,7 @@ public class JMSMessageProducer {
         return 0;
     }
 
-    private List<Map<String, String>> buildMessageList() {
-        WorkflowExecutionArgs[] fileredArgs = messageType == MessageType.FALCON
-                ? FALCON_FILTER : USER_FILTER;
-
+    private List<Map<String, String>> buildMessageList(WorkflowExecutionArgs[] filteredArgs) {
         String[] feedNames = getFeedNames();
         if (feedNames == null) {
             return Collections.emptyList();
@@ -217,7 +202,7 @@ public class JMSMessageProducer {
 
         List<Map<String, String>> messages = new ArrayList<Map<String, String>>(feedPaths.length);
         for (int i = 0; i < feedPaths.length; i++) {
-            Map<String, String> message = buildMessage(fileredArgs);
+            Map<String, String> message = buildMessage(filteredArgs);
 
             // override default values
             if (context.getEntityType().equalsIgnoreCase("PROCESS")) {
@@ -347,7 +332,7 @@ public class JMSMessageProducer {
 
     @SuppressWarnings("unchecked")
     private Connection createAndStartConnection(String implementation, String userName,
-                                          String password, String url)
+                                                String password, String url)
         throws JMSException, ClassNotFoundException, InstantiationException,
                IllegalAccessException, InvocationTargetException, NoSuchMethodException {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
new file mode 100644
index 0000000..1a08ada
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.logging;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Utility called in post-processing of the Oozie workflow to move the Oozie action executor logs.
+ */
+public class JobLogMover {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobLogMover.class);
+
+    public static final Set<String> FALCON_ACTIONS =
+        new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
+
+    private Configuration getConf() {
+        return new Configuration();
+    }
+
+    public int run(WorkflowExecutionContext context) {
+        try {
+            OozieClient client = new OozieClient(context.getWorkflowEngineUrl());
+            WorkflowJob jobInfo;
+            try {
+                jobInfo = client.getJobInfo(context.getUserSubflowId());
+            } catch (OozieClientException e) {
+                LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e);
+                return 0;
+            }
+
+            Path path = new Path(context.getLogDir() + "/"
+                    + String.format("%03d", context.getWorkflowRunId()));
+            FileSystem fs = path.getFileSystem(getConf());
+
+            if (EntityType.FEED.name().equalsIgnoreCase(context.getEntityType())
+                    || notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) {
+                // if replication wf, retention wf or PIG Process
+                copyOozieLog(client, fs, path, jobInfo.getId());
+
+                List<WorkflowAction> workflowActions = jobInfo.getActions();
+                for (int i=0; i < workflowActions.size(); i++) {
+                    if (FALCON_ACTIONS.contains(workflowActions.get(i).getName())) {
+                        copyTTlogs(fs, path, jobInfo.getActions().get(i));
+                        break;
+                    }
+                }
+            } else {
+                // if process wf with oozie engine
+                String subflowId = jobInfo.getExternalId();
+                copyOozieLog(client, fs, path, subflowId);
+                WorkflowJob subflowInfo = client.getJobInfo(subflowId);
+                List<WorkflowAction> actions = subflowInfo.getActions();
+                for (WorkflowAction action : actions) {
+                    if (action.getType().equals("pig")
+                            || action.getType().equals("java")) {
+                        copyTTlogs(fs, path, action);
+                    } else {
+                        LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName());
+                    }
+                }
+            }
+
+        } catch (Exception e) {
+            // JobLogMover doesn't throw an exception; a failed log move will not fail the user workflow
+            LOG.error("Exception in log mover:", e);
+        }
+        return 0;
+    }
+
+    private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
+        // userWorkflowEngine will be null for replication and "pig" for pig
+        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
+    }
+
+    private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
+                              String id) throws OozieClientException, IOException {
+        InputStream in = new ByteArrayInputStream(client.getJobLog(id).getBytes());
+        OutputStream out = fs.create(new Path(path, "oozie.log"));
+        IOUtils.copyBytes(in, out, 4096, true);
+        LOG.info("Copied oozie log to {}", path);
+    }
+
+    private void copyTTlogs(FileSystem fs, Path path,
+                            WorkflowAction action) throws Exception {
+        String ttLogURL = getTTlogURL(action.getExternalId());
+        if (ttLogURL != null) {
+            LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
+            InputStream in = getURLinputStream(new URL(ttLogURL));
+            OutputStream out = fs.create(new Path(path, action.getName() + "_"
+                    + getMappedStatus(action.getStatus()) + ".log"));
+            IOUtils.copyBytes(in, out, 4096, true);
+            LOG.info("Copied log to {}", path);
+        }
+    }
+
+    private String getMappedStatus(WorkflowAction.Status status) {
+        if (status == WorkflowAction.Status.FAILED
+                || status == WorkflowAction.Status.KILLED
+                || status == WorkflowAction.Status.ERROR) {
+            return "FAILED";
+        } else {
+            return "SUCCEEDED";
+        }
+    }
+
+    private String getTTlogURL(String jobId) throws Exception {
+        TaskLogURLRetriever logRetriever = ReflectionUtils
+                .newInstance(getLogRetrieverClassName(), getConf());
+        return logRetriever.retrieveTaskLogURL(jobId);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName() {
+        try {
+            return (Class<? extends TaskLogURLRetriever>)
+                    Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
+        } catch (ClassNotFoundException e) {
+            LOG.warn("V1 Retriever missing, falling back to Default retriever");
+            return DefaultTaskLogRetriever.class;
+        }
+    }
+
+    private InputStream getURLinputStream(URL url) throws IOException {
+        URLConnection connection = url.openConnection();
+        connection.setDoOutput(true);
+        connection.connect();
+        return connection.getInputStream();
+    }
+}
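
A sketch of how the new class is driven now that the Tool/CLI plumbing is gone: the same option names are handed to a WorkflowExecutionContext instead. This mirrors the OozieTestUtils change at the end of this commit; the Oozie workflow id, log directory and other option values below are only placeholders.

    // assumes: import org.apache.falcon.logging.JobLogMover;
    //          import org.apache.falcon.workflow.WorkflowExecutionContext;
    WorkflowExecutionContext context = WorkflowExecutionContext.create(new String[]{
        "-workflowEngineUrl", "http://localhost:11000/oozie/",
        "-subflowId", "0000001-140808-oozie-W",           // placeholder Oozie workflow id
        "-runId", "1",
        "-logDir", "hdfs://nn:8020/falcon/staging/logs",  // placeholder log directory
        "-status", "SUCCEEDED",
        "-entityType", "process",
        "-userWorkflowEngine", "oozie",
    }, WorkflowExecutionContext.Type.POST_PROCESSING);

    // run() swallows its own failures and returns 0; log moving never fails the user workflow
    new JobLogMover().run(context);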

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
deleted file mode 100644
index 3922b38..0000000
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.logging;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Utility called in the post process of oozie workflow to move oozie action executor log.
- */
-public class LogMover extends Configured implements Tool {
-
-    private static final Logger LOG = LoggerFactory.getLogger(LogMover.class);
-    public static final Set<String> FALCON_ACTIONS =
-        new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", }));
-
-    /**
-     * Args to the command.
-     */
-    private static class ARGS {
-        private String oozieUrl;
-        private String subflowId;
-        private String runId;
-        private String logDir;
-        private String entityType;
-        private String userWorkflowEngine;
-    }
-
-    public static void main(String[] args) throws Exception {
-        ToolRunner.run(new LogMover(), args);
-    }
-
-    @Override
-    public int run(String[] arguments) throws Exception {
-        try {
-            ARGS args = new ARGS();
-            setupArgs(arguments, args);
-
-            OozieClient client = new OozieClient(args.oozieUrl);
-            WorkflowJob jobInfo;
-            try {
-                jobInfo = client.getJobInfo(args.subflowId);
-            } catch (OozieClientException e) {
-                LOG.error("Error getting jobinfo for: {}", args.subflowId, e);
-                return 0;
-            }
-
-            Path path = new Path(args.logDir + "/"
-                    + String.format("%03d", Integer.parseInt(args.runId)));
-            FileSystem fs = path.getFileSystem(getConf());
-
-            if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())
-                    || notUserWorkflowEngineIsOozie(args.userWorkflowEngine)) {
-                // if replication wf, retention wf or PIG Process
-                copyOozieLog(client, fs, path, jobInfo.getId());
-
-                List<WorkflowAction> workflowActions = jobInfo.getActions();
-                for (int i=0; i < workflowActions.size(); i++) {
-                    if (FALCON_ACTIONS.contains(workflowActions.get(i).getName())) {
-                        copyTTlogs(fs, path, jobInfo.getActions().get(i));
-                        break;
-                    }
-                }
-            } else {
-                // if process wf with oozie engine
-                String subflowId = jobInfo.getExternalId();
-                copyOozieLog(client, fs, path, subflowId);
-                WorkflowJob subflowInfo = client.getJobInfo(subflowId);
-                List<WorkflowAction> actions = subflowInfo.getActions();
-                for (WorkflowAction action : actions) {
-                    if (action.getType().equals("pig")
-                            || action.getType().equals("java")) {
-                        copyTTlogs(fs, path, action);
-                    } else {
-                        LOG.info("Ignoring hadoop TT log for non-pig and non-java action: {}", action.getName());
-                    }
-                }
-            }
-
-        } catch (Exception e) {
-            LOG.error("Exception in log mover", e);
-        }
-        return 0;
-    }
-
-    private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
-        // userWorkflowEngine will be null for replication and "pig" for pig
-        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
-    }
-
-    private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
-                              String id) throws OozieClientException, IOException {
-        InputStream in = new ByteArrayInputStream(client.getJobLog(id).getBytes());
-        OutputStream out = fs.create(new Path(path, "oozie.log"));
-        IOUtils.copyBytes(in, out, 4096, true);
-        LOG.info("Copied oozie log to {}", path);
-    }
-
-    private void copyTTlogs(FileSystem fs, Path path,
-                            WorkflowAction action) throws Exception {
-        String ttLogURL = getTTlogURL(action.getExternalId());
-        if (ttLogURL != null) {
-            LOG.info("Fetching log for action: {} from url: {}", action.getExternalId(), ttLogURL);
-            InputStream in = getURLinputStream(new URL(ttLogURL));
-            OutputStream out = fs.create(new Path(path, action.getName() + "_"
-                    + getMappedStatus(action.getStatus()) + ".log"));
-            IOUtils.copyBytes(in, out, 4096, true);
-            LOG.info("Copied log to {}", path);
-        }
-    }
-
-    private String getMappedStatus(WorkflowAction.Status status) {
-        if (status == WorkflowAction.Status.FAILED
-                || status == WorkflowAction.Status.KILLED
-                || status == WorkflowAction.Status.ERROR) {
-            return "FAILED";
-        } else {
-            return "SUCCEEDED";
-        }
-    }
-
-    private void setupArgs(String[] arguments, ARGS args) throws ParseException {
-        Options options = new Options();
-
-        Option opt = new Option("workflowEngineUrl", true, "url of workflow engine, ex:oozie");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("subflowId", true, "external id of userworkflow");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("userWorkflowEngine", true, "user workflow engine type");
-        opt.setRequired(false);  // replication will NOT have this arg sent
-        options.addOption(opt);
-
-        opt = new Option("runId", true, "current workflow's runid");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("logDir", true, "log dir where job logs are stored");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("status", true, "user workflow status");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("entityType", true, "entity type feed or process");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        CommandLine cmd = new GnuParser().parse(options, arguments);
-
-        args.oozieUrl = cmd.getOptionValue("workflowEngineUrl");
-        args.subflowId = cmd.getOptionValue("subflowId");
-        args.userWorkflowEngine = cmd.getOptionValue("userWorkflowEngine");
-        args.runId = cmd.getOptionValue("runId");
-        args.logDir = cmd.getOptionValue("logDir");
-        args.entityType = cmd.getOptionValue("entityType");
-    }
-
-    private String getTTlogURL(String jobId) throws Exception {
-        TaskLogURLRetriever logRetriever = ReflectionUtils.newInstance(getLogRetrieverClassName(), getConf());
-        return logRetriever.retrieveTaskLogURL(jobId);
-    }
-
-    @SuppressWarnings("unchecked")
-    private Class<? extends TaskLogURLRetriever> getLogRetrieverClassName() {
-        try {
-            return (Class<? extends TaskLogURLRetriever>)
-                    Class.forName("org.apache.falcon.logging.v1.TaskLogRetrieverV1");
-        } catch (ClassNotFoundException e) {
-            LOG.warn("V1 Retriever missing, falling back to Default retriever");
-            return DefaultTaskLogRetriever.class;
-        }
-    }
-
-    private InputStream getURLinputStream(URL url) throws IOException {
-        URLConnection connection = url.openConnection();
-        connection.setDoOutput(true);
-        connection.connect();
-        return connection.getInputStream();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index d3befa2..0adc11b 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -17,12 +17,8 @@
  */
 package org.apache.falcon.workflow;
 
-import org.apache.commons.cli.*;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.logging.LogMover;
-import org.apache.falcon.messaging.MessageProducer;
-import org.apache.falcon.metadata.LineageArgs;
-import org.apache.falcon.metadata.LineageRecorder;
+import org.apache.falcon.logging.JobLogMover;
+import org.apache.falcon.messaging.JMSMessageProducer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,68 +27,12 @@ import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Utility called by oozie workflow engine post workflow execution in parent workflow.
  */
 public class FalconPostProcessing extends Configured implements Tool {
     private static final Logger LOG = LoggerFactory.getLogger(FalconPostProcessing.class);
 
-    /**
-     * Args that the utility understands.
-     */
-    public enum Arg {
-        CLUSTER("cluster", "name of the current cluster"),
-        ENTITY_TYPE("entityType", "type of the entity"),
-        ENTITY_NAME("entityName", "name of the entity"),
-        NOMINAL_TIME("nominalTime", "instance time"),
-        OPERATION("operation", "operation like generate, delete, replicate"),
-        WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
-        RUN_ID("runId", "current run-id of the instance"),
-        STATUS("status", "status of the user workflow instance"),
-        TIMESTAMP("timeStamp", "current timestamp"),
-        TOPIC_NAME("topicName", "name of the topic to be used to send JMS message"),
-        BRKR_IMPL_CLASS("brokerImplClass", "falcon message broker Implementation class"),
-        BRKR_URL("brokerUrl", "falcon message broker url"),
-        USER_BRKR_IMPL_CLASS("userBrokerImplClass", "user broker Impl class"),
-        USER_BRKR_URL("userBrokerUrl", "user broker url"),
-        BRKR_TTL("brokerTTL", "time to live for broker message in sec"),
-        FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
-        FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
-        LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
-        WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
-        USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
-        USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
-        USER_WORKFLOW_NAME("userWorkflowName", "user workflow name"),
-        USER_WORKFLOW_VERSION("userWorkflowVersion", "user workflow version"),
-        LOG_DIR("logDir", "log dir where job logs are copied"),
-        WORKFLOW_USER("workflowUser", "user who owns the feed instance (partition)"),
-        INPUT_FEED_NAMES("falconInputFeeds", "name of the feeds which are used as inputs"),
-        INPUT_FEED_PATHS("falconInputPaths", "comma separated input feed instance paths");
-
-        private String name;
-        private String description;
-
-        Arg(String name, String description) {
-            this.name = name;
-            this.description = description;
-        }
-
-        public Option getOption() {
-            return new Option(this.name, true, this.description);
-        }
-
-        public String getOptionName() {
-            return this.name;
-        }
-
-        public String getOptionValue(CommandLine cmd) {
-            return cmd.getOptionValue(this.name);
-        }
-    }
-
     public static void main(String[] args) throws Exception {
         ToolRunner.run(new Configuration(), new FalconPostProcessing(), args);
     }
@@ -100,161 +40,46 @@ public class FalconPostProcessing extends Configured implements Tool {
     @Override
     public int run(String[] args) throws Exception {
 
-        CommandLine cmd = getCommand(args);
-
-        LOG.info("Sending user message {}", cmd);
-        invokeUserMessageProducer(cmd);
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(args,
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        LOG.info("Post workflow execution context created {}", context);
+        // serialize the context to HDFS under logs dir before sending the message
+        context.serialize();
 
-        if ("SUCCEEDED".equals(Arg.STATUS.getOptionValue(cmd))) {
-            LOG.info("Recording lineage for {}", cmd);
-            recordLineageMetadata(cmd);
-        }
+        LOG.info("Sending user message {} ", context);
+        invokeUserMessageProducer(context);
 
-        //LogMover doesn't throw exception, a failed log mover will not fail the user workflow
-        LOG.info("Moving logs {}", cmd);
-        invokeLogProducer(cmd);
+        // JobLogMover doesn't throw an exception; a failed log move will not fail the user workflow
+        LOG.info("Moving logs {}", context);
+        invokeLogProducer(context);
 
-        LOG.info("Sending falcon message {}", cmd);
-        invokeFalconMessageProducer(cmd);
+        LOG.info("Sending falcon message {}", context);
+        invokeFalconMessageProducer(context);
 
         return 0;
     }
 
-    private void invokeUserMessageProducer(CommandLine cmd) throws Exception {
-        List<String> args = new ArrayList<String>();
-        addArg(args, cmd, Arg.CLUSTER);
-        addArg(args, cmd, Arg.ENTITY_TYPE);
-        addArg(args, cmd, Arg.ENTITY_NAME);
-        addArg(args, cmd, Arg.NOMINAL_TIME);
-        addArg(args, cmd, Arg.OPERATION);
-        addArg(args, cmd, Arg.WORKFLOW_ID);
-        addArg(args, cmd, Arg.RUN_ID);
-        addArg(args, cmd, Arg.STATUS);
-        addArg(args, cmd, Arg.TIMESTAMP);
-        //special args for user JMS message producer
-        args.add("-" + Arg.TOPIC_NAME.getOptionName()); //user topic
-        args.add("FALCON." + Arg.ENTITY_NAME.getOptionValue(cmd));
-        //note, the user broker impl class arg name to MessageProducer is brokerImplClass
-        args.add("-" + Arg.BRKR_IMPL_CLASS.getOptionName());
-        args.add(Arg.USER_BRKR_IMPL_CLASS.getOptionValue(cmd));
-        args.add("-" + Arg.BRKR_URL.getOptionName());
-        args.add(Arg.USER_BRKR_URL.getOptionValue(cmd));
-        addArg(args, cmd, Arg.BRKR_TTL);
-        addArg(args, cmd, Arg.FEED_NAMES);
-        addArg(args, cmd, Arg.FEED_INSTANCE_PATHS);
-        addArg(args, cmd, Arg.LOG_FILE);
-
-        MessageProducer.main(args.toArray(new String[0]));
+    private void invokeUserMessageProducer(WorkflowExecutionContext context) throws Exception {
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.USER)
+                .build();
+        jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
     }
 
-    private void invokeFalconMessageProducer(CommandLine cmd) throws Exception {
-        List<String> args = new ArrayList<String>();
-        addArg(args, cmd, Arg.CLUSTER);
-        addArg(args, cmd, Arg.ENTITY_TYPE);
-        addArg(args, cmd, Arg.ENTITY_NAME);
-        addArg(args, cmd, Arg.NOMINAL_TIME);
-        addArg(args, cmd, Arg.OPERATION);
-        addArg(args, cmd, Arg.WORKFLOW_ID);
-        addArg(args, cmd, Arg.RUN_ID);
-        addArg(args, cmd, Arg.STATUS);
-        addArg(args, cmd, Arg.TIMESTAMP);
-        //special args Falcon JMS message producer
-        args.add("-" + Arg.TOPIC_NAME.getOptionName());
-        args.add("FALCON.ENTITY.TOPIC");
-        args.add("-" + Arg.BRKR_IMPL_CLASS.getOptionName());
-        args.add(Arg.BRKR_IMPL_CLASS.getOptionValue(cmd));
-        args.add("-" + Arg.BRKR_URL.getOptionName());
-        args.add(Arg.BRKR_URL.getOptionValue(cmd));
-        addArg(args, cmd, Arg.BRKR_TTL);
-        addArg(args, cmd, Arg.FEED_NAMES);
-        addArg(args, cmd, Arg.FEED_INSTANCE_PATHS);
-        addArg(args, cmd, Arg.LOG_FILE);
-        addArg(args, cmd, Arg.WORKFLOW_USER);
-        addArg(args, cmd, Arg.LOG_DIR);
-
-        MessageProducer.main(args.toArray(new String[0]));
+    private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
+        JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+                .type(JMSMessageProducer.MessageType.FALCON)
+                .build();
+        jmsMessageProducer.sendMessage();
     }
 
-    private void invokeLogProducer(CommandLine cmd) throws Exception {
+    private void invokeLogProducer(WorkflowExecutionContext context) {
         // todo: need to move this out to Falcon in-process
         if (UserGroupInformation.isSecurityEnabled()) {
             LOG.info("Unable to move logs as security is enabled.");
             return;
         }
 
-        List<String> args = new ArrayList<String>();
-        addArg(args, cmd, Arg.WF_ENGINE_URL);
-        addArg(args, cmd, Arg.ENTITY_TYPE);
-        addArg(args, cmd, Arg.USER_SUBFLOW_ID);
-        addArg(args, cmd, Arg.USER_WORKFLOW_ENGINE);
-        addArg(args, cmd, Arg.RUN_ID);
-        addArg(args, cmd, Arg.LOG_DIR);
-        addArg(args, cmd, Arg.STATUS);
-
-        LogMover.main(args.toArray(new String[0]));
-    }
-
-    private void recordLineageMetadata(CommandLine cmd) throws Exception {
-        List<String> args = new ArrayList<String>();
-
-        for (LineageArgs arg : LineageArgs.values()) {
-            if (StringUtils.isNotEmpty(arg.getOptionValue(cmd))) {
-                args.add("-" + arg.getOptionName());
-                args.add(arg.getOptionValue(cmd));
-            }
-        }
-
-        LineageRecorder.main(args.toArray(new String[args.size()]));
-    }
-
-    private void addArg(List<String> args, CommandLine cmd, Arg arg) {
-        if (StringUtils.isNotEmpty(arg.getOptionValue(cmd))) {
-            args.add("-" + arg.getOptionName());
-            args.add(arg.getOptionValue(cmd));
-        }
-    }
-
-    private static CommandLine getCommand(String[] arguments)
-        throws ParseException {
-
-        Options options = new Options();
-        addOption(options, Arg.CLUSTER);
-        addOption(options, Arg.ENTITY_TYPE);
-        addOption(options, Arg.ENTITY_NAME);
-        addOption(options, Arg.NOMINAL_TIME);
-        addOption(options, Arg.OPERATION);
-        addOption(options, Arg.WORKFLOW_ID);
-        addOption(options, Arg.RUN_ID);
-        addOption(options, Arg.STATUS);
-        addOption(options, Arg.TIMESTAMP);
-        addOption(options, Arg.BRKR_IMPL_CLASS);
-        addOption(options, Arg.BRKR_URL);
-        addOption(options, Arg.USER_BRKR_IMPL_CLASS);
-        addOption(options, Arg.USER_BRKR_URL);
-        addOption(options, Arg.BRKR_TTL);
-        addOption(options, Arg.FEED_NAMES);
-        addOption(options, Arg.FEED_INSTANCE_PATHS);
-        addOption(options, Arg.LOG_FILE);
-        addOption(options, Arg.WF_ENGINE_URL);
-        addOption(options, Arg.USER_SUBFLOW_ID);
-        addOption(options, Arg.USER_WORKFLOW_NAME, false);
-        addOption(options, Arg.USER_WORKFLOW_VERSION, false);
-        addOption(options, Arg.USER_WORKFLOW_ENGINE, false);
-        addOption(options, Arg.LOG_DIR);
-        addOption(options, Arg.WORKFLOW_USER);
-        addOption(options, Arg.INPUT_FEED_NAMES, false);
-        addOption(options, Arg.INPUT_FEED_PATHS, false);
-
-        return new GnuParser().parse(options, arguments);
-    }
-
-    private static void addOption(Options options, Arg arg) {
-        addOption(options, arg, true);
-    }
-
-    private static void addOption(Options options, Arg arg, boolean isRequired) {
-        Option option = arg.getOption();
-        option.setRequired(isRequired);
-        options.addOption(option);
+        new JobLogMover().run(context);
     }
 }
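
Since the class is still a Hadoop Tool, the Oozie java action keeps calling main(); below is a trimmed sketch of an equivalent programmatic invocation, with the argument list abbreviated (the complete set of options is spelled out in FalconPostProcessingTest further down):

    // assumes: import org.apache.falcon.workflow.FalconPostProcessing;
    //          import org.apache.falcon.workflow.WorkflowExecutionArgs;
    //          import org.apache.hadoop.conf.Configuration;
    //          import org.apache.hadoop.util.ToolRunner;
    String[] args = {
        "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
        "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
        "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
        "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
        // ... remaining WorkflowExecutionArgs options, exactly as in the test setup below
    };
    int exitCode = ToolRunner.run(new Configuration(), new FalconPostProcessing(), args);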

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 561303d..feddfdd 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -20,7 +20,7 @@ package org.apache.falcon.oozie.workflow;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.falcon.workflow.FalconPostProcessing;
-import org.apache.falcon.workflow.FalconPostProcessing.Arg;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -45,34 +45,34 @@ public class FalconPostProcessingTest {
     @BeforeClass
     public void setup() throws Exception {
         args = new String[]{
-            "-" + Arg.ENTITY_NAME.getOptionName(), ENTITY_NAME,
-            "-" + Arg.FEED_NAMES.getOptionName(), "out-click-logs,out-raw-logs",
-            "-" + Arg.FEED_INSTANCE_PATHS.getOptionName(),
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.FEED_NAMES.getName(), "out-click-logs,out-raw-logs",
+            "-" + WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(),
             "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20",
-            "-" + Arg.WORKFLOW_ID.getOptionName(), "workflow-01-00",
-            "-" + Arg.WORKFLOW_USER.getOptionName(), "falcon",
-            "-" + Arg.RUN_ID.getOptionName(), "1",
-            "-" + Arg.NOMINAL_TIME.getOptionName(), "2011-01-01-01-00",
-            "-" + Arg.TIMESTAMP.getOptionName(), "2012-01-01-01-00",
-            "-" + Arg.BRKR_URL.getOptionName(), BROKER_URL,
-            "-" + Arg.BRKR_IMPL_CLASS.getOptionName(), (BROKER_IMPL_CLASS),
-            "-" + Arg.USER_BRKR_URL.getOptionName(), BROKER_URL,
-            "-" + Arg.USER_BRKR_IMPL_CLASS.getOptionName(), (BROKER_IMPL_CLASS),
-            "-" + Arg.ENTITY_TYPE.getOptionName(), ("process"),
-            "-" + Arg.OPERATION.getOptionName(), ("GENERATE"),
-            "-" + Arg.LOG_FILE.getOptionName(), ("/logFile"),
-            "-" + Arg.STATUS.getOptionName(), ("SUCCEEDED"),
-            "-" + Arg.BRKR_TTL.getOptionName(), "10",
-            "-" + Arg.CLUSTER.getOptionName(), "corp",
-            "-" + Arg.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie/",
-            "-" + Arg.LOG_DIR.getOptionName(), "target/log",
-            "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
-            "-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie",
-            "-" + Arg.INPUT_FEED_NAMES.getOptionName(), "in-click-logs,in-raw-logs",
-            "-" + Arg.INPUT_FEED_PATHS.getOptionName(),
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), "falcon",
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), "2011-01-01-01-00",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), "2012-01-01-01-00",
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), BROKER_URL,
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), BROKER_URL,
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER_IMPL_CLASS,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), "GENERATE",
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), "/logFile",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "10",
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), "corp",
+            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie/",
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), "target/log",
+            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id" + "test",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), "oozie",
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), "in-click-logs,in-raw-logs",
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
             "/in-click-logs/10/05/05/00/20,/in-raw-logs/10/05/05/00/20",
-            "-" + Arg.USER_WORKFLOW_NAME.getOptionName(), "test-workflow",
-            "-" + Arg.USER_WORKFLOW_VERSION.getOptionName(), "1.0.0",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), "test-workflow",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), "1.0.0",
         };
 
         broker = new BrokerService();
@@ -95,8 +95,8 @@ public class FalconPostProcessingTest {
             @Override
             public void run() {
                 try {
-                    consumer(BROKER_URL, "FALCON." + ENTITY_NAME);
-                    consumer(BROKER_URL, FALCON_TOPIC_NAME);
+                    consumer(BROKER_URL, "FALCON." + ENTITY_NAME);  // user message
+                    consumer(BROKER_URL, FALCON_TOPIC_NAME);  // falcon message
                 } catch (AssertionError e) {
                     error = e;
                 } catch (JMSException ignore) {
@@ -131,13 +131,13 @@ public class FalconPostProcessingTest {
 
         assertMessage(m);
         if (topic.equals(FALCON_TOPIC_NAME)) {
-            Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()),
                     "out-click-logs,out-raw-logs");
-            Assert.assertEquals(m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                     "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20");
         } else {
-            Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()), "out-click-logs");
-            Assert.assertEquals(m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_NAMES.getName()), "out-click-logs");
+            Assert.assertEquals(m.getString(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName()),
                     "/out-click-logs/10/05/05/00/20");
         }
 
@@ -145,15 +145,13 @@ public class FalconPostProcessingTest {
     }
 
     private void assertMessage(MapMessage m) throws JMSException {
-        Assert.assertEquals(m.getString(Arg.ENTITY_NAME.getOptionName()), "agg-coord");
-        Assert.assertEquals(m.getString(Arg.WORKFLOW_ID.getOptionName()), "workflow-01-00");
-        String workflowUser = m.getString(Arg.WORKFLOW_USER.getOptionName());
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()), "agg-coord");
+        String workflowUser = m.getString(WorkflowExecutionArgs.WORKFLOW_USER.getName());
         if (workflowUser != null) { // in case of user message, its NULL
             Assert.assertEquals(workflowUser, "falcon");
         }
-        Assert.assertEquals(m.getString(Arg.RUN_ID.getOptionName()), "1");
-        Assert.assertEquals(m.getString(Arg.NOMINAL_TIME.getOptionName()), "2011-01-01T01:00Z");
-        Assert.assertEquals(m.getString(Arg.TIMESTAMP.getOptionName()), "2012-01-01T01:00Z");
-        Assert.assertEquals(m.getString(Arg.STATUS.getOptionName()), "SUCCEEDED");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01T01:00Z");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01T01:00Z");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
index 51def35..89132d6 100644
--- a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -117,7 +117,7 @@ public class PigProcessIT {
                 .get(InstancesResult.class);
         Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
 
-        // verify LogMover
+        // verify JobLogMover
         Path oozieLogPath = OozieTestUtils.getOozieLogPath(context.getCluster().getCluster(), jobInfo);
         Assert.assertTrue(context.getCluster().getFileSystem().exists(oozieLogPath));
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c6000363/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
index de8bc33..e67fe2a 100644
--- a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
+++ b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java
@@ -22,8 +22,9 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.logging.LogMover;
+import org.apache.falcon.logging.JobLogMover;
 import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.engine.OozieClientFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.BundleJob;
@@ -176,13 +177,17 @@ public final class OozieTestUtils {
     public static Path getOozieLogPath(Cluster cluster, WorkflowJob jobInfo) throws Exception {
         Path stagingPath = EntityUtil.getLogPath(cluster, cluster);
         final Path logPath = new Path(ClusterHelper.getStorageUrl(cluster), stagingPath);
-        LogMover.main(new String[] {
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(new String[] {
             "-workflowEngineUrl", ClusterHelper.getOozieUrl(cluster),
-            "-subflowId", jobInfo.getId(), "-runId", "1",
+            "-subflowId", jobInfo.getId(),
+            "-runId", "1",
             "-logDir", logPath.toString() + "/job-2012-04-21-00-00",
-            "-status", "SUCCEEDED", "-entityType", "process",
+            "-status", "SUCCEEDED",
+            "-entityType", "process",
             "-userWorkflowEngine", "pig",
-        });
+        }, WorkflowExecutionContext.Type.POST_PROCESSING);
+
+        new JobLogMover().run(context);
 
         return new Path(logPath, "job-2012-04-21-00-00/001/oozie.log");
     }