Posted to commits@falcon.apache.org by sh...@apache.org on 2014/03/18 12:41:07 UTC

[2/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 4e5e8c6..c31842b 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -21,41 +21,106 @@ package org.apache.falcon.workflow;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
-import org.apache.falcon.converter.OozieProcessMapper;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DELETE;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.PREPARE;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.update.UpdateHelper;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Logger;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.client.OozieClient;
 
-import java.util.*;
+import javax.xml.bind.JAXBElement;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * Oozie workflow builder for falcon entities.
  */
 public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
+    private static final Logger LOG = Logger.getLogger(OozieProcessWorkflowBuilder.class);
+
+    public OozieProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
 
     @Override
-    public Map<String, Properties> newWorkflowSchedule(Process process, List<String> clusters) throws FalconException {
+    public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
         Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
 
         for (String clusterName : clusters) {
-            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
-            Properties properties = newWorkflowSchedule(process, processCluster.getValidity().getStart(), clusterName,
-                    CurrentUser.getUser());
-            if (properties != null) {
-                propertiesMap.put(clusterName, properties);
+            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity, clusterName);
+            if (processCluster.getValidity().getStart().compareTo(processCluster.getValidity().getEnd()) >= 0) {
+                LOG.info("process validity start <= end for cluster " + clusterName + ". Skipping schedule");
+                break;
+            }
+
+            Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
+            Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
+            map(cluster, bundlePath);
+            Properties properties = createAppProperties(clusterName, bundlePath, CurrentUser.getUser());
+
+            //Add libpath
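+            // strip any ${nameNode} prefix, then re-add it so Oozie resolves the lib path against the cluster's name node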
+            String libPath = entity.getWorkflow().getLib();
+            if (!StringUtils.isEmpty(libPath)) {
+                String path = libPath.replace("${nameNode}", "");
+                properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
+            }
+
+            if (entity.getInputs() != null) {
+                for (Input in : entity.getInputs().getInputs()) {
+                    if (in.isOptional()) {
+                        addOptionalInputProperties(properties, in, clusterName);
+                    }
+                }
             }
+            propertiesMap.put(clusterName, properties);
         }
         return propertiesMap;
     }
@@ -100,51 +165,734 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     @Override
-    public Properties newWorkflowSchedule(Process process, Date startDate, String clusterName, String user)
-        throws FalconException {
-        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
-        if (!startDate.before(processCluster.getValidity().getEnd())) {// start time >= end time
-            return null;
+    public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
+        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+        return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
+                process.getFrequency(), process.getTimezone(), now);
+    }
+
+    @Override
+    public String[] getWorkflowNames() {
+        return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString()};
+    }
+
+    private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
+    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+    @Override
+    public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+
+            //Copy user workflow and lib to staging dir
+            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
+                new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
+            if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
+                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
+                    new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
+            }
+
+            writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
+        }
+
+        List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
+        apps.add(createDefaultCoordinator(cluster, bundlePath));
+
+        return apps;
+    }
+
+    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
+        try {
+            FSDataOutputStream stream = fs.create(path);
+            try {
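+                // write each checksum as a 'key=value' line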
+                for (Map.Entry<String, String> entry : checksums.entrySet()) {
+                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+                }
+            } finally {
+                stream.close();
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
         }
+    }
+
+    private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            Path wfPath = new Path(entity.getWorkflow().getPath());
+            if (fs.isFile(wfPath)) {
+                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
+            } else {
+                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to get workflow path", e);
+        }
+    }
+
+    private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            if (entity.getWorkflow().getLib() == null) {
+                return null;
+            }
+            Path libPath = new Path(entity.getWorkflow().getLib());
 
-        Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
-        Path bundlePath = EntityUtil.getNewStagingPath(cluster, process);
-        Process processClone = (Process) process.copy();
-        EntityUtil.setStartDate(processClone, clusterName, startDate);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            if (fs.isFile(libPath)) {
+                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
+            } else {
+                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to get user lib path", e);
+        }
+    }
 
-        OozieProcessMapper mapper = new OozieProcessMapper(processClone);
-        if (!mapper.map(cluster, bundlePath)) {
+    /**
+     * Creates default oozie coordinator.
+     *
+     * @param cluster    - Cluster for which the coordinator app needs to be created
+     * @param bundlePath - bundle path
+     * @return COORDINATORAPP
+     * @throws FalconException on error
+     */
+    public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
+        if (entity == null) {
             return null;
         }
 
-        Properties properties = createAppProperties(clusterName, bundlePath, user);
+        COORDINATORAPP coord = new COORDINATORAPP();
+        String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
+        Path coordPath = getCoordPath(bundlePath, coordName);
+
+        // coord attributes
+        initializeCoordAttributes(cluster, entity, coord, coordName);
+
+        CONTROLS controls = initializeControls(entity); // controls
+        coord.setControls(controls);
+
+        // Configuration
+        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
+
+        initializeInputPaths(cluster, entity, coord, props); // inputs
+        initializeOutputPaths(cluster, entity, coord, props);  // outputs
+
+        Workflow processWorkflow = entity.getWorkflow();
+        propagateUserWorkflowProperties(processWorkflow, props, entity.getName());
+
+        // create parent wf
+        createWorkflow(cluster, entity, processWorkflow, coordName, coordPath);
+
+        WORKFLOW wf = new WORKFLOW();
+        wf.setAppPath(getStoragePath(coordPath.toString()));
+        wf.setConfiguration(getCoordConfig(props));
+
+        // set coord action to parent wf
+        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
+        action.setWorkflow(wf);
+        coord.setAction(action);
+
+        return coord;
+    }
+
+    private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
+        coord.setName(coordName);
+        org.apache.falcon.entity.v0.process.Cluster processCluster =
+            ProcessHelper.getCluster(process, cluster.getName());
+        coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
+        coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
+        coord.setTimezone(process.getTimezone().getID());
+        coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
+    }
+
+    private CONTROLS initializeControls(Process process)
+        throws FalconException {
+        CONTROLS controls = new CONTROLS();
+        controls.setConcurrency(String.valueOf(process.getParallel()));
+        controls.setExecution(process.getOrder().name());
+
+        Frequency timeout = process.getTimeout();
+        long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
+        long timeoutInMillis;
+        if (timeout != null) {
+            timeoutInMillis = ExpressionHelper.get().
+                evaluate(process.getTimeout().toString(), Long.class);
+        } else {
+            timeoutInMillis = frequencyInMillis * 6;
+            if (timeoutInMillis < THIRTY_MINUTES) {
+                timeoutInMillis = THIRTY_MINUTES;
+            }
+        }
+        controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+
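+        // throttle: allow up to twice the number of instances that fit within the timeout window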
+        if (timeoutInMillis / frequencyInMillis * 2 > 0) {
+            controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+        }
+
+        return controls;
+    }
 
-        //Add libpath
-        String libPath = process.getWorkflow().getLib();
-        if (!StringUtils.isEmpty(libPath)) {
-            String path = libPath.replace("${nameNode}", "");
-            properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
+    private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+        Map<String, String> props) throws FalconException {
+        if (process.getInputs() == null) {
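+            // no inputs: pass sentinel values so the late data handler has nothing to record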
+            props.put("falconInputFeeds", "NONE");
+            props.put("falconInPaths", "IGNORE");
+            return;
         }
 
-        if (process.getInputs() != null) {
-            for (Input in : process.getInputs().getInputs()) {
-                if (in.isOptional()) {
-                    addOptionalInputProperties(properties, in, clusterName);
+        List<String> inputFeeds = new ArrayList<String>();
+        List<String> inputPaths = new ArrayList<String>();
+        List<String> inputFeedStorageTypes = new ArrayList<String>();
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            if (!input.isOptional()) {
+                if (coord.getDatasets() == null) {
+                    coord.setDatasets(new DATASETS());
+                }
+                if (coord.getInputEvents() == null) {
+                    coord.setInputEvents(new INPUTEVENTS());
                 }
+
+                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
+                coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+                DATAIN datain = createDataIn(input);
+                coord.getInputEvents().getDataIn().add(datain);
             }
+
+            String inputExpr = null;
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
+                props.put(input.getName(), inputExpr);
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                inputExpr = "${coord:dataIn('" + input.getName() + "')}";
+                propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
+            }
+
+            inputFeeds.add(feed.getName());
+            inputPaths.add(inputExpr);
+            inputFeedStorageTypes.add(storage.getType().name());
         }
-        return properties;
+
+        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
     }
 
-    @Override
-    public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
-        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
-        return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
-                process.getFrequency(), process.getTimezone(), now);
+    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+        List<String> inputFeedStorageTypes, Map<String, String> props) {
+        // populate late data handler - should-record action
+        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
+        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
+
+        // storage type for each corresponding feed sent as a param to LateDataHandler
+        // needed to compute usage based on storage type in LateDataHandler
+        props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
+    }
+
+    private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+        Map<String, String> props) throws FalconException {
+        if (process.getOutputs() == null) {
+            props.put(ARG.feedNames.getPropName(), "NONE");
+            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+            return;
+        }
+
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
+
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
+
+        List<String> outputFeeds = new ArrayList<String>();
+        List<String> outputPaths = new ArrayList<String>();
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
+            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+            DATAOUT dataout = createDataOut(output);
+            coord.getOutputEvents().getDataOut().add(dataout);
+
+            String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
+            outputFeeds.add(feed.getName());
+            outputPaths.add(outputExpr);
+
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                props.put(output.getName(), outputExpr);
+
+                propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
+            }
+        }
+
+        // Output feed name and path for parent workflow
+        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
+        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+    }
+
+    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+        String datasetName, LocationType locationType) throws FalconException {
+
+        SYNCDATASET syncdataset = new SYNCDATASET();
+        syncdataset.setName(datasetName);
+        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+        String uriTemplate = storage.getUriTemplate(locationType);
+        if (storage.getType() == Storage.TYPE.TABLE) {
+            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires the hcat scheme for catalog URIs
+        }
+        syncdataset.setUriTemplate(uriTemplate);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+        syncdataset.setTimezone(feed.getTimezone().getID());
+
+        if (feed.getAvailabilityFlag() == null) {
+            syncdataset.setDoneFlag("");
+        } else {
+            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
+        }
+
+        return syncdataset;
+    }
+
+    private DATAOUT createDataOut(Output output) {
+        DATAOUT dataout = new DATAOUT();
+        dataout.setName(output.getName());
+        dataout.setDataset(output.getName());
+        dataout.setInstance(getELExpression(output.getInstance()));
+        return dataout;
+    }
+
+    private DATAIN createDataIn(Input input) {
+        DATAIN datain = new DATAIN();
+        datain.setName(input.getName());
+        datain.setDataset(input.getName());
+        datain.setStartInstance(getELExpression(input.getStart()));
+        datain.setEndInstance(getELExpression(input.getEnd()));
+        return datain;
+    }
+
+    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
+        Storage storage, Map<String, String> props)
+        throws FalconException {
+
+        // stats and meta paths
+        createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
+        createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
+        createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
+    }
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
+        COORDINATORAPP coord, Map<String, String> props, Storage storage)
+        throws FalconException {
+
+        String name = output.getName();
+        String type = locType.name().toLowerCase();
+
+        SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
+        coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
+
+        DATAOUT dataout = new DATAOUT();
+        dataout.setName(name + type);
+        dataout.setDataset(name + type);
+        dataout.setInstance(getELExpression(output.getInstance()));
+
+        OUTPUTEVENTS outputEvents = coord.getOutputEvents();
+        if (outputEvents == null) {
+            outputEvents = new OUTPUTEVENTS();
+            coord.setOutputEvents(outputEvents);
+        }
+        outputEvents.getDataOut().add(dataout);
+
+        String outputExpr = "${coord:dataOut('" + name + type + "')}";
+        props.put(name + "." + type, outputExpr);
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
+        Map<String, String> props, String prefix) {
+        props.put(prefix + "_storage_type", tableStorage.getType().name());
+        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+        props.put(prefix + "_database", tableStorage.getDatabase());
+        props.put(prefix + "_table", tableStorage.getTable());
+    }
+
+    private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
+        Map<String, String> props) {
+        String prefix = "falcon_" + input.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_partition_filter_pig",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
+        props.put(prefix + "_partition_filter_hive",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
+        props.put(prefix + "_partition_filter_java",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+    }
+
+    private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
+        Map<String, String> props) {
+        String prefix = "falcon_" + output.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_dataout_partitions",
+            "${coord:dataOutPartitions('" + output.getName() + "')}");
+        props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
+            + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
+    }
+
+    private String join(Iterator<String> itr, char sep) {
+        String joinedStr = StringUtils.join(itr, sep);
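+        // an empty join is replaced with the literal "null" so the property always carries a value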
+        if (joinedStr.isEmpty()) {
+            joinedStr = "null";
+        }
+        return joinedStr;
+    }
+
+    private String getELExpression(String expr) {
+        if (expr != null) {
+            expr = "${" + expr + "}";
+        }
+        return expr;
     }
 
     @Override
-    public String[] getWorkflowNames(Process process) {
-        return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString()};
+    protected Map<String, String> getEntityProperties() {
+        Map<String, String> props = new HashMap<String, String>();
+        if (entity.getProperties() != null) {
+            for (Property prop : entity.getProperties().getProperties()) {
+                props.put(prop.getName(), prop.getValue());
+            }
+        }
+        return props;
+    }
+
+    private void propagateUserWorkflowProperties(Workflow processWorkflow,
+        Map<String, String> props, String processName) {
+        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
+            processWorkflow.getName(), processName));
+        props.put("userWorkflowVersion", processWorkflow.getVersion());
+        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+    }
+
+    protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
+        String wfName, Path parentWfPath) throws FalconException {
+        WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
+        wfApp.setName(wfName);
+        try {
+            addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
+        } catch (IOException e) {
+            throw new FalconException("Failed to add library extensions for the workflow", e);
+        }
+
+        String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
+        EngineType engineType = processWorkflow.getEngine();
+        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof ACTION)) {
+                continue;
+            }
+
+            ACTION action = (ACTION) object;
+            String actionName = action.getName();
+            if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
+                action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
+            } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
+                decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
+            } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
+                decorateHiveAction(cluster, process, action, parentWfPath);
+            } else if (FALCON_ACTIONS.contains(actionName)) {
+                decorateWithOozieRetries(action);
+            }
+        }
+
+        //Create parent workflow
+        marshal(cluster, wfApp, parentWfPath);
+    }
+
+    private void decoratePIGAction(Cluster cluster, Process process,
+        PIG pigAction, Path parentWfPath) throws FalconException {
+        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+        pigAction.setScript("${nameNode}" + userWfPath.toString());
+
+        addPrepareDeleteOutputPath(process, pigAction);
+
+        final List<String> paramList = pigAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
+
+        propagateProcessProperties(pigAction, process);
+
+        Storage.TYPE storageType = getStorageType(cluster, process);
+        if (Storage.TYPE.TABLE == storageType) {
+            // adds hive-site.xml in pig classpath
+            setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
+            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+        }
+
+        addArchiveForCustomJars(cluster, pigAction.getArchive(),
+            getUserLibPath(cluster, parentWfPath.getParent()));
+    }
+
+    private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
+        Path parentWfPath) throws FalconException {
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(wfAction);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+        hiveAction.setScript("${nameNode}" + userWfPath.toString());
+
+        addPrepareDeleteOutputPath(process, hiveAction);
+
+        final List<String> paramList = hiveAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
+
+        propagateProcessProperties(hiveAction, process);
+
+        setupHiveConfiguration(cluster, parentWfPath, "falcon-");
+
+        addArchiveForCustomJars(cluster, hiveAction.getArchive(),
+            getUserLibPath(cluster, parentWfPath.getParent()));
+
+        OozieUtils.marshalHiveAction(wfAction, actionJaxbElement);
+    }
+
+    private void addPrepareDeleteOutputPath(Process process,
+        PIG pigAction) throws FalconException {
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        final PREPARE prepare = new PREPARE();
+        final List<DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            final DELETE delete = new DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            pigAction.setPrepare(prepare);
+        }
+    }
+
+    private void addPrepareDeleteOutputPath(Process process,
+        org.apache.falcon.oozie.hive.ACTION hiveAction)
+        throws FalconException {
+
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            hiveAction.setPrepare(prepare);
+        }
+    }
+
+    private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
+        final List<String> deleteList = new ArrayList<String>();
+        if (process.getOutputs() == null) {
+            return deleteList;
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+
+            if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+                continue; // prepare delete only applies to FileSystem storage
+            }
+
+            deleteList.add("${wf:conf('" + output.getName() + "')}");
+        }
+
+        return deleteList;
+    }
+
+    private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
+        String engineType) throws FalconException {
+        if (process.getInputs() == null) {
+            return;
+        }
+
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            final String inputName = input.getName();
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
+                Map<String, String> props = new HashMap<String, String>();
+                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
+                for (String key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+
+                paramList.add(paramName + "_filter=${wf:conf('"
+                    + paramName + "_partition_filter_" + engineType + "')}");
+            }
+        }
+    }
+
+    private void addOutputFeedsAsParams(List<String> paramList, Process process,
+        Cluster cluster) throws FalconException {
+        if (process.getOutputs() == null) {
+            return;
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                final String outputName = output.getName();  // no prefix for backwards compatibility
+                paramList.add(outputName + "=${" + outputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                Map<String, String> props = new HashMap<String, String>();
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
+                for (String key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+            }
+        }
+    }
+
+    private void propagateProcessProperties(PIG pigAction, Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+        if (processProperties == null) {
+            return;
+        }
+
+        // Propagate user defined properties to job configuration
+        final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
+            pigAction.getConfiguration().getProperty();
+
+        // Propagate user defined properties to pig script as macros
+        // passed as parameters -p name=value that can be accessed as $name
+        final List<String> paramList = pigAction.getParam();
+
+        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
+                new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+            configProperty.setName(property.getName());
+            configProperty.setValue(property.getValue());
+            configuration.add(configProperty);
+
+            paramList.add(property.getName() + "=" + property.getValue());
+        }
+    }
+
+    private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+        if (processProperties == null) {
+            return;
+        }
+
+        // Propagate user defined properties to job configuration
+        final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
+            hiveAction.getConfiguration().getProperty();
+
+        // Propagate user defined properties to pig script as macros
+        // passed as parameters -p name=value that can be accessed as $name
+        final List<String> paramList = hiveAction.getParam();
+
+        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
+                new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+            configProperty.setName(property.getName());
+            configProperty.setValue(property.getValue());
+            configuration.add(configProperty);
+
+            paramList.add(property.getName() + "=" + property.getValue());
+        }
+    }
+
+    private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
+        if (process.getInputs() == null) {
+            return storageType;
+        }
+
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            storageType = FeedHelper.getStorageType(feed, cluster);
+            if (Storage.TYPE.TABLE == storageType) {
+                break;
+            }
+        }
+
+        return storageType;
+    }
+
+    // creates hive-site.xml configuration in conf dir.
+    private void setupHiveConfiguration(Cluster cluster, Path wfPath,
+        String prefix) throws FalconException {
+        String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            Path confPath = new Path(wfPath, "conf");
+            createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
+        Path libPath) throws FalconException {
+        if (libPath == null) {
+            return;
+        }
+
+        try {
+            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            if (fs.isFile(libPath)) {  // File, not a Dir
+                archiveList.add(libPath.toString());
+                return;
+            }
+
+            // lib path is a directory, add each file under the lib dir to archive
+            final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    try {
+                        return fs.isFile(path) && path.getName().endsWith(".jar");
+                    } catch (IOException ignore) {
+                        return false;
+                    }
+                }
+            });
+
+            for (FileStatus fileStatus : fileStatuses) {
+                archiveList.add(fileStatus.getPath().toString());
+            }
+        } catch (IOException e) {
+            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
+        }
     }
 }
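
With the mapper folded into the builder, scheduling becomes a two-step call: construct the builder with the process entity, then call newWorkflowSchedule with the target cluster names. A minimal sketch of the new call sequence, assuming the entities are already registered in the config store (the ScheduleSketch wrapper and the "clicksummary"/"corp" names are illustrative, borrowed from the test fixtures):

    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.process.Process;
    import org.apache.falcon.workflow.OozieProcessWorkflowBuilder;

    import java.util.Map;
    import java.util.Properties;

    public class ScheduleSketch {
        public static void main(String[] args) throws Exception {
            // Look up a registered process entity from the config store.
            Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");

            // The merged builder is bound to the entity at construction time.
            OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);

            // newWorkflowSchedule(String... clusters) maps the process to Oozie
            // artifacts under the staging path and returns the Oozie app
            // properties, keyed by cluster name.
            Map<String, Properties> propsByCluster = builder.newWorkflowSchedule("corp");
            System.out.println(propsByCluster.keySet());
        }
    }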

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
deleted file mode 100644
index fbda0ea..0000000
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.converter;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.conf.Configuration;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-/**
- * Test class for late data processing.
- */
-public class OozieProcessMapperLateProcessTest {
-
-    private static final String CLUSTER_XML = "/config/late/late-cluster.xml";
-    private static final String FEED1_XML = "/config/late/late-feed1.xml";
-    private static final String FEED2_XML = "/config/late/late-feed2.xml";
-    private static final String FEED3_XML = "/config/late/late-feed3.xml";
-    private static final String PROCESS1_XML = "/config/late/late-process1.xml";
-    private static final String PROCESS2_XML = "/config/late/late-process2.xml";
-    private static final ConfigurationStore STORE = ConfigurationStore.get();
-
-    private static EmbeddedCluster dfsCluster;
-
-    @BeforeClass
-    public void setUpDFS() throws Exception {
-
-        cleanupStore();
-
-        dfsCluster = EmbeddedCluster.newCluster("testCluster");
-        Configuration conf = dfsCluster.getConf();
-        String hdfsUrl = conf.get("fs.default.name");
-
-        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller()
-                .unmarshal(this.getClass().getResource(CLUSTER_XML));
-        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-
-        STORE.publish(EntityType.CLUSTER, cluster);
-
-        Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
-                this.getClass().getResource(FEED1_XML));
-        Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
-                this.getClass().getResource(FEED2_XML));
-        Feed feed3 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
-                this.getClass().getResource(FEED3_XML));
-
-        STORE.publish(EntityType.FEED, feed1);
-        STORE.publish(EntityType.FEED, feed2);
-        STORE.publish(EntityType.FEED, feed3);
-
-        Process process1 = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(this.getClass().getResource(PROCESS1_XML));
-        STORE.publish(EntityType.PROCESS, process1);
-        Process process2 = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(this.getClass().getResource(PROCESS2_XML));
-        STORE.publish(EntityType.PROCESS, process2);
-    }
-
-    private void cleanupStore() throws FalconException {
-        STORE.remove(EntityType.PROCESS, "late-process1");
-        STORE.remove(EntityType.PROCESS, "late-process2");
-        STORE.remove(EntityType.FEED, "late-feed1");
-        STORE.remove(EntityType.FEED, "late-feed2");
-        STORE.remove(EntityType.FEED, "late-feed3");
-        STORE.remove(EntityType.CLUSTER, "late-cluster");
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        cleanupStore();
-        dfsCluster.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
deleted file mode 100644
index 22bf9fe..0000000
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Validity;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.messaging.EntityInstanceMessage;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * Test for the Falcon entities mapping into Oozie artifacts.
- */
-public class OozieProcessMapperTest extends AbstractTestBase {
-
-    private String hdfsUrl;
-    private FileSystem fs;
-
-    @BeforeClass
-    public void setUpDFS() throws Exception {
-        CurrentUser.authenticate("falcon");
-
-        EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
-        Configuration conf = cluster.getConf();
-        hdfsUrl = conf.get("fs.default.name");
-    }
-
-    @BeforeMethod
-    public void setUp() throws Exception {
-        super.setup();
-
-        ConfigurationStore store = ConfigurationStore.get();
-        Cluster cluster = store.get(EntityType.CLUSTER, "corp");
-        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-        ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
-        fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
-        fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
-
-        Process process = store.get(EntityType.PROCESS, "clicksummary");
-        Path wfpath = new Path(process.getWorkflow().getPath());
-        assert new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()).mkdirs(wfpath);
-    }
-
-    public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
-        assertEquals("FALCON_PROCESS_DEFAULT_" + process.getName(), coord.getName());
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        assertEquals(SchemaHelper.formatDateUTC(processValidity.getStart()), coord.getStart());
-        assertEquals(SchemaHelper.formatDateUTC(processValidity.getEnd()), coord.getEnd());
-        assertEquals("${coord:" + process.getFrequency().toString() + "}", coord.getFrequency());
-        assertEquals(process.getTimezone().getID(), coord.getTimezone());
-
-        assertEquals(process.getParallel() + "", coord.getControls().getConcurrency());
-        assertEquals(process.getOrder().name(), coord.getControls().getExecution());
-
-        assertEquals(process.getInputs().getInputs().get(0).getName(),
-                coord.getInputEvents().getDataIn().get(0).getName());
-        assertEquals(process.getInputs().getInputs().get(0).getName(),
-                coord.getInputEvents().getDataIn().get(0).getDataset());
-        assertEquals("${" + process.getInputs().getInputs().get(0).getStart() + "}",
-                coord.getInputEvents().getDataIn().get(0).getStartInstance());
-        assertEquals("${" + process.getInputs().getInputs().get(0).getEnd() + "}",
-                coord.getInputEvents().getDataIn().get(0).getEndInstance());
-
-        assertEquals(process.getInputs().getInputs().get(1).getName(),
-                coord.getInputEvents().getDataIn().get(1).getName());
-        assertEquals(process.getInputs().getInputs().get(1).getName(),
-                coord.getInputEvents().getDataIn().get(1).getDataset());
-        assertEquals("${" + process.getInputs().getInputs().get(1).getStart() + "}",
-                coord.getInputEvents().getDataIn().get(1).getStartInstance());
-        assertEquals("${" + process.getInputs().getInputs().get(1).getEnd() + "}",
-                coord.getInputEvents().getDataIn().get(1).getEndInstance());
-
-        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "stats",
-                coord.getOutputEvents().getDataOut().get(1).getName());
-        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
-                coord.getOutputEvents().getDataOut().get(2).getName());
-        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
-                coord.getOutputEvents().getDataOut().get(3).getName());
-
-        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
-                coord.getOutputEvents().getDataOut().get(0).getName());
-        assertEquals("${" + process.getOutputs().getOutputs().get(0).getInstance() + "}",
-                coord.getOutputEvents().getDataOut().get(0).getInstance());
-        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
-                coord.getOutputEvents().getDataOut().get(0).getDataset());
-
-        assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
-
-        ConfigurationStore store = ConfigurationStore.get();
-        Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
-        SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
-        final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
-        assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
-        assertEquals(feed.getTimezone().getID(), ds.getTimezone());
-        assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
-        assertEquals("", ds.getDoneFlag());
-        assertEquals(ds.getUriTemplate(),
-                FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-        assertEquals(props.get("mapred.job.priority"), "LOW");
-
-        assertLibExtensions(coord);
-    }
-
-    @Test
-    public void testBundle() throws Exception {
-        String path = StartupProperties.get().getProperty("system.lib.location");
-        if (!new File(path).exists()) {
-            Assert.assertTrue(new File(path).mkdirs());
-        }
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-
-        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
-        testParentWorkflow(process, parentWorkflow);
-    }
-
-    @Test
-    public void testBundle1() throws Exception {
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-        process.setFrequency(Frequency.fromString("minutes(1)"));
-        process.setTimeout(Frequency.fromString("minutes(15)"));
-
-        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "30", "15");
-        testParentWorkflow(process, parentWorkflow);
-    }
-
-    @Test
-    public void testPigProcessMapper() throws Exception {
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
-        Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
-
-        prepare(process);
-        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
-        testParentWorkflow(process, parentWorkflow);
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
-        Assert.assertEquals("user-pig-job", pigActionNode.getName());
-
-        final PIG pigAction = pigActionNode.getPig();
-        Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
-        Assert.assertNotNull(pigAction.getPrepare());
-        Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
-        Assert.assertFalse(pigAction.getParam().isEmpty());
-        Assert.assertEquals(5, pigAction.getParam().size());
-        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
-        Assert.assertTrue(pigAction.getFile().size() > 0);
-
-        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
-        Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
-        Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
-    }
-
-    @Test
-    public void testHiveProcessMapper() throws Exception {
-        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
-        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
-
-        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
-        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
-
-        resource = this.getClass().getResource("/config/process/hive-process.xml");
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.PROCESS, process);
-
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        prepare(process);
-        OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        // verify table props
-        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
-        for (Map.Entry<String, String> entry : props.entrySet()) {
-            if (expected.containsKey(entry.getKey())) {
-                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
-            }
-        }
-
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
-        testParentWorkflow(process, parentWorkflow);
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
-        Assert.assertEquals("user-hive-job", hiveNode.getName());
-
-        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = mapper.unMarshalHiveAction(hiveNode);
-        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-
-        Assert.assertEquals(hiveAction.getScript(),
-                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
-        Assert.assertNull(hiveAction.getPrepare());
-        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
-        Assert.assertFalse(hiveAction.getParam().isEmpty());
-        Assert.assertEquals(11, hiveAction.getParam().size());
-    }
-
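-    // Creates an empty file at the process's user workflow path so mapping finds it on HDFS.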
-    private void prepare(Process process) throws IOException {
-        Path wf = new Path(process.getWorkflow().getPath());
-        fs.mkdirs(wf.getParent());
-        fs.create(wf).close();
-    }
-
-    @Test
-    public void testProcessMapperForTableStorage() throws Exception {
-        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
-        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
-
-        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
-        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
-
-        resource = this.getClass().getResource("/config/process/pig-process-table.xml");
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.PROCESS, process);
-
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        // verify table props
-        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
-        for (Map.Entry<String, String> entry : props.entrySet()) {
-            if (expected.containsKey(entry.getKey())) {
-                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
-            }
-        }
-
-        // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
-    }
-
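-    // Builds the table-storage properties the mapper is expected to propagate for each process input and output.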
-    private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
-                                                      Cluster cluster) throws FalconException {
-        Map<String, String> expected = new HashMap<String, String>();
-        for (Input input : process.getInputs().getInputs()) {
-            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
-            propagateStorageProperties(input.getName(), storage, expected);
-        }
-
-        for (Output output : process.getOutputs().getOutputs()) {
-            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
-            propagateStorageProperties(output.getName(), storage, expected);
-        }
-
-        return expected;
-    }
-
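-    // Adds the falcon_<name>_* storage properties, plus partition-filter EL for the input and dataOutPartitions EL for the output.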
-    private void propagateStorageProperties(String inputOutputName, CatalogStorage tableStorage,
-                                            Map<String, String> props) {
-        String prefix = "falcon_" + inputOutputName;
-        props.put(prefix + "_storage_type", tableStorage.getType().name());
-        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
-        props.put(prefix + "_database", tableStorage.getDatabase());
-        props.put(prefix + "_table", tableStorage.getTable());
-
-        if (prefix.equals("falcon_input")) {
-            props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
-            props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
-            props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
-        } else if (prefix.equals("falcon_output")) {
-            props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
-        }
-    }
-
-    @Test
-    public void testProcessWorkflowMapper() throws Exception {
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-        Workflow processWorkflow = process.getWorkflow();
-        Assert.assertEquals("test", processWorkflow.getName());
-        Assert.assertEquals("1.0.0", processWorkflow.getVersion());
-    }
-
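-    // Verifies that ext.jar from the cluster's libext/PROCESS directory is appended to each action's <file> list.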
-    @SuppressWarnings("unchecked")
-    private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
-                fs.open(new Path(wfPath, "workflow.xml")))).getValue();
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            List<String> files = null;
-            if (action.getJava() != null) {
-                files = action.getJava().getFile();
-            } else if (action.getPig() != null) {
-                files = action.getPig().getFile();
-            } else if (action.getMapReduce() != null) {
-                files = action.getMapReduce().getFile();
-            }
-            if (files != null) {
-                Assert.assertTrue(files.get(files.size() - 1)
-                        .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
-            }
-        }
-    }
-
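-    // Maps the process onto the 'corp' cluster, validates the generated bundle and coordinator, and returns the parent workflow.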
-    private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
-        throws Exception {
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        testDefCoordMap(process, coord);
-        assertEquals(coord.getControls().getThrottle(), throttle);
-        assertEquals(coord.getControls().getTimeout(), timeout);
-
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        return getParentWorkflow(new Path(wfPath));
-    }
-
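-    // Asserts the fixed node ordering and the retry settings of the generated parent workflow.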
-    public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
-        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
-        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
-        Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
-        Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
-        Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
-    }
-
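-    // Unmarshals the coordinator definition at the given path, validating it against the Oozie coordinator schema.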
-    private COORDINATORAPP getCoordinator(Path path) throws Exception {
-        String coordStr = readFile(path);
-
-        Unmarshaller unmarshaller = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
-        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
-        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
-        unmarshaller.setSchema(schema);
-        JAXBElement<COORDINATORAPP> jaxbCoord = unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(coordStr.trim().getBytes("UTF-8"))), COORDINATORAPP.class);
-        return jaxbCoord.getValue();
-    }
-
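-    // Unmarshals workflow.xml under the given directory (no schema validation).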
-    @SuppressWarnings("unchecked")
-    private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
-        String workflow = readFile(new Path(path, "workflow.xml"));
-
-        JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
-        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes("UTF-8"))))).getValue();
-    }
-
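-    // Unmarshals bundle.xml under the given directory, validating it against the Oozie bundle schema.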
-    private BUNDLEAPP getBundle(Path path) throws Exception {
-        String bundleStr = readFile(new Path(path, "bundle.xml"));
-
-        Unmarshaller unmarshaller = JAXBContext.newInstance(BUNDLEAPP.class).createUnmarshaller();
-        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
-        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
-        unmarshaller.setSchema(schema);
-        JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes("UTF-8"))), BUNDLEAPP.class);
-        return jaxbBundle.getValue();
-    }
-
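-    // Reads an HDFS file into a single string, discarding line separators.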
-    private String readFile(Path path) throws Exception {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8"));
-        try {
-            String line;
-            StringBuilder contents = new StringBuilder();
-            while ((line = reader.readLine()) != null) {
-                contents.append(line);
-            }
-            return contents.toString();
-        } finally {
-            reader.close();
-        }
-    }
-
-    @Override
-    @AfterMethod
-    public void cleanup() throws Exception {
-        super.cleanup();
-        ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
-        ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
-        ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
-        ConfigurationStore.get().remove(EntityType.PROCESS, "dumb-process");
-    }
-
-    @Test
-    public void testProcessWithNoInputsAndOutputs() throws Exception {
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-
-        URL resource = this.getClass().getResource("/config/process/dumb-process.xml");
-        Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
-
-        OozieProcessMapper mapper = new OozieProcessMapper(processEntity);
-        Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        String[] expected = {
-            EntityInstanceMessage.ARG.feedNames.getPropName(),
-            EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
-            "falconInputFeeds",
-            "falconInPaths",
-            "userWorkflowName",
-            "userWorkflowVersion",
-            "userWorkflowEngine",
-        };
-
-        for (String property : expected) {
-            Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
-        }
-    }
-}