You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/07/10 08:57:35 UTC

[6/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
new file mode 100644
index 0000000..c87bc86
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.oozie.OozieEntityBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds oozie coordinator for process.
+ */
+public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<Process> {
+    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+    /**
+     * Creates a coordinator builder for the given process, using {@link Tag#DEFAULT}.
+     *
+     * @param entity process entity for which the coordinator is built
+     */
+    public ProcessExecutionCoordinatorBuilder(Process entity) {
+        super(entity, Tag.DEFAULT);
+    }
+
+    /**
+     * Assembles the coordinator app for this process on the given cluster, marshals it under the
+     * coordinator build path and returns the properties describing it.
+     *
+     * @param cluster cluster for which the coordinator is generated
+     * @param buildPath base path from which the coordinator build path is derived
+     * @return single-element list holding the properties of the marshalled coordinator
+     * @throws FalconException if any of the build steps fails
+     */
+    @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+        final String name = getEntityName();
+        final Path coordDir = getBuildPath(buildPath);
+        copySharedLibs(cluster, new Path(coordDir, "lib"));
+
+        // coordinator attributes and controls
+        final COORDINATORAPP coordinatorApp = new COORDINATORAPP();
+        initializeCoordAttributes(cluster, coordinatorApp, name);
+        coordinatorApp.setControls(initializeControls());
+
+        // configuration passed to the parent workflow: defaults, inputs, outputs, user workflow details
+        final Properties coordProps = createCoordDefaultConfiguration(cluster, name);
+        initializeInputPaths(cluster, coordinatorApp, coordProps);
+        initializeOutputPaths(cluster, coordinatorApp, coordProps);
+        propagateUserWorkflowProperties(entity.getWorkflow(), coordProps);
+
+        // build the parent workflow and reference it from the coordinator action
+        final Properties parentWfProps =
+            OozieOrchestrationWorkflowBuilder.get(entity, Tag.DEFAULT).build(cluster, coordDir);
+        final WORKFLOW parentWf = new WORKFLOW();
+        parentWf.setAppPath(getStoragePath(parentWfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        parentWf.setConfiguration(getConfig(coordProps));
+
+        final org.apache.falcon.oozie.coordinator.ACTION coordAction =
+            new org.apache.falcon.oozie.coordinator.ACTION();
+        coordAction.setWorkflow(parentWf);
+        coordinatorApp.setAction(coordAction);
+
+        marshal(cluster, coordinatorApp, coordDir);
+        return Arrays.asList(getProperties(coordDir, name));
+    }
+
+    /**
+     * Sets name, validity window, timezone and frequency on the coordinator.
+     *
+     * @param cluster cluster whose validity (as declared in the process) bounds the coordinator
+     * @param coord coordinator app to populate
+     * @param coordName name given to the coordinator
+     */
+    private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {
+        final org.apache.falcon.entity.v0.process.Cluster clusterOfProcess =
+            ProcessHelper.getCluster(entity, cluster.getName());
+        final String frequencyExpr = "${coord:" + entity.getFrequency().toString() + "}";
+
+        coord.setName(coordName);
+        coord.setFrequency(frequencyExpr);
+        coord.setTimezone(entity.getTimezone().getID());
+        // start and end come from the validity of the process on this cluster, formatted as UTC
+        coord.setStart(SchemaHelper.formatDateUTC(clusterOfProcess.getValidity().getStart()));
+        coord.setEnd(SchemaHelper.formatDateUTC(clusterOfProcess.getValidity().getEnd()));
+    }
+
+    /**
+     * Builds the coordinator controls: concurrency, execution order, timeout (in minutes) and throttle.
+     *
+     * <p>When the process declares no timeout, six times the frequency is used, but never less than
+     * thirty minutes. Throttle is set only when {@code timeout / frequency * 2} is positive.
+     *
+     * @return populated controls
+     * @throws FalconException if the frequency or timeout expression cannot be evaluated
+     */
+    private CONTROLS initializeControls()
+        throws FalconException {
+        final long frequencyMs = ExpressionHelper.get().evaluate(entity.getFrequency().toString(), Long.class);
+        final Frequency configuredTimeout = entity.getTimeout();
+        final long timeoutMs;
+        if (configuredTimeout == null) {
+            timeoutMs = Math.max(frequencyMs * 6, THIRTY_MINUTES);
+        } else {
+            timeoutMs = ExpressionHelper.get().evaluate(configuredTimeout.toString(), Long.class);
+        }
+
+        final CONTROLS controls = new CONTROLS();
+        controls.setConcurrency(String.valueOf(entity.getParallel()));
+        controls.setExecution(entity.getOrder().name());
+        controls.setTimeout(String.valueOf(timeoutMs / (1000 * 60)));
+
+        // integer division happens before the multiplication, as in the original formula
+        final long throttle = timeoutMs / frequencyMs * 2;
+        if (throttle > 0) {
+            controls.setThrottle(String.valueOf(throttle));
+        }
+
+        return controls;
+    }
+
+    /**
+     * Registers the process inputs on the coordinator and records the matching properties.
+     *
+     * <p>Without inputs, only the placeholder values for {@code falconInputFeeds} and
+     * {@code falconInPaths} are set. Otherwise every non-optional input gets a dataset and an input
+     * event, and every input contributes its feed name, path expression and storage type to the
+     * late data properties.
+     *
+     * @param cluster cluster for which storage of the input feeds is resolved
+     * @param coord coordinator app receiving datasets and input events
+     * @param props coordinator properties to populate
+     * @throws FalconException if a feed or its storage cannot be resolved
+     */
+    private void initializeInputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
+        if (entity.getInputs() == null) {
+            props.put("falconInputFeeds", "NONE");
+            props.put("falconInPaths", IGNORE);
+            return;
+        }
+
+        final List<String> feedNames = new ArrayList<String>();
+        final List<String> pathExprs = new ArrayList<String>();
+        final List<String> storageTypes = new ArrayList<String>();
+        for (Input in : entity.getInputs().getInputs()) {
+            final String inputName = in.getName();
+            final Feed inputFeed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
+            final Storage feedStorage = FeedHelper.createStorage(cluster, inputFeed);
+            final Storage.TYPE storageType = feedStorage.getType();
+
+            // only non-optional inputs are registered as coordinator datasets and input events
+            if (!in.isOptional()) {
+                if (coord.getDatasets() == null) {
+                    coord.setDatasets(new DATASETS());
+                }
+                if (coord.getInputEvents() == null) {
+                    coord.setInputEvents(new INPUTEVENTS());
+                }
+
+                coord.getDatasets().getDatasetOrAsyncDataset().add(
+                    createDataSet(inputFeed, cluster, feedStorage, inputName, LocationType.DATA));
+                coord.getInputEvents().getDataIn().add(createDataIn(in));
+            }
+
+            // stays null when the storage type is neither FILESYSTEM nor TABLE
+            String pathExpr = null;
+            if (storageType == Storage.TYPE.FILESYSTEM) {
+                pathExpr = getELExpression("dataIn('" + inputName + "', '" + in.getPartition() + "')");
+                props.put(inputName, pathExpr);
+            } else if (storageType == Storage.TYPE.TABLE) {
+                pathExpr = "${coord:dataIn('" + inputName + "')}";
+                propagateCatalogTableProperties(in, (CatalogStorage) feedStorage, props);
+            }
+
+            feedNames.add(inputFeed.getName());
+            pathExprs.add(pathExpr);
+            storageTypes.add(storageType.name());
+        }
+
+        propagateLateDataProperties(feedNames, pathExprs, storageTypes, props);
+    }
+
+    /**
+     * Stores the input feed names, input paths and storage types as '#'-separated properties.
+     * The three lists are expected to be index-aligned: entry i of each describes the same input.
+     *
+     * @param inputFeeds names of the input feeds
+     * @param inputPaths path expressions of the inputs
+     * @param inputFeedStorageTypes storage type name of each input feed
+     * @param props coordinator properties to populate
+     */
+    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+        List<String> inputFeedStorageTypes, Properties props) {
+        final char separator = '#';
+
+        // feeds and paths for late data handling (should-record action)
+        final String joinedFeeds = StringUtils.join(inputFeeds, separator);
+        final String joinedPaths = StringUtils.join(inputPaths, separator);
+        props.put("falconInputFeeds", joinedFeeds);
+        props.put("falconInPaths", joinedPaths);
+
+        // storage type per feed, so usage can be computed according to the storage type
+        final String joinedStorageTypes = StringUtils.join(inputFeedStorageTypes, separator);
+        props.put("falconInputFeedStorageTypes", joinedStorageTypes);
+    }
+
+    /**
+     * Creates a synchronous dataset describing one location of a feed on the given cluster.
+     *
+     * @param feed feed backing the dataset
+     * @param cluster cluster whose feed validity start becomes the initial instance
+     * @param storage storage of the feed, source of the URI template
+     * @param datasetName name given to the dataset
+     * @param locationType location of the feed for which the URI template is requested
+     * @return populated dataset
+     * @throws FalconException if the URI template cannot be obtained
+     */
+    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+        String datasetName, LocationType locationType) throws FalconException {
+        final String rawTemplate = storage.getUriTemplate(locationType);
+        final boolean isTable = storage.getType() == Storage.TYPE.TABLE;
+        // Oozie requires the hcat scheme instead of thrift for table storage
+        final String uri = isTable ? rawTemplate.replace("thrift", "hcat") : rawTemplate;
+
+        final org.apache.falcon.entity.v0.feed.Cluster clusterOfFeed =
+            FeedHelper.getCluster(feed, cluster.getName());
+        final String availabilityFlag = feed.getAvailabilityFlag();
+
+        final SYNCDATASET dataset = new SYNCDATASET();
+        dataset.setName(datasetName);
+        dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+        dataset.setUriTemplate(uri);
+        dataset.setInitialInstance(SchemaHelper.formatDateUTC(clusterOfFeed.getValidity().getStart()));
+        dataset.setTimezone(feed.getTimezone().getID());
+        // an empty done flag is used when the feed declares no availability flag
+        dataset.setDoneFlag(availabilityFlag == null ? "" : availabilityFlag);
+        return dataset;
+    }
+
+    /**
+     * Creates the input event for a process input; event and dataset share the input's name.
+     *
+     * @param input process input
+     * @return input event spanning the input's start and end instances
+     */
+    private DATAIN createDataIn(Input input) {
+        final String inputName = input.getName();
+        final String startExpr = getELExpression(input.getStart());
+        final String endExpr = getELExpression(input.getEnd());
+
+        final DATAIN inputEvent = new DATAIN();
+        inputEvent.setName(inputName);
+        inputEvent.setDataset(inputName);
+        inputEvent.setStartInstance(startExpr);
+        inputEvent.setEndInstance(endExpr);
+        return inputEvent;
+    }
+
+    /**
+     * Wraps an expression as <code>${expr}</code>.
+     *
+     * @param expr expression to wrap, may be null
+     * @return wrapped expression, or null when {@code expr} is null
+     */
+    private String getELExpression(String expr) {
+        return expr == null ? null : "${" + expr + "}";
+    }
+
+    /**
+     * Registers the process outputs on the coordinator and records the matching properties.
+     *
+     * <p>Without outputs, only the placeholder values for the feed names and feed instance paths
+     * are set. Otherwise every output gets a dataset and an output event, and the comma-separated
+     * feed names and instance path expressions are stored for the parent workflow.
+     *
+     * @param cluster cluster for which storage of the output feeds is resolved
+     * @param coord coordinator app receiving datasets and output events
+     * @param props coordinator properties to populate
+     * @throws FalconException if a feed or its storage cannot be resolved
+     */
+    private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
+        if (entity.getOutputs() == null) {
+            props.put(ARG.feedNames.getPropName(), "NONE");
+            props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+            return;
+        }
+
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
+
+        final List<String> feedNames = new ArrayList<String>();
+        final List<String> instanceExprs = new ArrayList<String>();
+        for (Output out : entity.getOutputs().getOutputs()) {
+            final String outputName = out.getName();
+            final Feed outputFeed = EntityUtil.getEntity(EntityType.FEED, out.getFeed());
+            final Storage feedStorage = FeedHelper.createStorage(cluster, outputFeed);
+
+            coord.getDatasets().getDatasetOrAsyncDataset().add(
+                createDataSet(outputFeed, cluster, feedStorage, outputName, LocationType.DATA));
+            coord.getOutputEvents().getDataOut().add(createDataOut(out));
+
+            final String instanceExpr = "${coord:dataOut('" + outputName + "')}";
+            feedNames.add(outputFeed.getName());
+            instanceExprs.add(instanceExpr);
+
+            final Storage.TYPE storageType = feedStorage.getType();
+            if (storageType == Storage.TYPE.FILESYSTEM) {
+                props.put(outputName, instanceExpr);
+                propagateFileSystemProperties(out, outputFeed, cluster, coord, feedStorage, props);
+            } else if (storageType == Storage.TYPE.TABLE) {
+                propagateCatalogTableProperties(out, (CatalogStorage) feedStorage, props);
+            }
+        }
+
+        // output feed names and instance paths for the parent workflow
+        props.put(ARG.feedNames.getPropName(), StringUtils.join(feedNames, ','));
+        props.put(ARG.feedInstancePaths.getPropName(), StringUtils.join(instanceExprs, ','));
+    }
+
+    /**
+     * Creates the output event for a process output; event and dataset share the output's name.
+     *
+     * @param output process output
+     * @return output event for the output's instance
+     */
+    private DATAOUT createDataOut(Output output) {
+        final String outputName = output.getName();
+        final String instanceExpr = getELExpression(output.getInstance());
+
+        final DATAOUT outputEvent = new DATAOUT();
+        outputEvent.setName(outputName);
+        outputEvent.setDataset(outputName);
+        outputEvent.setInstance(instanceExpr);
+        return outputEvent;
+    }
+
+    /**
+     * Adds output events for the stats, meta and tmp locations of a file system backed output.
+     *
+     * @param output process output
+     * @param feed feed backing the output
+     * @param cluster cluster for which the coordinator is built
+     * @param coord coordinator app receiving datasets and output events
+     * @param storage storage of the feed
+     * @param props coordinator properties to populate
+     * @throws FalconException if an output event cannot be created
+     */
+    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
+        Storage storage, Properties props) throws FalconException {
+        final LocationType[] auxiliaryLocations = {LocationType.STATS, LocationType.META, LocationType.TMP};
+        for (LocationType locType : auxiliaryLocations) {
+            createOutputEvent(output, feed, cluster, locType, coord, props, storage);
+        }
+    }
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    /**
+     * Adds a dataset and an output event for one location of an output, and stores the matching
+     * <code>${coord:dataOut(...)}</code> expression under the property {@code <output>.<type>},
+     * where type is the lower-cased location type name.
+     *
+     * @param output process output
+     * @param feed feed backing the output
+     * @param cluster cluster for which the coordinator is built
+     * @param locType location of the feed the event refers to
+     * @param coord coordinator app receiving the dataset and output event
+     * @param props coordinator properties to populate
+     * @param storage storage of the feed
+     * @throws FalconException if the dataset cannot be created
+     */
+    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
+        COORDINATORAPP coord, Properties props, Storage storage) throws FalconException {
+        final String outputName = output.getName();
+        final String suffix = locType.name().toLowerCase();
+        final String eventName = outputName + suffix;
+
+        coord.getDatasets().getDatasetOrAsyncDataset().add(
+            createDataSet(feed, cluster, storage, eventName, locType));
+
+        final DATAOUT outputEvent = new DATAOUT();
+        outputEvent.setName(eventName);
+        outputEvent.setDataset(eventName);
+        outputEvent.setInstance(getELExpression(output.getInstance()));
+
+        // create the output events container on first use
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
+        coord.getOutputEvents().getDataOut().add(outputEvent);
+
+        props.put(outputName + "." + suffix, "${coord:dataOut('" + eventName + "')}");
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    /**
+     * Records engine, version and name of the user workflow in the coordinator properties.
+     *
+     * @param processWorkflow workflow declared by the process
+     * @param props coordinator properties to populate
+     */
+    private void propagateUserWorkflowProperties(Workflow processWorkflow, Properties props) {
+        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+        props.put("userWorkflowVersion", processWorkflow.getVersion());
+        // the workflow name is derived from the declared name and the process name
+        props.put("userWorkflowName",
+            ProcessHelper.getProcessWorkflowName(processWorkflow.getName(), entity.getName()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
new file mode 100644
index 0000000..0d9abdb
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Base class for building orchestration workflow for process.
+ */
+public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Process> {
+    private static final String DEFAULT_WF_TEMPLATE = "/workflow/process-parent-workflow.xml";
+    private static final Set<String> FALCON_PROCESS_HIVE_ACTIONS = new HashSet<String>(
+        Arrays.asList(new String[]{"recordsize", "user-oozie-workflow", "user-pig-job", "user-hive-job", }));
+
+    /**
+     * Creates a workflow builder for the given process, using {@link Tag#DEFAULT}.
+     *
+     * @param entity process entity for which the orchestration workflow is built
+     */
+    protected ProcessExecutionWorkflowBuilder(Process entity) {
+        super(entity, Tag.DEFAULT);
+    }
+
+    /**
+     * Builds the parent workflow of the process from the default template, decorates each of its
+     * actions and marshals the result under the build path.
+     *
+     * @param cluster cluster for which the workflow is generated
+     * @param buildPath path under which the workflow is written
+     * @return properties describing the marshalled workflow
+     * @throws FalconException if any of the build steps fails
+     */
+    @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+        final WORKFLOWAPP parentWorkflow = getWorkflow(DEFAULT_WF_TEMPLATE);
+        final String workflowName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
+        parentWorkflow.setName(workflowName);
+
+        addLibExtensionsToWorkflow(cluster, parentWorkflow, null);
+
+        final boolean usesTableStorage = isTableStorageType(cluster);
+        if (usesTableStorage) {
+            setupHiveCredentials(cluster, buildPath, parentWorkflow);
+        }
+
+        for (Object node : parentWorkflow.getDecisionOrForkOrJoin()) {
+            if (node instanceof ACTION) {
+                final ACTION action = (ACTION) node;
+                final String actionName = action.getName();
+
+                if (FALCON_ACTIONS.contains(actionName)) {
+                    decorateWithOozieRetries(action);
+                    if (usesTableStorage && actionName.equals("recordsize")) {
+                        // puts hive-site.xml on the classpath of the action
+                        action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+                    }
+                }
+
+                // engine specific decoration, applied to every action of the workflow
+                decorateAction(action, cluster, buildPath);
+            }
+        }
+
+        // write out the parent workflow
+        marshal(cluster, parentWorkflow, buildPath);
+        return getProperties(buildPath, workflowName);
+    }
+
+    /**
+     * Hook invoked by {@link #build(Cluster, Path)} for every action of the parent workflow, so
+     * that subclasses can adjust the action.
+     *
+     * @param action workflow action to decorate
+     * @param cluster cluster for which the workflow is generated
+     * @param buildPath path under which the workflow is written
+     * @throws FalconException if the action cannot be decorated
+     */
+    protected abstract void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException;
+
+    /**
+     * Creates the hive configuration used by the actions and, in secure mode, adds hcatalog
+     * credentials to the workflow.
+     *
+     * @param cluster cluster for which the workflow is generated
+     * @param buildPath path under which the hive configuration is created
+     * @param wfApp workflow receiving the credentials
+     * @throws FalconException if the configuration or credentials cannot be set up
+     */
+    private void setupHiveCredentials(Cluster cluster, Path buildPath, WORKFLOWAPP wfApp) throws FalconException {
+        // hive-site.xml is created so that actions can have it on their classpath
+        createHiveConfiguration(cluster, buildPath, ""); // DO NOT ADD PREFIX!!!
+
+        if (!isSecurityEnabled) {
+            return;
+        }
+
+        // secure mode: add hcatalog credentials and reference them from each hive related action
+        addHCatalogCredentials(wfApp, cluster, HIVE_CREDENTIAL_NAME, FALCON_PROCESS_HIVE_ACTIONS);
+    }
+
+    /**
+     * Appends one or more {@code name=value} parameters per process input to the given list.
+     *
+     * <p>File system backed inputs contribute <code>name=${name}</code>. Table backed inputs
+     * contribute one <code>key=${wf:conf('key')}</code> entry per common catalog table property
+     * (prefixed {@code falcon_<name>}) plus a {@code falcon_<name>_filter} entry that refers to the
+     * engine specific partition filter property.
+     *
+     * @param paramList list to which the parameters are appended
+     * @param cluster cluster for which storage of the input feeds is resolved
+     * @throws FalconException if a feed or its storage cannot be resolved
+     */
+    protected void addInputFeedsAsParams(List<String> paramList, Cluster cluster) throws FalconException {
+        if (entity.getInputs() == null) {
+            return;
+        }
+
+        for (Input input : entity.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            final String inputName = input.getName();
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
+                Properties props = new Properties();
+                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
+                for (Object key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+
+                // Lower-case with a fixed locale: the engine name is part of a configuration key and
+                // must not vary with the JVM default locale (e.g. "PIG" under a Turkish locale).
+                final String engine =
+                    entity.getWorkflow().getEngine().name().toLowerCase(java.util.Locale.ROOT);
+                paramList.add(paramName + "_filter=${wf:conf('"
+                    + paramName + "_partition_filter_" + engine + "')}");
+            }
+        }
+    }
+
+    /**
+     * Appends one or more {@code name=value} parameters per process output to the given list.
+     *
+     * <p>File system backed outputs contribute <code>name=${name}</code>. Table backed outputs
+     * contribute one <code>key=${wf:conf('key')}</code> entry per catalog table property plus a
+     * {@code falcon_<name>_partitions} entry that refers to the engine specific partitions property.
+     *
+     * @param paramList list to which the parameters are appended
+     * @param cluster cluster for which storage of the output feeds is resolved
+     * @throws FalconException if a feed or its storage cannot be resolved
+     */
+    protected void addOutputFeedsAsParams(List<String> paramList, Cluster cluster) throws FalconException {
+        if (entity.getOutputs() == null) {
+            return;
+        }
+
+        for (Output output : entity.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                final String outputName = output.getName();  // no prefix for backwards compatibility
+                paramList.add(outputName + "=${" + outputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                Properties props = new Properties();
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
+                for (Object key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+
+                // Lower-case with a fixed locale: the engine name is part of a configuration key and
+                // must not vary with the JVM default locale (e.g. "HIVE" under a Turkish locale).
+                final String engine =
+                    entity.getWorkflow().getEngine().name().toLowerCase(java.util.Locale.ROOT);
+                final String paramName = "falcon_" + output.getName(); // prefix 'falcon' for new params
+                paramList.add(paramName + "_partitions=${wf:conf('"
+                    + paramName + "_partitions_" + engine + "')}");
+            }
+        }
+    }
+
+    /**
+     * Copies the properties of the process entity into the job configuration and appends each of
+     * them to the parameter list as {@code name=value}.
+     *
+     * @param conf configuration receiving one property per entity property
+     * @param paramList list to which the {@code name=value} parameters are appended
+     */
+    protected void propagateEntityProperties(CONFIGURATION conf, List<String> paramList) {
+        final Properties userProperties = getEntityProperties(entity);
+        final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> jobConfig = conf.getProperty();
+
+        for (Entry<Object, Object> userProperty : userProperties.entrySet()) {
+            final Object name = userProperty.getKey();
+            final Object value = userProperty.getValue();
+
+            // user defined property goes into the job configuration ...
+            final org.apache.falcon.oozie.workflow.CONFIGURATION.Property jobProperty =
+                new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+            jobProperty.setName((String) name);
+            jobProperty.setValue((String) value);
+            jobConfig.add(jobProperty);
+
+            // ... and is also passed on as a name=value parameter
+            paramList.add(name + "=" + value);
+        }
+    }
+
+    /**
+     * Lists the <code>${wf:conf('name')}</code> expressions of all outputs whose feed is not table
+     * backed; prepare-delete only applies to file system storage.
+     *
+     * @return expressions of the output paths to delete, empty when the process has no outputs
+     * @throws FalconException if a feed or its storage type cannot be resolved
+     */
+    protected List<String> getPrepareDeleteOutputPathList() throws FalconException {
+        final List<String> pathsToDelete = new ArrayList<String>();
+        if (entity.getOutputs() == null) {
+            return pathsToDelete;
+        }
+
+        for (Output output : entity.getOutputs().getOutputs()) {
+            final Feed outputFeed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            if (FeedHelper.getStorageType(outputFeed) != Storage.TYPE.TABLE) {
+                pathsToDelete.add("${wf:conf('" + output.getName() + "')}");
+            }
+        }
+
+        return pathsToDelete;
+    }
+
+    /**
+     * Adds custom jars found at the lib path to the archive list.
+     *
+     * <p>If the lib path is a file it is added as is; otherwise every file directly under it whose
+     * name ends with {@code .jar} is added. Nothing is added when the lib path is null.
+     *
+     * @param cluster cluster whose configuration is used to access the file system
+     * @param archiveList list to which the jar paths are appended
+     * @param libPath file or directory holding the custom jars, may be null
+     * @throws FalconException if the file system cannot be accessed
+     */
+    protected void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
+        Path libPath) throws FalconException {
+        if (libPath == null) {
+            return;
+        }
+
+        try {
+            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            if (fs.isFile(libPath)) {  // File, not a Dir
+                archiveList.add(libPath.toString());
+                return;
+            }
+
+            // lib path is a directory, add each jar file under the lib dir to archive
+            final PathFilter jarFilesOnly = new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    try {
+                        return fs.isFile(path) && path.getName().endsWith(".jar");
+                    } catch (IOException ignored) {
+                        // best effort: a path that cannot be inspected is not archived
+                        return false;
+                    }
+                }
+            };
+
+            final FileStatus[] jarStatuses = fs.listStatus(libPath, jarFilesOnly);
+            if (jarStatuses == null) {
+                // NOTE(review): some FileSystem implementations are reported to return null rather
+                // than throw for a missing path - treat that as "no jars" instead of failing with NPE
+                return;
+            }
+
+            for (FileStatus jarStatus : jarStatuses) {
+                archiveList.add(jarStatus.getPath().toString());
+            }
+        } catch (IOException e) {
+            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
deleted file mode 100644
index a0406e6..0000000
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconRuntimException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.bundle.COORDINATOR;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.ObjectFactory;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.CREDENTIAL;
-import org.apache.falcon.oozie.workflow.CREDENTIALS;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.service.FalconPathFilter;
-import org.apache.falcon.service.SharedLibraryHostingService;
-import org.apache.falcon.util.OozieUtils;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.oozie.client.OozieClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringWriter;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Base workflow builder for falcon entities.
- * @param <T>
- */
-public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBuilder<T> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(OozieWorkflowBuilder.class);
-    protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
-
-    protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
-    protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
-
-    protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
-    protected static final String MR_QUEUE_NAME = "queueName";
-    protected static final String MR_JOB_PRIORITY = "jobPriority";
-
-    protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
-
-    public static final String METASTOREURIS = "hive.metastore.uris";
-    public static final String METASTORE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
-    public static final String METASTORE_USE_THRIFT_SASL = "hive.metastore.sasl.enabled";
-
-    protected static final String IGNORE = "IGNORE";
-
-    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
-        Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
-
-    protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
-        @Override
-        public boolean accept(Path path) {
-            return path.getName().startsWith("falcon");
-        }
-
-        @Override
-        public String getJarName(Path path) {
-            String name = path.getName();
-            if (name.endsWith(".jar")) {
-                name = name.substring(0, name.indexOf(".jar"));
-            }
-            return name;
-        }
-    };
-
-    protected final boolean isSecurityEnabled;
-
-    protected OozieWorkflowBuilder(T entity) {
-        super(entity);
-        isSecurityEnabled = SecurityUtil.isSecurityEnabled();
-    }
-
-    protected Path getCoordPath(Path bundlePath, String coordName) {
-        Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
-        return new Path(bundlePath, tag.name());
-    }
-
-    protected abstract Map<String, String> getEntityProperties();
-
-    public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
-        BUNDLEAPP bundleApp = new BUNDLEAPP();
-        bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
-        // all the properties are set prior to bundle and coordinators creation
-
-        List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
-        if (coordinators.size() == 0) {
-            return false;
-        }
-
-        for (COORDINATORAPP coordinatorapp : coordinators) {
-            Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
-            String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
-                EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
-
-            // copy falcon libs to the workflow dir
-            copySharedLibs(cluster, coordinatorapp);
-
-            // add the coordinator to the bundle
-            COORDINATOR bundleCoord = new COORDINATOR();
-            bundleCoord.setName(coordinatorapp.getName());
-            bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
-            bundleApp.getCoordinator().add(bundleCoord);
-        }
-
-        // create logs dir once since its at the root of the bundle path
-        createLogsDir(cluster);
-
-        marshal(cluster, bundleApp, bundlePath); // write the bundle
-        return true;
-    }
-
-    private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
-        FileStatus[] libs = null;
-        try {
-            libs = fs.listStatus(path);
-        } catch(FileNotFoundException ignore) {
-            //Ok if the libext is not configured
-        }
-
-        if (libs == null) {
-            return;
-        }
-
-        for(FileStatus lib : libs) {
-            if (lib.isDir()) {
-                continue;
-            }
-
-            for(Object obj: wf.getDecisionOrForkOrJoin()) {
-                if (!(obj instanceof ACTION)) {
-                    continue;
-                }
-                ACTION action = (ACTION) obj;
-                List<String> files = null;
-                if (action.getJava() != null) {
-                    files = action.getJava().getFile();
-                } else if (action.getPig() != null) {
-                    files = action.getPig().getFile();
-                } else if (action.getMapReduce() != null) {
-                    files = action.getMapReduce().getFile();
-                }
-                if (files != null) {
-                    files.add(lib.getPath().toString());
-                }
-            }
-        }
-    }
-
-    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
-        throws IOException, FalconException {
-        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-        addExtensionJars(fs, new Path(libext), wf);
-        addExtensionJars(fs, new Path(libext, type.name()), wf);
-        if (StringUtils.isNotEmpty(lifecycle)) {
-            addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
-        }
-    }
-
-    private void copySharedLibs(Cluster cluster, COORDINATORAPP coordinatorapp) throws FalconException {
-        try {
-            String coordPath = coordinatorapp.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-            Path libPath = new Path(coordPath, "lib");
-            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
-                libPath, cluster, FALCON_JAR_FILTER);
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
-        }
-    }
-
-    public abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
-
-    protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
-        org.apache.falcon.oozie.coordinator.CONFIGURATION conf
-            = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
-        List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
-        for (Entry<String, String> prop : propMap.entrySet()) {
-            props.add(createCoordProperty(prop.getKey(), prop.getValue()));
-        }
-        return conf;
-    }
-
-    protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, String coordName) {
-        Map<String, String> props = new HashMap<String, String>();
-        props.put(ARG.entityName.getPropName(), entity.getName());
-        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
-        props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
-        props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
-
-        addBrokerProperties(cluster, props);
-
-        props.put(OozieClient.EXTERNAL_ID,
-            new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
-                "${coord:nominalTime()}").getId());
-        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
-
-        addLateDataProperties(props);
-
-        addClusterProperties(cluster, props);
-
-        props.put(MR_QUEUE_NAME, "default");
-        props.put(MR_JOB_PRIORITY, "NORMAL");
-
-        //props in entity override the set props.
-        props.putAll(getEntityProperties());
-
-        // this cannot be overridden
-        props.put("logDir", getStoragePath(EntityUtil.getLogPath(cluster, entity)));
-
-        return props;
-    }
-
-    private void addBrokerProperties(Cluster cluster, Map<String, String> props) {
-        props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
-        props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
-
-        String falconBrokerUrl = StartupProperties.get().getProperty(
-                ARG.brokerUrl.getPropName(), "tcp://localhost:61616?daemon=true");
-        props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
-
-        String falconBrokerImplClass = StartupProperties.get().getProperty(
-                ARG.brokerImplClass.getPropName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
-        props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
-
-        String jmsMessageTTL = StartupProperties.get().getProperty(
-                "broker.ttlInMins", DEFAULT_BROKER_MSG_TTL.toString());
-        props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
-    }
-
-    private void addLateDataProperties(Map<String, String> props) {
-        try {
-            if (EntityUtil.getLateProcess(entity) == null
-                || EntityUtil.getLateProcess(entity).getLateInputs() == null
-                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
-                props.put("shouldRecord", "false");
-            } else {
-                props.put("shouldRecord", "true");
-            }
-        } catch (FalconException e) {
-            LOG.error("Unable to get Late Process for entity: {}", entity, e);
-            throw new FalconRuntimException(e);
-        }
-    }
-
-    private void addClusterProperties(Cluster cluster, Map<String, String> props) {
-        props.put(ARG.cluster.getPropName(), cluster.getName());
-
-        if (cluster.getProperties() != null) {
-            for (Property prop : cluster.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-    }
-
-    protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
-        String value) {
-        org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
-            = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
-        prop.setName(name);
-        prop.setValue(value);
-        return prop;
-    }
-
-    protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
-        throws FalconException {
-        try {
-            Marshaller marshaller = jaxbContext.createMarshaller();
-            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                outPath.toUri(), ClusterHelper.getConfiguration(cluster));
-            OutputStream out = fs.create(outPath);
-            try {
-                marshaller.marshal(jaxbElement, out);
-            } finally {
-                out.close();
-            }
-            if (LOG.isDebugEnabled()) {
-                StringWriter writer = new StringWriter();
-                marshaller.marshal(jaxbElement, writer);
-                LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
-                LOG.debug(writer.getBuffer().toString());
-            }
-
-            LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
-        } catch (Exception e) {
-            throw new FalconException("Unable to marshall app object", e);
-        }
-    }
-
-    private void createLogsDir(Cluster cluster) throws FalconException {
-        Path logsDir = EntityUtil.getLogPath(cluster, entity);
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                    ClusterHelper.getConfiguration(cluster));
-            if (fs.exists(logsDir)) {
-                return;
-            }
-
-            fs.mkdirs(logsDir);
-
-            // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(logsDir, permission);
-        } catch (Exception e) {
-            throw new FalconException("Unable to create logs dir at: " + logsDir, e);
-        }
-    }
-
-    protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath,
-                           String name) throws FalconException {
-        name = (StringUtils.isEmpty(name) ? "coordinator" : name) + ".xml";
-        marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
-                OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, name));
-        return name;
-    }
-
-    protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
-        marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
-            OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
-    }
-
-    protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
-        marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
-            OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
-    }
-
-    protected String getStoragePath(Path path) {
-        if (path != null) {
-            return getStoragePath(path.toString());
-        }
-        return null;
-    }
-
-    protected String getStoragePath(String path) {
-        if (StringUtils.isNotEmpty(path)) {
-            if (new Path(path).toUri().getScheme() == null) {
-                path = "${nameNode}" + path;
-            }
-        }
-        return path;
-    }
-
-    protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked")
-            JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
-                resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
-        }
-    }
-
-    protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked")
-            JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
-                unmarshaller.unmarshal(resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
-        }
-    }
-
-    // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
-    protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
-                                           String prefix) throws FalconException {
-        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
-
-        try {
-            Configuration conf = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
-
-            // create hive conf to stagingDir
-            Path confPath = new Path(workflowPath + "/conf");
-
-            persistHiveConfiguration(fs, confPath, hiveConf, prefix);
-        } catch (IOException e) {
-            throw new FalconException("Unable to create create hive site", e);
-        }
-    }
-
-    protected void persistHiveConfiguration(FileSystem fs, Path confPath,
-                                            Cluster cluster, String prefix) throws IOException {
-        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
-        persistHiveConfiguration(fs, confPath, hiveConf, prefix);
-    }
-
-    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
-                                          String prefix) throws IOException {
-        OutputStream out = null;
-        try {
-            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
-            hiveConf.writeXml(out);
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-    }
-
-    private Configuration getHiveCredentialsAsConf(Cluster cluster) {
-        Map<String, String> hiveCredentials = getHiveCredentials(cluster);
-
-        Configuration hiveConf = new Configuration(false);
-        for (Entry<String, String> entry : hiveCredentials.entrySet()) {
-            hiveConf.set(entry.getKey(), entry.getValue());
-        }
-
-        return hiveConf;
-    }
-
-    private Map<String, String> getHiveCredentials(Cluster cluster) {
-        Map<String, String> hiveCredentials = new HashMap<String, String>();
-
-        String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
-        if (metaStoreUrl == null) {
-            throw new IllegalStateException(
-                    "Registry interface is not defined in cluster: " + cluster.getName());
-        }
-
-        // Propagate the hive properties from cluster entity
-        Map<String, String> hiveProperties = ClusterHelper.geHiveProperties(cluster);
-        if (hiveProperties != null && !hiveProperties.isEmpty()) {
-            hiveCredentials.putAll(hiveProperties);
-        }
-
-        hiveCredentials.put(METASTOREURIS, metaStoreUrl);
-        hiveCredentials.put("hive.metastore.execute.setugi", "true");
-        hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat"));
-        hiveCredentials.put("hcat.metastore.uri", metaStoreUrl);
-
-        if (isSecurityEnabled) {
-            String principal = ClusterHelper
-                    .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
-            hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal);
-            hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true");
-            hiveCredentials.put("hcat.metastore.principal", principal);
-        }
-
-        return hiveCredentials;
-    }
-
-    /**
-     * This is only necessary if table is involved and is secure mode.
-     *
-     * @param cluster        cluster entity
-     * @param credentialName credential name
-     * @return CREDENTIALS object
-     */
-    protected CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
-        final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
-
-        CREDENTIAL credential = new CREDENTIAL();
-        credential.setName(credentialName);
-        credential.setType("hcat");
-
-        credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl));
-        credential.getProperty().add(createProperty("hcat.metastore.principal",
-                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
-
-        return credential;
-    }
-
-    private CREDENTIAL.Property createProperty(String name, String value) {
-        CREDENTIAL.Property property = new CREDENTIAL.Property();
-        property.setName(name);
-        property.setValue(value);
-        return property;
-    }
-
-    /**
-     * This is only necessary if table is involved and is secure mode.
-     *
-     * @param workflowApp workflow xml
-     * @param cluster     cluster entity
-     */
-    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
-                                          String credentialName) {
-        CREDENTIALS credentials = workflowApp.getCredentials();
-        if (credentials == null) {
-            credentials = new CREDENTIALS();
-        }
-
-        credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
-
-        // add credential for workflow
-        workflowApp.setCredentials(credentials);
-    }
-
-    /**
-     * This is only necessary if table is involved and is secure mode.
-     *
-     * @param workflowApp workflow xml
-     * @param cluster     cluster entity
-     */
-    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
-                                          String credentialName, Set<String> actions) {
-        addHCatalogCredentials(workflowApp, cluster, credentialName);
-
-        // add credential to each action
-        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof ACTION)) {
-                continue;
-            }
-
-            ACTION action = (ACTION) object;
-            String actionName = action.getName();
-            if (actions.contains(actionName)) {
-                action.setCred(credentialName);
-            }
-        }
-    }
-
-    protected abstract boolean shouldSetupHiveConfiguration(Cluster cluster,
-                                                            T entity) throws FalconException;
-
-    protected void decorateWithOozieRetries(ACTION action) {
-        Properties props = RuntimeProperties.get();
-        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
-        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
-    }
-
-    protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
-        Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
-        Properties properties = new Properties();
-        if (cluster.getProperties() != null) {
-            addClusterProperties(properties, cluster.getProperties().getProperties());
-        }
-        properties.setProperty(OozieWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
-        properties.setProperty(OozieWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
-        properties.setProperty(OozieClient.BUNDLE_APP_PATH,
-                "${" + OozieWorkflowEngine.NAME_NODE + "}" + bundlePath.toString());
-        properties.setProperty("colo.name", cluster.getColo());
-
-        properties.setProperty(OozieClient.USER_NAME, user);
-        properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
-        properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
-
-        if (shouldSetupHiveConfiguration(cluster, entity)) {
-            propagateHiveCredentials(cluster, properties);
-        }
-
-        LOG.info("Cluster: {}, PROPS: {}", cluster.getName(), properties);
-        return properties;
-    }
-
-    private void addClusterProperties(Properties properties, List<Property> clusterProperties) {
-        for (Property prop : clusterProperties) {
-            properties.setProperty(prop.getName(), prop.getValue());
-        }
-    }
-
-    /**
-     * This method propagates hive credentials for coordinator to authenticate against hive
-     * for data availability triggers.
-     *
-     * @param cluster cluster entity
-     * @param properties property object
-     */
-    private void propagateHiveCredentials(Cluster cluster, Properties properties) {
-        Map<String, String> hiveCredentials = getHiveCredentials(cluster);
-        for (Entry<String, String> entry : hiveCredentials.entrySet()) {
-            properties.setProperty(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public abstract Date getNextStartTime(T entity, String cluster, Date now) throws FalconException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index c65bed9..38be792 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieEntityBuilder;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesResult.Instance;
@@ -42,8 +43,6 @@ import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.update.UpdateHelper;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.workflow.OozieWorkflowBuilder;
-import org.apache.falcon.workflow.WorkflowBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -142,22 +141,20 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
 
         if (!schedClusters.isEmpty()) {
-            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
-            Map<String, Properties> newFlows =
-                builder.newWorkflowSchedule(schedClusters.toArray(new String[schedClusters.size()]));
-            for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
-                String cluster = entry.getKey();
-                LOG.info("Scheduling {} on cluster {}", entity.toShortString(), cluster);
-                scheduleEntity(cluster, entry.getValue(), entity);
-                commitStagingPath(cluster, entry.getValue().getProperty(OozieClient.BUNDLE_APP_PATH));
+            OozieEntityBuilder builder = OozieEntityBuilder.get(entity);
+            for (String clusterName: schedClusters) {
+                Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+                LOG.info("Scheduling {} on cluster {}", entity.toShortString(), clusterName);
+                Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
+                Properties properties = builder.build(cluster, buildPath);
+                scheduleEntity(clusterName, properties, entity);
+                commitStagingPath(cluster, buildPath);
             }
         }
     }
 
-    private void commitStagingPath(String cluster, String path) throws FalconException {
-        path = StringUtils.removeStart(path, "${nameNode}");
-        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(clusterEntity));
+    private void commitStagingPath(Cluster cluster, Path path) throws FalconException {
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
         try {
             fs.create(new Path(path, EntityUtil.SUCCEEDED_FILE_NAME)).close();
         } catch (IOException e) {
@@ -405,21 +402,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     @Override
     public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException {
         try {
-            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
             Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
             List<Instance> runInstances = new ArrayList<Instance>();
-            String[] wfNames = builder.getWorkflowNames();
-            List<String> coordNames = new ArrayList<String>();
-            for (String wfName : wfNames) {
-                if (!isCoordApplicable(wfName, lifeCycles)) {
-                    continue;
-                }
-                coordNames.add(wfName);
-            }
 
             for (String cluster : clusters) {
                 ProxyOozieClient client = OozieClientFactory.get(cluster);
-                List<WorkflowJob> wfs = getRunningWorkflows(cluster, coordNames);
+                List<String> wfNames = EntityUtil.getWorkflowNames(entity, cluster);
+                List<WorkflowJob> wfs = getRunningWorkflows(cluster, wfNames);
                 if (wfs != null) {
                     for (WorkflowJob job : wfs) {
                         WorkflowJob wf = client.getJobInfo(job.getId());
@@ -958,8 +947,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             LOG.info("Updating entity through Workflow Engine {}", newEntity.toShortString());
             Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
             if (newEndTime.before(now())) {
-                throw new FalconException("New end time for " + newEntity.getName() + " is past current time. Entity "
-                    + "can't be updated. Use remove and add");
+                throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(newEndTime)
+                    + " is before current time. Entity can't be updated. Use remove and add");
             }
 
             LOG.debug("Updating for cluster: {}, bundle: {}", cluster, bundle.getId());
@@ -974,8 +963,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             LOG.debug("Going to update! : {} for cluster {}, bundle: {}",
                     newEntity.toShortString(), cluster, bundle.getId());
-            effectiveTime = updateInternal(oldEntity, newEntity, cluster, bundle,
-                    false, effectiveTime, CurrentUser.getUser());
+            effectiveTime = updateInternal(oldEntity, newEntity, clusterEntity, bundle, false, effectiveTime,
+                CurrentUser.getUser());
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}",
                     newEntity.toShortString(), cluster, bundle.getId());
         }
@@ -999,8 +988,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId());
 
-            Date depEndTime = updateInternal(affectedEntity, affectedEntity, cluster,
-                    affectedProcBundle, false, effectiveTime, affectedProcBundle.getUser());
+            Date depEndTime = updateInternal(affectedEntity, affectedEntity, clusterEntity, affectedProcBundle,
+                false, effectiveTime, affectedProcBundle.getUser());
             if (effectiveTime == null || effectiveTime.after(depEndTime)) {
                 effectiveTime = depEndTime;
             }
@@ -1091,20 +1080,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private Date updateInternal(Entity oldEntity, Entity newEntity, String cluster,
-                                BundleJob oldBundle, boolean alreadyCreated, Date inEffectiveTime,
-                                String user) throws FalconException {
-        OozieWorkflowBuilder<Entity> builder =
-                (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
-
+    private Date updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
+        boolean alreadyCreated, Date inEffectiveTime, String user) throws FalconException {
         Job.Status oldBundleStatus = oldBundle.getStatus();
+        String clusterName = cluster.getName();
+
         //Suspend coords as bundle suspend doesn't suspend coords synchronously
-        suspendCoords(cluster, oldBundle);
+        suspendCoords(clusterName, oldBundle);
 
-        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-        Path stagingPath = EntityUtil.getLatestStagingPath(clusterEntity, oldEntity);
+        Path stagingPath = EntityUtil.getLatestStagingPath(cluster, oldEntity);
         //find last scheduled bundle
-        BundleJob latestBundle = findBundleForStagingPath(cluster, oldEntity, stagingPath);
+        BundleJob latestBundle = findBundleForStagingPath(clusterName, oldEntity, stagingPath);
         Date effectiveTime;
         if (oldBundle.getAppPath().endsWith(stagingPath.toUri().getPath())
                 || latestBundle == null || !alreadyCreated) {
@@ -1121,13 +1107,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             }
 
             //pick start time for new bundle which is after effectiveTime
-            effectiveTime = builder.getNextStartTime(newEntity, cluster, effectiveTime);
+            effectiveTime = EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);
 
             //schedule new bundle
             String newBundleId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);
             //newBundleId and latestBundle will be null if effectiveTime = process end time
             if (newBundleId != null) {
-                latestBundle = getBundleInfo(cluster, newBundleId);
+                latestBundle = getBundleInfo(clusterName, newBundleId);
                 LOG.info("New bundle {} scheduled successfully with start time {}",
                         newBundleId, SchemaHelper.formatDateUTC(effectiveTime));
             }
@@ -1144,37 +1130,37 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
         if (effectiveTime != null) {
             //set endtime for old coords
-            updateCoords(cluster, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
+            updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime);
         }
 
         if (oldBundleStatus != Job.Status.SUSPENDED
                 && oldBundleStatus != Job.Status.PREPSUSPENDED) {
             //resume coords
-            resumeCoords(cluster, oldBundle);
+            resumeCoords(clusterName, oldBundle);
         }
 
         //latestBundle will be null if effectiveTime = process end time
         if (latestBundle != null) {
             //create _SUCCESS in staging path to mark update is complete(to handle roll-forward for updates)
-            commitStagingPath(cluster, latestBundle.getAppPath());
+            commitStagingPath(cluster, new Path(latestBundle.getAppPath()));
         }
         return effectiveTime;
     }
 
-    private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
+    private String scheduleForUpdate(Entity entity, Cluster cluster, Date startDate, String user)
         throws FalconException {
         Entity clone = entity.copy();
 
         String currentUser = CurrentUser.getUser();
         switchUser(user);
         try {
-            EntityUtil.setStartDate(clone, cluster, startDate);
-            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, clone);
-            Map<String, Properties> bundleProps = builder.newWorkflowSchedule(cluster);
-            LOG.info("Scheduling {} on cluster {} with props {}",
-                    entity.toShortString(), cluster, bundleProps);
-            if (bundleProps != null && bundleProps.size() > 0) {
-                return scheduleEntity(cluster, bundleProps.get(cluster), entity);
+            EntityUtil.setStartDate(clone, cluster.getName(), startDate);
+            Path buildPath = EntityUtil.getNewStagingPath(cluster, clone);
+            OozieEntityBuilder builder = OozieEntityBuilder.get(clone);
+            Properties properties = builder.build(cluster, buildPath);
+            if (properties != null) {
+                LOG.info("Scheduling {} on cluster {} with props {}", entity.toShortString(), cluster, properties);
+                return scheduleEntity(cluster.getName(), properties, entity);
             } else {
                 LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
                 return null;
@@ -1210,7 +1196,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         }
     }
 
-    private List<WorkflowJob> getRunningWorkflows(String cluster, List<String> wfNames) throws FalconException {
+    private List<WorkflowJob> getRunningWorkflows(String cluster, List<String> wfNames) throws
+        FalconException {
         StringBuilder filter = new StringBuilder();
         filter.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING.name());
         for (String wfName : wfNames) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/coordinator/replication-coordinator.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/coordinator/replication-coordinator.xml b/oozie/src/main/resources/coordinator/replication-coordinator.xml
new file mode 100644
index 0000000..693b0bd
--- /dev/null
+++ b/oozie/src/main/resources/coordinator/replication-coordinator.xml
@@ -0,0 +1,51 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<coordinator-app name="#NAME#" frequency="#FREQUENCY#"
+                 start="#START_TIME#" end="#END_TIME#" timezone="#TIMEZONE#"
+                 xmlns="uri:oozie:coordinator:0.3"> <!-- Replication coordinator template; the #TOKEN# values look like placeholders filled in by the coordinator builder (TODO confirm) -->
+    <controls>
+        <concurrency>1</concurrency>
+        <execution>FIFO</execution>
+    </controls>
+    <datasets> <!-- Both datasets use the same frequency, initial-instance, timezone and path tokens -->
+        <dataset name="input-dataset" frequency="#FEED_FREQ#"
+                 initial-instance="#START_TIME#" timezone="#TIMEZONE#">
+            <uri-template>#FEED_PATH#</uri-template>
+        </dataset>
+        <dataset name="output-dataset" frequency="#FEED_FREQ#"
+                 initial-instance="#START_TIME#" timezone="#TIMEZONE#">
+            <uri-template>#FEED_PATH#</uri-template>
+        </dataset>
+    </datasets>
+    <input-events> <!-- Each run reads the current instance of input-dataset -->
+        <data-in name="input" dataset="input-dataset">
+            <instance>${coord:current(0)}</instance>
+        </data-in>
+    </input-events>
+    <output-events> <!-- Each run writes the current instance of output-dataset -->
+        <data-out name="output" dataset="output-dataset">
+            <instance>${coord:current(0)}</instance>
+        </data-out>
+    </output-events>
+    <action>
+        <workflow>
+            <app-path>#WF_PATH#</app-path>
+            <configuration/>
+        </workflow>
+    </action>
+</coordinator-app>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/falcon-table-export.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/falcon-table-export.hql b/oozie/src/main/resources/workflow/falcon-table-export.hql
new file mode 100644
index 0000000..37fd1b7
--- /dev/null
+++ b/oozie/src/main/resources/workflow/falcon-table-export.hql
@@ -0,0 +1,18 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- Exports the given partition of the source table into the source staging directory.
+export table ${falconSourceDatabase}.${falconSourceTable} partition ${falconSourcePartition} to '${falconSourceStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/falcon-table-import.hql b/oozie/src/main/resources/workflow/falcon-table-import.hql
new file mode 100644
index 0000000..653d580
--- /dev/null
+++ b/oozie/src/main/resources/workflow/falcon-table-import.hql
@@ -0,0 +1,20 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- Selects the target database, drops the target partition if it already exists, then imports that partition from the target staging directory.
+use ${falconTargetDatabase};
+alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
+import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/resources/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/workflow/process-parent-workflow.xml b/oozie/src/main/resources/workflow/process-parent-workflow.xml
new file mode 100644
index 0000000..4a2495c
--- /dev/null
+++ b/oozie/src/main/resources/workflow/process-parent-workflow.xml
@@ -0,0 +1,278 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-process-parent-workflow'>
+    <start to='should-record'/> <!-- Flow: optional late-data recording, then the user job, then success or failure post-processing -->
+    <decision name='should-record'> <!-- Goes to recordsize only when shouldRecord equals "true"; otherwise straight to user-workflow -->
+        <switch>
+            <case to="recordsize">
+                ${shouldRecord=="true"}
+            </case>
+            <default to="user-workflow"/>
+        </switch>
+    </decision>
+    <action name='recordsize'> <!-- Java action running LateDataHandler with an output location, the input paths, the input feeds and their storage types -->
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
+            <arg>-out</arg>
+            <arg>${logDir}/latedata/${nominalTime}</arg>
+            <arg>-paths</arg>
+            <arg>${falconInPaths}</arg>
+            <arg>-falconInputFeeds</arg>
+            <arg>${falconInputFeeds}</arg>
+            <arg>-falconInputFeedStorageTypes</arg>
+            <arg>${falconInputFeedStorageTypes}</arg>
+            <capture-output/>
+        </java>
+        <ok to="user-workflow"/>
+        <error to="failed-post-processing"/> <!-- A recording failure skips the user job and goes directly to failure post-processing -->
+    </action>
+    <decision name='user-workflow'> <!-- Dispatches on userWorkflowEngine: oozie, pig or hive; any other value falls back to user-oozie-workflow -->
+        <switch>
+            <case to="user-oozie-workflow">
+                ${userWorkflowEngine=="oozie"}
+            </case>
+            <case to="user-pig-job">
+                ${userWorkflowEngine=="pig"}
+            </case>
+            <case to="user-hive-job">
+                ${userWorkflowEngine=="hive"}
+            </case>
+            <default to="user-oozie-workflow"/>
+        </switch>
+    </decision>
+    <action name='user-pig-job'> <!-- Pig action; the script element holds a #USER_WF_PATH# token, presumably replaced by the workflow builder (TODO confirm) -->
+        <pig>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.pig</name>
+                    <value>pig,hcatalog</value>
+                </property>
+            </configuration>
+            <script>#USER_WF_PATH#</script>
+        </pig>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name="user-hive-job"> <!-- Hive action; its job-xml points at conf/hive-site.xml under the workflow application path -->
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>#USER_WF_PATH#</script>
+        </hive>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name='user-oozie-workflow'> <!-- Sub-workflow action; the parent configuration is propagated to the user workflow -->
+        <sub-workflow>
+            <app-path>#USER_WF_PATH#</app-path>
+            <propagate-configuration/>
+        </sub-workflow>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name='succeeded-post-processing'> <!-- Java action running FalconPostProcessing with status SUCCEEDED; ok leads to end, error leads to fail -->
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
+            <arg>-entityType</arg>
+            <arg>${entityType}</arg>
+            <arg>-entityName</arg>
+            <arg>${entityName}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
+            <arg>-operation</arg>
+            <arg>GENERATE</arg>
+            <arg>-workflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-runId</arg>
+            <arg>${wf:run()}</arg>
+            <arg>-status</arg>
+            <arg>SUCCEEDED</arg>
+            <arg>-timeStamp</arg>
+            <arg>${timeStamp}</arg>
+            <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
+            <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
+            <arg>-brokerTTL</arg>
+            <arg>${wf:conf("broker.ttlInMins")}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg> <!-- Workflow id, suffixed with the sub-workflow action name only when the user engine is oozie -->
+            <arg>-userWorkflowEngine</arg>
+            <arg>${userWorkflowEngine}</arg>
+            <arg>-userWorkflowName</arg>
+            <arg>${userWorkflowName}</arg>
+            <arg>-userWorkflowVersion</arg>
+            <arg>${userWorkflowVersion}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg>
+            <arg>-falconInputFeeds</arg>
+            <arg>${falconInputFeeds}</arg>
+            <arg>-falconInputPaths</arg>
+            <arg>${falconInPaths}</arg>
+            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+    <action name='failed-post-processing'> <!-- Java action running FalconPostProcessing with status FAILED; both ok and error lead to the fail kill node -->
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+            <arg>-cluster</arg>
+            <arg>${cluster}</arg>
+            <arg>-entityType</arg>
+            <arg>${entityType}</arg>
+            <arg>-entityName</arg>
+            <arg>${entityName}</arg>
+            <arg>-nominalTime</arg>
+            <arg>${nominalTime}</arg>
+            <arg>-operation</arg>
+            <arg>GENERATE</arg>
+            <arg>-workflowId</arg>
+            <arg>${wf:id()}</arg>
+            <arg>-runId</arg>
+            <arg>${wf:run()}</arg>
+            <arg>-status</arg>
+            <arg>FAILED</arg>
+            <arg>-timeStamp</arg>
+            <arg>${timeStamp}</arg>
+            <arg>-brokerImplClass</arg>
+            <arg>${wf:conf("broker.impl.class")}</arg>
+            <arg>-brokerUrl</arg>
+            <arg>${wf:conf("broker.url")}</arg>
+            <arg>-userBrokerImplClass</arg>
+            <arg>${userBrokerImplClass}</arg>
+            <arg>-userBrokerUrl</arg>
+            <arg>${userBrokerUrl}</arg>
+            <arg>-brokerTTL</arg>
+            <arg>${wf:conf("broker.ttlInMins")}</arg>
+            <arg>-feedNames</arg>
+            <arg>${feedNames}</arg>
+            <arg>-feedInstancePaths</arg>
+            <arg>${feedInstancePaths}</arg>
+            <arg>-logFile</arg>
+            <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+            <arg>-workflowEngineUrl</arg>
+            <arg>${workflowEngineUrl}</arg>
+            <arg>-subflowId</arg>
+            <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-oozie-workflow" : ""}</arg>
+            <arg>-userWorkflowEngine</arg>
+            <arg>${userWorkflowEngine}</arg>
+            <arg>-logDir</arg>
+            <arg>${logDir}/job-${nominalTime}/</arg>
+            <arg>-workflowUser</arg>
+            <arg>${wf:user()}</arg> <!-- NOTE(review): unlike succeeded-post-processing, no userWorkflowName, userWorkflowVersion, falconInputFeeds or falconInputPaths arguments are passed here; confirm this is intentional -->
+            <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+            <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+            <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+            <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+            <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+            <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+        </java>
+        <ok to="fail"/>
+        <error to="fail"/>
+    </action>
+    <kill name="fail"> <!-- Terminal failure node; the message carries the error message of the last node that failed -->
+        <message>
+            Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+        </message>
+    </kill>
+    <end name='end'/>
+</workflow-app>