Posted to commits@falcon.apache.org by sr...@apache.org on 2013/07/20 17:21:19 UTC

[2/2] git commit: FALCON-30 Enable embedding pig scripts directly in a process. Contributed by Venkatesh Seetharam

FALCON-30 Enable embedding pig scripts directly in a process. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/09971e66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/09971e66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/09971e66

Branch: refs/heads/master
Commit: 09971e66be57d6c0884415240feed85cb2a444a6
Parents: 689d52a
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Sat Jul 20 20:34:22 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Sat Jul 20 20:34:22 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 client/src/main/resources/process-0.1.xsd       |   3 +-
 docs/src/site/twiki/EntitySpecification.twiki   |  39 +-
 .../org/apache/falcon/logging/LogMover.java     |  33 +-
 .../falcon/workflow/FalconPostProcessing.java   |  16 +-
 .../workflow/FalconPostProcessingTest.java      |   3 +-
 pom.xml                                         |   1 +
 .../falcon/converter/OozieProcessMapper.java    | 339 ++++++++---
 .../config/workflow/process-parent-workflow.xml |  44 +-
 .../falcon/converter/AbstractTestBase.java      |  21 +-
 .../converter/OozieProcessMapperTest.java       |  76 +--
 .../config/process/pig-process-0.1.xml          |  53 ++
 webapp/pom.xml                                  |   9 +
 .../java/org/apache/falcon/cli/FalconCLIIT.java |   4 +-
 .../org/apache/falcon/logging/LogMoverIT.java   |  17 +-
 .../apache/falcon/resource/PigProcessIT.java    | 119 ++++
 .../org/apache/falcon/resource/TestContext.java | 128 +++-
 webapp/src/test/resources/apps/pig/data.txt     | 585 +++++++++++++++++++
 webapp/src/test/resources/apps/pig/id.pig       |  20 +
 .../src/test/resources/pig-process-template.xml |  49 ++
 20 files changed, 1392 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 82cf4ae..8bc6646 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,9 @@ Trunk (Unreleased)
 
   IMPROVEMENTS
 
+    FALCON-30 Enable embedding pig scripts directly in a process. (Venkatesh
+    Seetharam via Srikanth Sundarrajan)
+
     FALCON-47 Falcon Replication should support configurable delays in feed, 
     parallel, timeout and bulk transfer with variable frequency (Shaik Idris
     Ali via Srikanth Sundarrajan)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 529d5c8..c96700c 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -283,7 +283,7 @@
     </xs:complexType>
 
     <xs:complexType name="workflow">
-        <xs:attribute type="engine-type" name="engine" use="optional"/>
+        <xs:attribute type="engine-type" name="engine" use="optional" default="oozie"/>
         <xs:attribute type="xs:string" name="path" use="required"/>
         <xs:attribute type="xs:string" name="lib" use="optional"/>
     </xs:complexType>
@@ -291,6 +291,7 @@
     <xs:simpleType name="engine-type">
         <xs:restriction base="xs:string">
             <xs:enumeration value="oozie"/>
+            <xs:enumeration value="pig"/>
         </xs:restriction>
     </xs:simpleType>
 

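A minimal sketch of what this schema change means for the JAXB bindings, assuming the generated EngineType enum from process-0.1.xsd (its value()/fromValue(String) accessors are the ones used elsewhere in this commit; the demo class itself is illustrative):

    import org.apache.falcon.entity.v0.process.EngineType;

    public class EngineTypeDemo {
        public static void main(String[] args) {
            // "pig" is a newly valid attribute value after this change
            EngineType pig = EngineType.fromValue("pig");
            System.out.println(pig.value());  // prints "pig"

            // and with default="oozie", a <workflow> element that omits the
            // engine attribute now unmarshals to EngineType.OOZIE, not null
        }
    }
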
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index bae2612..ef86055 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -394,12 +394,25 @@ queueName and jobPriority are special properties, which when present are used by
         <property name="queueName" value="hadoopQueue"/>
         <property name="jobPriority" value="VERY_HIGH"/>
 </verbatim>
+
 ---++++ Workflow
-The workflow defines the workflow engine that should be used and the path to the workflow on hdfs. The workflow definition on hdfs contains the actual job that should run and it should confirm to the workflow specification of the engine specified. The libraries required by the workflow should be in lib folder inside the workflow path.
 
-The properties defined in the cluster and cluster properties(nameNode and jobTracker) will also be available for the workflow.
+The workflow defines the workflow engine that should be used and the path to the workflow on hdfs.
+The workflow definition on hdfs contains the actual job that should run, and it should conform to
+the workflow specification of the specified engine. The libraries required by the workflow should
+be in the lib folder inside the workflow path.
+
+The properties defined in the cluster, and the cluster properties (nameNode and jobTracker), will
+also be available to the workflow.
+
+Two engines are supported today.
+
+---+++++ Oozie
+
+As part of oozie workflow engine support, users can embed an oozie workflow.
+Refer to oozie [[http://incubator.apache.org/oozie/overview.html][workflow overview]] and
+[[http://incubator.apache.org/oozie/docs/3.1.3/docs/WorkflowFunctionalSpec.html][workflow specification]] for details.
 
-As of now, only oozie workflow engine is supported. Refer to oozie [[http://incubator.apache.org/oozie/overview.html][workflow overview]] and [[http://incubator.apache.org/oozie/docs/3.1.3/docs/WorkflowFunctionalSpec.html][workflow specification]] for details.  
 Syntax:
 <verbatim>
 <process name="[process name]">
@@ -417,7 +430,25 @@ Example:
 ...
 </process>
 </verbatim>
-This defines the workflow engine to be oozie and the workflow xml is defined at /projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.
+
+This defines the workflow engine to be oozie and the workflow xml is defined at
+/projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.
+
+---+++++ Pig
+
+Falcon also adds the Pig engine, which enables users to embed a Pig script as a process.
+
+Example:
+<verbatim>
+<process name="sample-process">
+...
+    <workflow engine="pig" path="/projects/bootcamp/pig.script"/>
+...
+</process>
+</verbatim>
+
+This defines the workflow engine to be pig and the pig script is defined at
+/projects/bootcamp/pig.script.
 
 ---++++ Retry
 Retry policy defines how workflow failures should be handled. Two retry policies are defined: backoff and exp-backoff (exponential backoff). Depending on the delay and number of attempts, the workflow is retried after specific intervals.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index 4021464..e8a85c6 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -64,6 +65,7 @@ public class LogMover extends Configured implements Tool {
         private String runId;
         private String logDir;
         private String entityType;
+        private String userWorkflowEngine;
     }
 
     public static void main(String[] args) throws Exception {
@@ -75,6 +77,7 @@ public class LogMover extends Configured implements Tool {
         try {
             ARGS args = new ARGS();
             setupArgs(arguments, args);
+
             OozieClient client = new OozieClient(args.oozieUrl);
             WorkflowJob jobInfo;
             try {
@@ -83,17 +86,18 @@ public class LogMover extends Configured implements Tool {
                 LOG.error("Error getting jobinfo for: " + args.subflowId, e);
                 return 0;
             }
+
             Path path = new Path(args.logDir + "/"
                     + String.format("%03d", Integer.parseInt(args.runId)));
-
             FileSystem fs = path.getFileSystem(getConf());
 
-            if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())) {
-                // if replication wf
+            if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())
+                    || notUserWorkflowEngineIsOozie(args.userWorkflowEngine)) {
+                // if replication wf or PIG Process
                 copyOozieLog(client, fs, path, jobInfo.getId());
                 copyTTlogs(fs, path, jobInfo.getActions().get(2));
             } else {
-                // if process wf
+                // if process wf with oozie engine
                 String subflowId = jobInfo.getExternalId();
                 copyOozieLog(client, fs, path, subflowId);
                 WorkflowJob subflowInfo = client.getJobInfo(subflowId);
@@ -107,7 +111,6 @@ public class LogMover extends Configured implements Tool {
                                 + action.getName());
                     }
                 }
-
             }
 
         } catch (Exception e) {
@@ -116,6 +119,11 @@ public class LogMover extends Configured implements Tool {
         return 0;
     }
 
+    private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
+        // userWorkflowEngine is null for replication and "pig" for a pig process
+        return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
+    }
+
     private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
                               String id) throws OozieClientException, IOException {
         InputStream in = new ByteArrayInputStream(client.getJobLog(id).getBytes());
@@ -150,23 +158,31 @@ public class LogMover extends Configured implements Tool {
 
     private void setupArgs(String[] arguments, ARGS args) throws ParseException {
         Options options = new Options();
-        Option opt;
-        opt = new Option("workflowEngineUrl", true,
-                "url of workflow engine, ex:oozie");
+
+        Option opt = new Option("workflowEngineUrl", true, "url of workflow engine, ex:oozie");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("subflowId", true, "external id of userworkflow");
         opt.setRequired(true);
         options.addOption(opt);
+
+        opt = new Option("userWorkflowEngine", true, "user workflow engine type");
+        opt.setRequired(false);  // replication will NOT have this arg sent
+        options.addOption(opt);
+
         opt = new Option("runId", true, "current workflow's runid");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("logDir", true, "log dir where job logs are stored");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("status", true, "user workflow status");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("entityType", true, "entity type feed or process");
         opt.setRequired(true);
         options.addOption(opt);
@@ -175,6 +191,7 @@ public class LogMover extends Configured implements Tool {
 
         args.oozieUrl = cmd.getOptionValue("workflowEngineUrl");
         args.subflowId = cmd.getOptionValue("subflowId");
+        args.userWorkflowEngine = cmd.getOptionValue("userWorkflowEngine");
         args.runId = cmd.getOptionValue("runId");
         args.logDir = cmd.getOptionValue("logDir");
         args.entityType = cmd.getOptionValue("entityType");

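A hedged invocation sketch for the new optional argument (the URL, job id and paths below are made up; the real values are wired in by process-parent-workflow.xml further down). With -userWorkflowEngine set to anything other than "oozie", LogMover now takes the copyOozieLog/copyTTlogs path also used for replication, since a pig job has no oozie sub-workflow to fetch logs from.

    import org.apache.falcon.logging.LogMover;

    public class LogMoverInvocationSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative values only.
            LogMover.main(new String[]{
                "-workflowEngineUrl", "http://localhost:11000/oozie/",
                "-subflowId", "0000001-130720000000000-oozie-W",
                "-userWorkflowEngine", "pig",  // optional; replication omits it
                "-runId", "1",
                "-logDir", "/falcon/logs",
                "-status", "SUCCEEDED",
                "-entityType", "process",
            });
        }
    }
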
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 392f145..914828b 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -59,6 +59,7 @@ public class FalconPostProcessing extends Configured implements Tool {
         LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
         WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
         USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
+        USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
         LOG_DIR("logDir", "log dir where job logs are copied");
 
         private String name;
@@ -80,7 +81,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         public String getOptionValue(CommandLine cmd) {
             return cmd.getOptionValue(this.name);
         }
-
     }
 
     public static void main(String[] args) throws Exception {
@@ -94,9 +94,11 @@ public class FalconPostProcessing extends Configured implements Tool {
 
         LOG.info("Sending user message " + cmd);
         invokeUserMessageProducer(cmd);
+
        //LogMover doesn't throw an exception; a failed LogMover will not fail the user workflow
         LOG.info("Moving logs " + cmd);
         invokeLogProducer(cmd);
+
         LOG.info("Sending falcon message " + cmd);
         invokeFalconMessageProducer(cmd);
 
@@ -154,7 +156,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         addArg(args, cmd, Arg.LOG_FILE);
 
         MessageProducer.main(args.toArray(new String[0]));
-
     }
 
     private void invokeLogProducer(CommandLine cmd) throws Exception {
@@ -162,12 +163,12 @@ public class FalconPostProcessing extends Configured implements Tool {
         addArg(args, cmd, Arg.WF_ENGINE_URL);
         addArg(args, cmd, Arg.ENTITY_TYPE);
         addArg(args, cmd, Arg.USER_SUBFLOW_ID);
+        addArg(args, cmd, Arg.USER_WORKFLOW_ENGINE);
         addArg(args, cmd, Arg.RUN_ID);
         addArg(args, cmd, Arg.LOG_DIR);
         addArg(args, cmd, Arg.STATUS);
 
         LogMover.main(args.toArray(new String[0]));
-
     }
 
     private void addArg(List<String> args, CommandLine cmd, Arg arg) {
@@ -198,15 +199,18 @@ public class FalconPostProcessing extends Configured implements Tool {
         addOption(options, Arg.LOG_FILE);
         addOption(options, Arg.WF_ENGINE_URL);
         addOption(options, Arg.USER_SUBFLOW_ID);
+        addOption(options, Arg.USER_WORKFLOW_ENGINE, false);
         addOption(options, Arg.LOG_DIR);
         return new GnuParser().parse(options, arguments);
     }
 
     private static void addOption(Options options, Arg arg) {
+        addOption(options, arg, true);
+    }
+
+    private static void addOption(Options options, Arg arg, boolean isRequired) {
         Option option = arg.getOption();
-        option.setRequired(true);
+        option.setRequired(isRequired);
         options.addOption(option);
     }
-
-
 }

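A toy demonstration of the optional-option behavior introduced above, using commons-cli directly (the class name and argument array are illustrative): parsing succeeds when the optional flag is absent, and getOptionValue(...) simply returns null, which keeps existing replication invocations working unchanged.

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class OptionalArgDemo {
        public static void main(String[] args) throws ParseException {
            Options options = new Options();
            Option engine = new Option("userWorkflowEngine", true, "user workflow engine type");
            engine.setRequired(false);  // mirrors addOption(options, arg, false)
            options.addOption(engine);

            CommandLine cmd = new GnuParser().parse(options, new String[0]);
            System.out.println(cmd.getOptionValue("userWorkflowEngine"));  // null
        }
    }
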
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 5ac8006..c6485cd 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -68,7 +68,8 @@ public class FalconPostProcessingTest {
                             "-" + Arg.WF_ENGINE_URL.getOptionName(),
                             "http://localhost:11000/oozie/",
                             "-" + Arg.LOG_DIR.getOptionName(), "target/log",
-                            "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test", };
+                            "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
+                            "-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie", };
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9121dd0..a518f54 100644
--- a/pom.xml
+++ b/pom.xml
@@ -823,6 +823,7 @@
                         <exclude>**/.project</exclude>
                         <exclude>**/.settings/**</exclude>
                         <exclude>**/test-output/**</exclude>
+                        <exclude>**/data.txt</exclude>
                     </excludes>
                 </configuration>
                 <executions>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 592f089..8f75736 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -21,6 +21,7 @@ package org.apache.falcon.converter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
@@ -30,18 +31,34 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Workflow;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.coordinator.*;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.SUBWORKFLOW;
+import org.apache.falcon.oozie.workflow.DELETE;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.PREPARE;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
+import java.io.IOException;
 import java.util.*;
 
 /**
@@ -64,20 +81,6 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         return apps;
     }
 
-    private void createWorkflow(Cluster cluster, String template, String wfName, Path wfPath) throws FalconException {
-        WORKFLOWAPP wfApp = getWorkflowTemplate(template);
-        wfApp.setName(wfName);
-
-        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
-            if (object instanceof ACTION && ((ACTION) object).getName().equals("user-workflow")) {
-                SUBWORKFLOW subWf = ((ACTION) object).getSubWorkflow();
-                subWf.setAppPath(getStoragePath(getEntity().getWorkflow().getPath()));
-            }
-        }
-
-        marshal(cluster, wfApp, wfPath);
-    }
-
     /**
      * Creates default oozie coordinator.
      *
@@ -97,15 +100,57 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         Path coordPath = getCoordPath(bundlePath, coordName);
 
         // coord attributes
+        initializeCoordAttributes(cluster, process, coord, coordName);
+
+        CONTROLS controls = initializeControls(process); // controls
+        coord.setControls(controls);
+
+        // Configuration
+        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
+
+        List<String> inputFeeds = new ArrayList<String>();
+        List<String> inputPaths = new ArrayList<String>();
+        initializeInputPaths(cluster, process, coord, props, inputFeeds, inputPaths); // inputs
+        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
+        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
+
+        List<String> outputFeeds = new ArrayList<String>();
+        List<String> outputPaths = new ArrayList<String>();
+        initializeOutputPaths(cluster, process, coord, props, outputFeeds, outputPaths);  // outputs
+        // Output feed name and path for parent workflow
+        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
+        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+
+        Workflow processWorkflow = process.getWorkflow();
+        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+
+        // create parent wf
+        createWorkflow(cluster, process, processWorkflow, coordName, coordPath);
+
+        WORKFLOW wf = new WORKFLOW();
+        wf.setAppPath(getStoragePath(coordPath.toString()));
+        wf.setConfiguration(getCoordConfig(props));
+
+        // set coord action to parent wf
+        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
+        action.setWorkflow(wf);
+        coord.setAction(action);
+
+        return coord;
+    }
+
+    private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
         coord.setName(coordName);
-        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process,
-                cluster.getName());
+        org.apache.falcon.entity.v0.process.Cluster processCluster =
+                ProcessHelper.getCluster(process, cluster.getName());
         coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
         coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
         coord.setTimezone(process.getTimezone().getID());
         coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
+    }
 
-        // controls
+    private CONTROLS initializeControls(Process process)
+        throws FalconException {
         CONTROLS controls = new CONTROLS();
         controls.setConcurrency(String.valueOf(process.getParallel()));
         controls.setExecution(process.getOrder().name());
@@ -123,95 +168,80 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             }
         }
         controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+
         if (timeoutInMillis / frequencyInMillis * 2 > 0) {
             controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
         }
-        coord.setControls(controls);
-
-        // Configuration
-        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
 
-        List<String> inputFeeds = new ArrayList<String>();
-        List<String> inputPaths = new ArrayList<String>();
-        // inputs
-        if (process.getInputs() != null) {
-            for (Input input : process.getInputs().getInputs()) {
-                if (!input.isOptional()) {
-                    if (coord.getDatasets() == null) {
-                        coord.setDatasets(new DATASETS());
-                    }
-                    if (coord.getInputEvents() == null) {
-                        coord.setInputEvents(new INPUTEVENTS());
-                    }
-
-                    SYNCDATASET syncdataset = createDataSet(input.getFeed(), cluster, input.getName(),
-                            LocationType.DATA);
-                    coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
-                    DATAIN datain = createDataIn(input);
-                    coord.getInputEvents().getDataIn().add(datain);
-                }
-
-                String inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
-                props.put(input.getName(), inputExpr);
-                inputFeeds.add(input.getName());
-                inputPaths.add(inputExpr);
+        return controls;
+    }
 
-            }
+    private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+                                      Map<String, String> props, List<String> inputFeeds, List<String> inputPaths)
+        throws FalconException {
+        if (process.getInputs() == null) {
+            return;
         }
-        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
-        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
 
-        // outputs
-        List<String> outputFeeds = new ArrayList<String>();
-        List<String> outputPaths = new ArrayList<String>();
-        if (process.getOutputs() != null) {
-            if (coord.getDatasets() == null) {
-                coord.setDatasets(new DATASETS());
-            }
-            if (coord.getOutputEvents() == null) {
-                coord.setOutputEvents(new OUTPUTEVENTS());
-            }
+        for (Input input : process.getInputs().getInputs()) {
+            if (!input.isOptional()) {
+                if (coord.getDatasets() == null) {
+                    coord.setDatasets(new DATASETS());
+                }
+                if (coord.getInputEvents() == null) {
+                    coord.setInputEvents(new INPUTEVENTS());
+                }
 
-            for (Output output : process.getOutputs().getOutputs()) {
-                SYNCDATASET syncdataset = createDataSet(output.getFeed(), cluster, output.getName(), LocationType.DATA);
+                SYNCDATASET syncdataset = createDataSet(input.getFeed(), cluster, input.getName(),
+                        LocationType.DATA);
                 coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
 
-                DATAOUT dataout = createDataOut(output);
-                coord.getOutputEvents().getDataOut().add(dataout);
-
-                String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
-                props.put(output.getName(), outputExpr);
-                outputFeeds.add(output.getName());
-                outputPaths.add(outputExpr);
-
-                // stats and meta paths
-                createOutputEvent(output.getFeed(), output.getName(), cluster, "stats",
-                        LocationType.STATS, coord, props, output.getInstance());
-                createOutputEvent(output.getFeed(), output.getName(), cluster, "meta",
-                        LocationType.META, coord, props, output.getInstance());
-                createOutputEvent(output.getFeed(), output.getName(), cluster, "tmp",
-                        LocationType.TMP, coord, props, output.getInstance());
-
+                DATAIN datain = createDataIn(input);
+                coord.getInputEvents().getDataIn().add(datain);
             }
+
+            String inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
+            props.put(input.getName(), inputExpr);
+            inputFeeds.add(input.getName());
+            inputPaths.add(inputExpr);
         }
-        // Output feed name and path for parent workflow
-        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
-        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+    }
 
-        // create parent wf
-        createWorkflow(cluster, DEFAULT_WF_TEMPLATE, coordName, coordPath);
+    private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+                                       Map<String, String> props, List<String> outputFeeds, List<String> outputPaths)
+        throws FalconException {
+        if (process.getOutputs() == null) {
+            return;
+        }
 
-        WORKFLOW wf = new WORKFLOW();
-        wf.setAppPath(getStoragePath(coordPath.toString()));
-        wf.setConfiguration(getCoordConfig(props));
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
 
-        // set coord action to parent wf
-        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
-        action.setWorkflow(wf);
-        coord.setAction(action);
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
 
-        return coord;
+        for (Output output : process.getOutputs().getOutputs()) {
+            SYNCDATASET syncdataset = createDataSet(output.getFeed(), cluster, output.getName(), LocationType.DATA);
+            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+            DATAOUT dataout = createDataOut(output);
+            coord.getOutputEvents().getDataOut().add(dataout);
+
+            String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
+            props.put(output.getName(), outputExpr);
+            outputFeeds.add(output.getName());
+            outputPaths.add(outputExpr);
+
+            // stats and meta paths
+            createOutputEvent(output.getFeed(), output.getName(), cluster, "stats",
+                    LocationType.STATS, coord, props, output.getInstance());
+            createOutputEvent(output.getFeed(), output.getName(), cluster, "meta",
+                    LocationType.META, coord, props, output.getInstance());
+            createOutputEvent(output.getFeed(), output.getName(), cluster, "tmp",
+                    LocationType.TMP, coord, props, output.getInstance());
+        }
     }
 
     private DATAOUT createDataOut(Output output) {
@@ -261,7 +291,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
     private SYNCDATASET createDataSet(String feedName, Cluster cluster, String datasetName,
                                       LocationType locationType) throws FalconException {
-        Feed feed = (Feed) EntityUtil.getEntity(EntityType.FEED, feedName);
+        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
 
         SYNCDATASET syncdataset = new SYNCDATASET();
         syncdataset.setName(datasetName);
@@ -300,4 +330,127 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         }
         return props;
     }
+
+    private void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
+                                String wfName, Path wfPath) throws FalconException {
+        WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
+        wfApp.setName(wfName);
+
+        EngineType engineType = processWorkflow.getEngine();
+        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof ACTION)) {
+                continue;
+            }
+
+            String storagePath = getStoragePath(getEntity().getWorkflow().getPath());
+            ACTION action = (ACTION) object;
+            String actionName = action.getName();
+            if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
+                action.getSubWorkflow().setAppPath(storagePath);
+            } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
+                decoratePIGAction(cluster, process, processWorkflow, storagePath, action.getPig());
+            }
+        }
+
+        marshal(cluster, wfApp, wfPath);
+    }
+
+    private void decoratePIGAction(Cluster cluster, Process process, Workflow processWorkflow,
+                                   String storagePath, PIG pigAction) throws FalconException {
+
+        pigAction.setScript(storagePath);
+
+        addPrepareDeleteOutputPath(process, pigAction);
+
+        addInputOutputFeedsAsParams(pigAction, process);
+
+        propagateProcessProperties(pigAction, process);
+
+        addArchiveForCustomJars(cluster, processWorkflow, pigAction);
+    }
+
+    private void addPrepareDeleteOutputPath(Process process, PIG pigAction) {
+        final PREPARE prepare = new PREPARE();
+        final List<DELETE> deleteList = prepare.getDelete();
+        for (Output output : process.getOutputs().getOutputs()) {
+            final DELETE delete = new DELETE();
+            delete.setPath("${wf:conf('" + output.getName() + "')}");
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            pigAction.setPrepare(prepare);
+        }
+    }
+
+    private void addInputOutputFeedsAsParams(PIG pigAction, Process process) throws FalconException {
+        final List<String> paramList = pigAction.getParam();
+        for (Input input : process.getInputs().getInputs()) {
+            paramList.add(input.getName() + "=${" + input.getName() + "}");
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            paramList.add(output.getName() + "=${" + output.getName() + "}");
+        }
+    }
+
+    private void propagateProcessProperties(PIG pigAction, Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+        if (processProperties == null) {
+            return;
+        }
+
+        // Propagate user defined properties to job configuration
+        final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
+                pigAction.getConfiguration().getProperty();
+        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
+                    new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+            configProperty.setName(property.getName());
+            configProperty.setValue(property.getValue());
+            configuration.add(configProperty);
+        }
+
+        // Propagate user defined properties to pig script as macros
+        // passed as parameters -p name=value that can be accessed as $name
+        final List<String> paramList = pigAction.getParam();
+        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            paramList.add(property.getName() + "=" + property.getValue());
+        }
+    }
+
+    private void addArchiveForCustomJars(Cluster cluster, Workflow processWorkflow,
+                                         PIG pigAction) throws FalconException {
+        String processWorkflowLib = processWorkflow.getLib();
+        if (processWorkflowLib == null) {
+            return;
+        }
+
+        Path libPath = new Path(processWorkflowLib);
+        try {
+            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            if (fs.isFile(libPath)) {  // File, not a Dir
+                pigAction.getArchive().add(processWorkflowLib);
+                return;
+            }
+
+            // lib path is a directory, add each file under the lib dir to archive
+            final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    try {
+                        return fs.isFile(path) && path.getName().endsWith(".jar");
+                    } catch (IOException ignore) {
+                        return false;
+                    }
+                }
+            });
+
+            for (FileStatus fileStatus : fileStatuses) {
+                pigAction.getArchive().add(fileStatus.getPath().toString());
+            }
+        } catch (IOException e) {
+            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
+        }
+    }
 }

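A small self-contained sketch of the parameter convention decoratePIGAction relies on (feed names taken from the pig-process-0.1.xml test config in this commit; the demo harness is illustrative): each input and output feed name becomes a pig parameter that resolves to the coordinator-materialized path, so the script can refer to $impression, $click and $clicksummary directly.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PigParamConventionDemo {
        public static void main(String[] args) {
            // mirrors addInputOutputFeedsAsParams: one name=${name} per feed
            List<String> params = new ArrayList<String>();
            for (String feedName : Arrays.asList("impression", "click", "clicksummary")) {
                params.add(feedName + "=${" + feedName + "}");
            }
            System.out.println(params);
            // [impression=${impression}, click=${click}, clicksummary=${clicksummary}]
        }
    }
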
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index 8c800eb..4f6dd43 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -52,7 +52,37 @@
         <error to="fail"/>
     </action>
 
-    <action name='user-workflow'>
+    <decision name='user-workflow'>
+        <switch>
+            <case to="user-oozie-workflow">
+                ${userWorkflowEngine=="oozie"}
+            </case>
+            <case to="user-pig-job">
+                ${userWorkflowEngine=="pig"}
+            </case>
+            <default to="user-oozie-workflow"/>
+        </switch>
+    </decision>
+    <action name='user-pig-job'>
+        <pig>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>#USER_WF_PATH#</script>
+        </pig>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name='user-oozie-workflow'>
         <sub-workflow>
             <app-path>#USER_WF_PATH#</app-path>
             <propagate-configuration/>
@@ -113,7 +143,9 @@
             <arg>-workflowEngineUrl</arg>
             <arg>${workflowEngineUrl}</arg>
             <arg>-subflowId</arg>
-            <arg>${wf:id()}@user-workflow</arg>
+            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
+            <arg>-userWorkflowEngine</arg>
+            <arg>${userWorkflowEngine}</arg>
             <arg>-logDir</arg>
             <arg>${logDir}/job-${nominalTime}/</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
@@ -179,7 +211,9 @@
             <arg>-workflowEngineUrl</arg>
             <arg>${workflowEngineUrl}</arg>
             <arg>-subflowId</arg>
-            <arg>${wf:id()}@user-workflow</arg>
+            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
+            <arg>-userWorkflowEngine</arg>
+            <arg>${userWorkflowEngine}</arg>
             <arg>-logDir</arg>
             <arg>${logDir}/job-${nominalTime}/</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
@@ -194,8 +228,8 @@
         <error to="fail"/>
     </action>
     <kill name="fail">
-        <message>Workflow failed, error
-            message[${wf:errorMessage(wf:lastErrorNode())}]
+        <message>
+            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
         </message>
     </kill>
     <end name='end'/>

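For clarity, the two EL expressions above rendered as plain Java (a sketch, not Falcon code): the decision node routes on userWorkflowEngine, and the subflowId handed to post-processing only carries an "@user-oozie-workflow" suffix for the oozie engine, because a pig job runs inside this workflow and has no external sub-workflow id.

    public class RoutingDemo {
        // ${userWorkflowEngine=="pig"} -> user-pig-job, else user-oozie-workflow
        static String route(String userWorkflowEngine) {
            return "pig".equals(userWorkflowEngine) ? "user-pig-job" : "user-oozie-workflow";
        }

        // ${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}
        static String subflowId(String wfId, String userWorkflowEngine) {
            return wfId + ("oozie".equals(userWorkflowEngine) ? "@user-oozie-workflow" : "");
        }

        public static void main(String[] args) {
            System.out.println(route("pig"));              // user-pig-job
            System.out.println(subflowId("wf-id", "pig")); // wf-id
        }
    }
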
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java b/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java
index 9cc02df..2c7ee8b 100644
--- a/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java
+++ b/process/src/test/java/org/apache/falcon/converter/AbstractTestBase.java
@@ -33,26 +33,27 @@ public class AbstractTestBase {
     private static final String PROCESS_XML = "/config/process/process-0.1.xml";
     private static final String FEED_XML = "/config/feed/feed-0.1.xml";
     private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
+    private static final String PIG_PROCESS_XML = "/config/process/pig-process-0.1.xml";
 
-    private void storeEntity(EntityType type, String name) throws Exception {
+    protected void storeEntity(EntityType type, String name, String resource) throws Exception {
         Unmarshaller unmarshaller = type.getUnmarshaller();
         ConfigurationStore store = ConfigurationStore.get();
         store.remove(type, name);
         switch (type) {
         case CLUSTER:
-            Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+            Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(resource));
             cluster.setName(name);
             store.publish(type, cluster);
             break;
 
         case FEED:
-            Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(FEED_XML));
+            Feed feed = (Feed) unmarshaller.unmarshal(this.getClass().getResource(resource));
             feed.setName(name);
             store.publish(type, feed);
             break;
 
         case PROCESS:
-            Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(PROCESS_XML));
+            Process process = (Process) unmarshaller.unmarshal(this.getClass().getResource(resource));
             process.setName(name);
             store.publish(type, process);
             break;
@@ -62,15 +63,17 @@ public class AbstractTestBase {
     }
 
     public void setup() throws Exception {
-        storeEntity(EntityType.CLUSTER, "corp");
-        storeEntity(EntityType.FEED, "clicks");
-        storeEntity(EntityType.FEED, "impressions");
-        storeEntity(EntityType.FEED, "clicksummary");
-        storeEntity(EntityType.PROCESS, "clicksummary");
+        storeEntity(EntityType.CLUSTER, "corp", CLUSTER_XML);
+        storeEntity(EntityType.FEED, "clicks", FEED_XML);
+        storeEntity(EntityType.FEED, "impressions", FEED_XML);
+        storeEntity(EntityType.FEED, "clicksummary", FEED_XML);
+        storeEntity(EntityType.PROCESS, "clicksummary", PROCESS_XML);
+        storeEntity(EntityType.PROCESS, "pig-process", PIG_PROCESS_XML);
     }
 
     public void cleanup() throws Exception {
         ConfigurationStore store = ConfigurationStore.get();
+        store.remove(EntityType.PROCESS, "pig-process");
         store.remove(EntityType.PROCESS, "clicksummary");
         store.remove(EntityType.FEED, "clicksummary");
         store.remove(EntityType.FEED, "impressions");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index fe3cc58..abf664c 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -60,6 +60,8 @@ import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.List;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -161,37 +163,44 @@ public class OozieProcessMapperTest extends AbstractTestBase {
             Assert.assertTrue(new File(path).mkdirs());
         }
         Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/", EntityUtil.getStagingPath(process));
-        mapper.map(cluster, bundlePath);
-
-        FileSystem fs = new Path(hdfsUrl).getFileSystem(new Configuration());
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(fs, bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
-        testDefCoordMap(process, coord);
-        assertEquals(coord.getControls().getThrottle(), "12");
-        assertEquals(coord.getControls().getTimeout(), "360");
 
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(fs, new Path(wfPath));
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
         testParentWorkflow(process, parentWorkflow);
     }
 
     @Test
     public void testBundle1() throws Exception {
         Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         process.setFrequency(Frequency.fromString("minutes(1)"));
         process.setTimeout(Frequency.fromString("minutes(15)"));
+
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "30", "15");
+        testParentWorkflow(process, parentWorkflow);
+    }
+
+    @Test
+    public void testPigProcessMapper() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
+        Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
+
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION pigAction = (ACTION) decisionOrForkOrJoin.get(3);
+        Assert.assertEquals("user-pig-job", pigAction.getName());
+        Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getPig().getScript());
+        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getPig().getArchive());
+
+        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
+        Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
+    }
+
+    private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
+        throws Exception {
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         OozieProcessMapper mapper = new OozieProcessMapper(process);
         Path bundlePath = new Path("/", EntityUtil.getStagingPath(process));
         mapper.map(cluster, bundlePath);
@@ -208,23 +217,22 @@ public class OozieProcessMapperTest extends AbstractTestBase {
 
         COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
         testDefCoordMap(process, coord);
-        assertEquals(coord.getControls().getThrottle(), "30");
-        assertEquals(coord.getControls().getTimeout(), "15");
+        assertEquals(coord.getControls().getThrottle(), throttle);
+        assertEquals(coord.getControls().getTimeout(), timeout);
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(fs, new Path(wfPath));
-        testParentWorkflow(process, parentWorkflow);
+        return getParentWorkflow(fs, new Path(wfPath));
     }
 
     public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
         Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
-        Assert.assertEquals("should-record", ((DECISION) parentWorkflow.getDecisionOrForkOrJoin().get(0)).getName());
-        Assert.assertEquals("recordsize", ((ACTION) parentWorkflow.getDecisionOrForkOrJoin().get(1)).getName());
-        Assert.assertEquals("user-workflow", ((ACTION) parentWorkflow.getDecisionOrForkOrJoin().get(2)).getName());
-        Assert.assertEquals("succeeded-post-processing",
-                ((ACTION) parentWorkflow.getDecisionOrForkOrJoin().get(3)).getName());
-        Assert.assertEquals("failed-post-processing",
-                ((ACTION) parentWorkflow.getDecisionOrForkOrJoin().get(4)).getName());
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+        Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
     }
 
     private COORDINATORAPP getCoordinator(FileSystem fs, Path path) throws Exception {
@@ -266,7 +274,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
     private String readFile(FileSystem fs, Path path) throws Exception {
         BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
         String line;
-        StringBuffer contents = new StringBuffer();
+        StringBuilder contents = new StringBuilder();
         while ((line = reader.readLine()) != null) {
             contents.append(line);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/process/src/test/resources/config/process/pig-process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/pig-process-0.1.xml b/process/src/test/resources/config/process/pig-process-0.1.xml
new file mode 100644
index 0000000..318f0da
--- /dev/null
+++ b/process/src/test/resources/config/process/pig-process-0.1.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<process name="pig-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="impression" feed="impressions" start="today(0,0)" end="today(0,2)"/>
+        <input name="click" feed="clicks" start="yesterday(0,0)" end="latest(0)" partition="*/US"/>
+    </inputs>
+
+    <outputs>
+        <output name="clicksummary" feed="impressions" instance="today(0,0)"/>
+    </outputs>
+
+    <properties>
+        <property name="procprop" value="procprop"/>
+        <property name="mapred.job.priority" value="LOW"/>
+    </properties>
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="hours(10)" attempts="3"/>
+
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index e9c1c8f..54d6938 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -209,6 +209,15 @@
                                     <outputDirectory>${project.build.directory}/webapps</outputDirectory>
                                     <destFileName>hadoop.war</destFileName>
                                 </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.pig</groupId>
+                                    <artifactId>pig</artifactId>
+                                    <version>0.11.1</version>
+                                    <type>jar</type>
+                                    <overWrite>false</overWrite>
+                                    <outputDirectory>${project.build.directory}/sharelib</outputDirectory>
+                                    <destFileName>pig.jar</destFileName>
+                                </artifactItem>
                             </artifactItems>
                         </configuration>
                     </execution>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index e87fa06..4730728 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -18,8 +18,8 @@
 
 package org.apache.falcon.cli;
 
-import junit.framework.Assert;
 import org.apache.falcon.resource.TestContext;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -56,7 +56,7 @@ public class FalconCLIIT {
         TestContext context = new TestContext();
         Map<String, String> overlay = context.getUniqueOverlay();
 
-        filePath = context.overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay);
+        filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         Assert.assertEquals(
                 0,
                 executeWithURL("entity -submit -type cluster -file " + filePath));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java b/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
index ae4b5ad..4e26bff 100644
--- a/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
+++ b/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
@@ -63,7 +63,7 @@ public class LogMoverIT {
         overlay.put("cluster", "testCluster");
         TestContext context = new TestContext();
         String file = context.
-                overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay);
+                overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         testCluster = StandAloneCluster.newCluster(file);
         STORE.publish(EntityType.CLUSTER, testCluster.getCluster());
 /*
@@ -135,7 +135,7 @@ public class LogMoverIT {
         Assert.assertTrue(fs.exists(oozieLogPath));
 
         testLogMoverWithNextRunId(job.getId());
-
+        testLogMoverWithNextRunIdWithEngine(job.getId());
     }
 
     private Path getLogPath() throws FalconException {
@@ -156,7 +156,18 @@ public class LogMoverIT {
         Path oozieLogPath = new Path(getLogPath(),
                 "job-2010-01-01-01-00/001/oozie.log");
         Assert.assertTrue(fs.exists(oozieLogPath));
-
     }
 
+    private void testLogMoverWithNextRunIdWithEngine(String jobId) throws Exception {
+        LogMover.main(new String[]{"-workflowEngineUrl",
+                                   ClusterHelper.getOozieUrl(testCluster.getCluster()),
+                                   "-subflowId", jobId + "@user-workflow", "-runId", "1",
+                                   "-logDir", getLogPath().toString() + "/job-2010-01-01-01-00",
+                                   "-status", "SUCCEEDED", "-entityType", "process",
+                                   "-userWorkflowEngine", "oozie", });
+
+        Path oozieLogPath = new Path(getLogPath(),
+                "job-2010-01-01-01-00/001/oozie.log");
+        Assert.assertTrue(fs.exists(oozieLogPath));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/webapp/src/test/java/org/apache/falcon/resource/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/resource/PigProcessIT.java
new file mode 100644
index 0000000..1cb5825
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/PigProcessIT.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.falcon.entity.ClusterHelper;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Integration tests for the Pig processing engine.
+ */
+@Test
+public class PigProcessIT {
+
+    private static final String SHARE_LIB_HDFS_PATH = "/user/oozie/share/lib";
+
+    private final TestContext context = new TestContext();
+    private Map<String, String> overlay;
+
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        TestContext.prepare();
+
+        overlay = context.getUniqueOverlay();
+
+        String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+
+        String storageUrl = ClusterHelper.getStorageUrl(context.getCluster().getCluster());
+        System.out.println("nn = " + storageUrl);
+        TestContext.copyOozieShareLibsToHDFS("./target/sharelib", storageUrl + SHARE_LIB_HDFS_PATH);
+
+        // copyPigScriptToHDFS
+        TestContext.copyResourceToHDFS(
+                "/apps/pig/id.pig", "id.pig", storageUrl + "/falcon/test/apps/pig");
+        // copyTestDataToHDFS
+        TestContext.copyResourceToHDFS(
+                "/apps/pig/data.txt", "data.txt", storageUrl + "/falcon/test/input/2012/04/21/00");
+    }
+
+    @AfterClass
+    public void tearDown() throws IOException {
+        TestContext.deleteOozieShareLibsFromHDFS(SHARE_LIB_HDFS_PATH);
+    }
+
+    @Test
+    public void testSubmitAndSchedulePigProcess() throws Exception {
+
+        Thread.sleep(5000);
+
+        String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+        // context.setCluster(filePath);
+
+        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
+
+        filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
+
+        final String pigProcessName = "pig-" + context.getProcessName();
+        overlay.put("processName", pigProcessName);
+
+        filePath = context.overlayParametersOverTemplate(TestContext.PIG_PROCESS_TEMPLATE, overlay);
+        Assert.assertEquals(0,
+                TestContext.executeWithURL("entity -submitAndSchedule -type process -file " + filePath));
+
+/*
+        WorkflowJob jobInfo = context.getWorkflowJob(
+                OozieClient.FILTER_NAME + "=FALCON_PROCESS_DEFAULT_" + pigProcessName);
+        Assert.assertEquals(WorkflowJob.Status.SUCCEEDED, jobInfo.getStatus());
+
+        InstancesResult response = context.service.path("api/instance/running/process/" + pigProcessName)
+                .header("Remote-User", "guest").accept(MediaType.APPLICATION_JSON).get(InstancesResult.class);
+        org.testng.Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+        org.testng.Assert.assertNotNull(response.getInstances());
+        org.testng.Assert.assertEquals(1, response.getInstances().length);
+
+        // verify LogMover
+        Path oozieLogPath = context.getOozieLogPath(jobInfo);
+        System.out.println("oozieLogPath = " + oozieLogPath);
+        Assert.assertTrue(context.getCluster().getFileSystem().exists(oozieLogPath));
+*/
+
+        TestContext.executeWithURL("entity -delete -type process -name " + pigProcessName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/09971e66/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 9f67a80..6fa07c9 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -23,8 +23,10 @@ import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.cli.FalconCLI;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.cluster.util.StandAloneCluster;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -34,6 +36,7 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.OozieClientFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -55,6 +58,7 @@ import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
@@ -82,6 +86,7 @@ public class TestContext {
 
     public static final String SAMPLE_PROCESS_XML = "/process-version-0.xml";
     public static final String PROCESS_TEMPLATE = "/process-template.xml";
+    public static final String PIG_PROCESS_TEMPLATE = "/pig-process-template.xml";
 
     public static final String BASE_URL = "http://localhost:41000/falcon-webapp";
     public static final String REMOTE_USER = System.getProperty("user.name");
@@ -244,7 +249,7 @@ public class TestContext {
     /**
     * Converts an InputStream into a ServletInputStream.
      *
-     * @param fileName
+     * @param fileName file name
      * @return ServletInputStream
      * @throws java.io.IOException
      */
@@ -386,6 +391,51 @@ public class TestContext {
         return false;
     }
 
+    /*
+    public WorkflowJob getWorkflowJob(String filter) throws Exception {
+        OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
+
+        List<WorkflowJob> jobs;
+        while (true) {
+            jobs = ozClient.getJobsInfo(filter);
+            System.out.println("jobs = " + jobs);
+            if (jobs.size() > 0) {
+                break;
+            } else {
+                Thread.sleep(1000);
+            }
+        }
+
+        WorkflowJob jobInfo = jobs.get(0);
+        while (true) {
+            if (!(jobInfo.getStatus() == WorkflowJob.Status.RUNNING
+                    || jobInfo.getStatus() == WorkflowJob.Status.PREP)) {
+                break;
+            } else {
+                Thread.sleep(1000);
+                jobInfo = ozClient.getJobInfo(jobInfo.getId());
+                System.out.println("jobInfo = " + jobInfo);
+            }
+        }
+
+        return jobInfo;
+    }
+
+    public Path getOozieLogPath(WorkflowJob jobInfo) throws Exception {
+        Path stagingPath = new Path(ClusterHelper.getLocation(cluster.getCluster(), "staging"),
+                EntityUtil.getStagingPath(cluster.getCluster()) + "/../logs");
+        final Path logPath = new Path(ClusterHelper.getStorageUrl(cluster.getCluster()), stagingPath);
+        LogMover.main(new String[]{"-workflowEngineUrl",
+                ClusterHelper.getOozieUrl(cluster.getCluster()),
+                "-subflowId", jobInfo.getId(), "-runId", "1",
+                "-logDir", logPath.toString() + "/job-2012-04-21-00-00",
+                "-status", "SUCCEEDED", "-entityType", "process",
+                "-userWorkflowEngine", "pig",});
+
+        return new Path(logPath, "job-2012-04-21-00-00/001/oozie.log");
+    }
+    */
+
     public Map<String, String> getUniqueOverlay() throws FalconException {
         Map<String, String> overlay = new HashMap<String, String>();
         long time = System.currentTimeMillis();
@@ -408,7 +458,7 @@ public class TestContext {
         overlay.put("cluster", RandomStringUtils.randomAlphabetic(5));
         TestContext context = new TestContext();
         String file = context.
-                overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay);
+                overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
         EmbeddedCluster cluster = StandAloneCluster.newCluster(file);
 
         cleanupStore();
@@ -422,8 +472,7 @@ public class TestContext {
         Path wfPath = new Path(wfParent, "workflow");
         fs.mkdirs(wfPath);
         fs.copyFromLocalFile(false, true, new Path(TestContext.class.getResource("/fs-workflow.xml").getPath()),
-                new Path(wfPath,
-                        "workflow.xml"));
+                new Path(wfPath, "workflow.xml"));
         fs.mkdirs(new Path(wfParent, "input/2012/04/20/00"));
         Path outPath = new Path(wfParent, "output");
         fs.mkdirs(outPath);
@@ -437,4 +486,75 @@ public class TestContext {
             }
         }
     }
+
+    public static void copyOozieShareLibsToHDFS(String shareLibLocalPath, String shareLibHdfsPath)
+        throws IOException {
+        File shareLibDir = new File(shareLibLocalPath);
+        if (!shareLibDir.exists()) {
+            throw new IllegalArgumentException("Sharelibs dir must exist for tests to run.");
+        }
+
+        File[] jarFiles = shareLibDir.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isFile() && file.getName().endsWith(".jar");
+            }
+        });
+
+        for (File jarFile : jarFiles) {
+            copyFileToHDFS(jarFile, shareLibHdfsPath);
+        }
+    }
+
+    public static void copyFileToHDFS(File jarFile, String shareLibHdfsPath) throws IOException {
+        System.out.println("Copying jarFile = " + jarFile);
+        Path shareLibPath = new Path(shareLibHdfsPath);
+        FileSystem fs = shareLibPath.getFileSystem(new Configuration());
+        if (!fs.exists(shareLibPath)) {
+            fs.mkdirs(shareLibPath);
+        }
+
+        OutputStream os = null;
+        InputStream is = null;
+        try {
+            os = fs.create(new Path(shareLibPath, jarFile.getName()));
+            is = new FileInputStream(jarFile);
+            IOUtils.copy(is, os);
+        } finally {
+            IOUtils.closeQuietly(is);
+            IOUtils.closeQuietly(os);
+        }
+    }
+
+    public static void copyResourceToHDFS(String localResource, String resourceName, String hdfsPath)
+        throws IOException {
+        Path appPath = new Path(hdfsPath);
+        FileSystem fs = appPath.getFileSystem(new Configuration());
+        if (!fs.exists(appPath)) {
+            fs.mkdirs(appPath);
+        }
+
+        OutputStream os = null;
+        InputStream is = null;
+        try {
+            os = fs.create(new Path(appPath, resourceName));
+            is = TestContext.class.getResourceAsStream(localResource);
+            IOUtils.copy(is, os);
+        } finally {
+            IOUtils.closeQuietly(is);
+            IOUtils.closeQuietly(os);
+        }
+    }
+
+    public static void deleteOozieShareLibsFromHDFS(String shareLibHdfsPath) throws IOException {
+        Path shareLibPath = new Path(shareLibHdfsPath);
+        FileSystem fs = shareLibPath.getFileSystem(new Configuration());
+        if (fs.exists(shareLibPath)) {
+            fs.delete(shareLibPath, true);
+        }
+    }
+
+    public static int executeWithURL(String command) throws Exception {
+        return new FalconCLI().run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
+    }
 }
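
The static helpers added above are what PigProcessIT uses to stage its fixtures, and they are general enough for any HDFS-backed test setup. A minimal usage sketch (the class name StagePigFixtures and the NameNode URL are illustrative, not part of the commit; the resource paths must be on the test classpath, as they are in webapp/src/test/resources):

    import org.apache.falcon.resource.TestContext;

    public final class StagePigFixtures {
        public static void main(String[] args) throws Exception {
            String storageUrl = "hdfs://localhost:8020"; // illustrative default FS

            // Push the pig.jar staged by the webapp build into Oozie's share lib.
            TestContext.copyOozieShareLibsToHDFS("./target/sharelib",
                    storageUrl + "/user/oozie/share/lib");

            // Stage the script and sample data where the pig process expects them.
            TestContext.copyResourceToHDFS("/apps/pig/id.pig", "id.pig",
                    storageUrl + "/falcon/test/apps/pig");
            TestContext.copyResourceToHDFS("/apps/pig/data.txt", "data.txt",
                    storageUrl + "/falcon/test/input/2012/04/21/00");

            // Clean up afterwards; the path resolves against the default file system.
            TestContext.deleteOozieShareLibsFromHDFS("/user/oozie/share/lib");
        }
    }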