Posted to commits@falcon.apache.org by sh...@apache.org on 2014/03/18 12:41:06 UTC

[1/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 5e4352151 -> e2545b087
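
For readers following the refactoring: with this change the Oozie artifacts for a process are generated by OozieProcessWorkflowBuilder itself rather than by a separate OozieProcessMapper. A minimal sketch of the resulting call pattern, drawn from the new test below (the "clicksummary" process, "corp" cluster and staging path are the test fixtures, not requirements):

    // the builder is constructed with the process entity; given a cluster and a staging
    // path it writes bundle.xml plus the coordinator and parent workflow definitions
    Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
    Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
    OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.map(cluster, bundlePath);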


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
new file mode 100644
index 0000000..44f5d80
--- /dev/null
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.converter;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.messaging.EntityInstanceMessage;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.OozieProcessWorkflowBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests for mapping Falcon entities into Oozie artifacts.
+ */
+public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
+
+    private String hdfsUrl;
+    private FileSystem fs;
+
+    @BeforeClass
+    public void setUpDFS() throws Exception {
+        CurrentUser.authenticate("falcon");
+
+        EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
+        Configuration conf = cluster.getConf();
+        hdfsUrl = conf.get("fs.default.name");
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        super.setup();
+
+        ConfigurationStore store = ConfigurationStore.get();
+        Cluster cluster = store.get(EntityType.CLUSTER, "corp");
+        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+        ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
+        fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
+        fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
+
+        Process process = store.get(EntityType.PROCESS, "clicksummary");
+        Path wfpath = new Path(process.getWorkflow().getPath());
+        assert new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()).mkdirs(wfpath);
+    }
+
+    public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
+        assertEquals("FALCON_PROCESS_DEFAULT_" + process.getName(), coord.getName());
+        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+        assertEquals(SchemaHelper.formatDateUTC(processValidity.getStart()), coord.getStart());
+        assertEquals(SchemaHelper.formatDateUTC(processValidity.getEnd()), coord.getEnd());
+        assertEquals("${coord:" + process.getFrequency().toString() + "}", coord.getFrequency());
+        assertEquals(process.getTimezone().getID(), coord.getTimezone());
+
+        assertEquals(process.getParallel() + "", coord.getControls().getConcurrency());
+        assertEquals(process.getOrder().name(), coord.getControls().getExecution());
+
+        assertEquals(process.getInputs().getInputs().get(0).getName(),
+                coord.getInputEvents().getDataIn().get(0).getName());
+        assertEquals(process.getInputs().getInputs().get(0).getName(),
+                coord.getInputEvents().getDataIn().get(0).getDataset());
+        assertEquals("${" + process.getInputs().getInputs().get(0).getStart() + "}",
+                coord.getInputEvents().getDataIn().get(0).getStartInstance());
+        assertEquals("${" + process.getInputs().getInputs().get(0).getEnd() + "}",
+                coord.getInputEvents().getDataIn().get(0).getEndInstance());
+
+        assertEquals(process.getInputs().getInputs().get(1).getName(),
+                coord.getInputEvents().getDataIn().get(1).getName());
+        assertEquals(process.getInputs().getInputs().get(1).getName(),
+                coord.getInputEvents().getDataIn().get(1).getDataset());
+        assertEquals("${" + process.getInputs().getInputs().get(1).getStart() + "}",
+                coord.getInputEvents().getDataIn().get(1).getStartInstance());
+        assertEquals("${" + process.getInputs().getInputs().get(1).getEnd() + "}",
+                coord.getInputEvents().getDataIn().get(1).getEndInstance());
+
+        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "stats",
+                coord.getOutputEvents().getDataOut().get(1).getName());
+        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
+                coord.getOutputEvents().getDataOut().get(2).getName());
+        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
+                coord.getOutputEvents().getDataOut().get(3).getName());
+
+        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
+                coord.getOutputEvents().getDataOut().get(0).getName());
+        assertEquals("${" + process.getOutputs().getOutputs().get(0).getInstance() + "}",
+                coord.getOutputEvents().getDataOut().get(0).getInstance());
+        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
+                coord.getOutputEvents().getDataOut().get(0).getDataset());
+
+        assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
+
+        ConfigurationStore store = ConfigurationStore.get();
+        Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
+        SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+        final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+        assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
+        assertEquals(feed.getTimezone().getID(), ds.getTimezone());
+        assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
+        assertEquals("", ds.getDoneFlag());
+        assertEquals(ds.getUriTemplate(),
+                FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+        assertEquals(props.get("mapred.job.priority"), "LOW");
+
+        assertLibExtensions(coord);
+    }
+
+    @Test
+    public void testBundle() throws Exception {
+        String path = StartupProperties.get().getProperty("system.lib.location");
+        if (!new File(path).exists()) {
+            Assert.assertTrue(new File(path).mkdirs());
+        }
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
+        testParentWorkflow(process, parentWorkflow);
+    }
+
+    @Test
+    public void testBundle1() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        process.setFrequency(Frequency.fromString("minutes(1)"));
+        process.setTimeout(Frequency.fromString("minutes(15)"));
+
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "30", "15");
+        testParentWorkflow(process, parentWorkflow);
+    }
+
+    @Test
+    public void testPigProcessMapper() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
+        Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
+
+        prepare(process);
+        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
+        Assert.assertEquals("user-pig-job", pigActionNode.getName());
+
+        final PIG pigAction = pigActionNode.getPig();
+        Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
+        Assert.assertNotNull(pigAction.getPrepare());
+        Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
+        Assert.assertFalse(pigAction.getParam().isEmpty());
+        Assert.assertEquals(5, pigAction.getParam().size());
+        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
+        Assert.assertTrue(pigAction.getFile().size() > 0);
+
+        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
+        Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
+        Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
+    }
+
+    @Test
+    public void testHiveProcessMapper() throws Exception {
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/hive-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        prepare(process);
+        OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify table props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals(hiveAction.getScript(),
+                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(11, hiveAction.getParam().size());
+    }
+
+    private void prepare(Process process) throws IOException {
+        Path wf = new Path(process.getWorkflow().getPath());
+        fs.mkdirs(wf.getParent());
+        fs.create(wf).close();
+    }
+
+    @Test
+    public void testProcessMapperForTableStorage() throws Exception {
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/pig-process-table.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify table props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+    }
+
+    private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
+                                                      Cluster cluster) throws FalconException {
+        Map<String, String> expected = new HashMap<String, String>();
+        for (Input input : process.getInputs().getInputs()) {
+            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
+            propagateStorageProperties(input.getName(), storage, expected);
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
+            propagateStorageProperties(output.getName(), storage, expected);
+        }
+
+        return expected;
+    }
+
+    private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
+                                            Map<String, String> props) {
+        String prefix = "falcon_" + feedName;
+        props.put(prefix + "_storage_type", tableStorage.getType().name());
+        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+        props.put(prefix + "_database", tableStorage.getDatabase());
+        props.put(prefix + "_table", tableStorage.getTable());
+
+        if (prefix.equals("falcon_input")) {
+            props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
+            props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
+            props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+        } else if (prefix.equals("falcon_output")) {
+            props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
+        }
+    }
+
+    @Test
+    public void testProcessWorkflowMapper() throws Exception {
+        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+        Workflow processWorkflow = process.getWorkflow();
+        Assert.assertEquals("test", processWorkflow.getName());
+        Assert.assertEquals("1.0.0", processWorkflow.getVersion());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+                fs.open(new Path(wfPath, "workflow.xml")))).getValue();
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1)
+                        .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
+            }
+        }
+    }
+
+    private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
+        throws Exception {
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+        builder.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        testDefCoordMap(process, coord);
+        assertEquals(coord.getControls().getThrottle(), throttle);
+        assertEquals(coord.getControls().getTimeout(), timeout);
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        return getParentWorkflow(new Path(wfPath));
+    }
+
+    public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+        Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+        Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
+        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
+        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
+    }
+
+    private COORDINATORAPP getCoordinator(Path path) throws Exception {
+        String bundleStr = readFile(path);
+
+        Unmarshaller unmarshaller = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
+        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
+        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
+        unmarshaller.setSchema(schema);
+        JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal(
+                new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), COORDINATORAPP.class);
+        return jaxbBundle.getValue();
+    }
+
+    @SuppressWarnings("unchecked")
+    private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
+        String workflow = readFile(new Path(path, "workflow.xml"));
+
+        JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
+        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
+    }
+
+    private BUNDLEAPP getBundle(Path path) throws Exception {
+        String bundleStr = readFile(new Path(path, "bundle.xml"));
+
+        Unmarshaller unmarshaller = JAXBContext.newInstance(BUNDLEAPP.class).createUnmarshaller();
+        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
+        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
+        unmarshaller.setSchema(schema);
+        JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
+                new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
+        return jaxbBundle.getValue();
+    }
+
+    private String readFile(Path path) throws Exception {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
+        String line;
+        StringBuilder contents = new StringBuilder();
+        while ((line = reader.readLine()) != null) {
+            contents.append(line);
+        }
+        return contents.toString();
+    }
+
+    @Override
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.cleanup();
+        ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
+        ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
+        ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
+        ConfigurationStore.get().remove(EntityType.PROCESS, "dumb-process");
+    }
+
+    @Test
+    public void testProcessWithNoInputsAndOutputs() throws Exception {
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+        URL resource = this.getClass().getResource("/config/process/dumb-process.xml");
+        Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+        OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(processEntity);
+        Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+        builder.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String[] expected = {
+            EntityInstanceMessage.ARG.feedNames.getPropName(),
+            EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
+            "falconInputFeeds",
+            "falconInPaths",
+            "userWorkflowName",
+            "userWorkflowVersion",
+            "userWorkflowEngine",
+        };
+
+        for (String property : expected) {
+            Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
----------------------------------------------------------------------
diff --git a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
index 1e7cc04..eb4173e 100644
--- a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
+++ b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
@@ -363,7 +363,7 @@ public class FeedEvictorTest {
         }
     }
 
-    @Test
+    @Test(enabled = false)
     public void testEvictionWithEmptyDirs() throws Exception {
         try {
             Configuration conf = cluster.getConf();


[3/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index e5a01ca..990fdc5 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -18,22 +18,64 @@
 
 package org.apache.falcon.workflow;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ExternalId;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.COORDINATOR;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.ObjectFactory;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.service.FalconPathFilter;
+import org.apache.falcon.service.SharedLibraryHostingService;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.OozieClient;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * Base workflow builder for falcon entities.
@@ -44,6 +86,334 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
     private static final Logger LOG = Logger.getLogger(OozieWorkflowBuilder.class);
     protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
 
+    protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
+    protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
+
+    protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+    protected static final String MR_QUEUE_NAME = "queueName";
+    protected static final String MR_JOB_PRIORITY = "jobPriority";
+
+    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
+        Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
+
+    protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            return path.getName().startsWith("falcon");
+        }
+
+        @Override
+        public String getJarName(Path path) {
+            String name = path.getName();
+            if (name.endsWith(".jar")) {
+                name = name.substring(0, name.indexOf(".jar"));
+            }
+            return name;
+        }
+    };
+
+    protected OozieWorkflowBuilder(T entity) {
+        super(entity);
+    }
+
+    protected Path getCoordPath(Path bundlePath, String coordName) {
+        Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
+        return new Path(bundlePath, tag.name());
+    }
+
+    protected abstract Map<String, String> getEntityProperties();
+
+    public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
+        BUNDLEAPP bundleApp = new BUNDLEAPP();
+        bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
+        // all the properties are set prior to bundle and coordinator creation
+
+        List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
+        if (coordinators.size() == 0) {
+            return false;
+        }
+        for (COORDINATORAPP coordinatorapp : coordinators) {
+            Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
+            String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
+                EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
+            createLogsDir(cluster, coordPath);
+            COORDINATOR bundleCoord = new COORDINATOR();
+            bundleCoord.setName(coordinatorapp.getName());
+            bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
+            bundleApp.getCoordinator().add(bundleCoord);
+
+            copySharedLibs(cluster, coordPath);
+        }
+
+        marshal(cluster, bundleApp, bundlePath);
+        return true;
+    }
+
+    private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+        FileStatus[] libs = null;
+        try {
+            libs = fs.listStatus(path);
+        } catch(FileNotFoundException ignore) {
+            //Ok if the libext is not configured
+        }
+
+        if (libs == null) {
+            return;
+        }
+
+        for(FileStatus lib : libs) {
+            if (lib.isDir()) {
+                continue;
+            }
+
+            for(Object obj: wf.getDecisionOrForkOrJoin()) {
+                if (!(obj instanceof ACTION)) {
+                    continue;
+                }
+                ACTION action = (ACTION) obj;
+                List<String> files = null;
+                if (action.getJava() != null) {
+                    files = action.getJava().getFile();
+                } else if (action.getPig() != null) {
+                    files = action.getPig().getFile();
+                } else if (action.getMapReduce() != null) {
+                    files = action.getMapReduce().getFile();
+                }
+                if (files != null) {
+                    files.add(lib.getPath().toString());
+                }
+            }
+        }
+    }
+
+    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
+        throws IOException, FalconException {
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+        addExtensionJars(fs, new Path(libext), wf);
+        addExtensionJars(fs, new Path(libext, type.name()), wf);
+        if (StringUtils.isNotEmpty(lifecycle)) {
+            addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
+        }
+    }
+
+    private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
+        try {
+            Path libPath = new Path(coordPath, "lib");
+            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
+                libPath, cluster, FALCON_JAR_FILTER);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
+        }
+    }
+
+    public abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
+
+    protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
+        org.apache.falcon.oozie.coordinator.CONFIGURATION conf
+            = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
+        List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
+        for (Entry<String, String> prop : propMap.entrySet()) {
+            props.add(createCoordProperty(prop.getKey(), prop.getValue()));
+        }
+        return conf;
+    }
+
+    protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(ARG.entityName.getPropName(), entity.getName());
+        props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
+        props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+        props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
+        props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
+        String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
+            "tcp://localhost:61616?daemon=true");
+        props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
+        String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
+            ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
+        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+            DEFAULT_BROKER_MSG_TTL.toString());
+        props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
+        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
+        props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
+        props.put(OozieClient.EXTERNAL_ID,
+            new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+                "${coord:nominalTime()}").getId());
+        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+        try {
+            if (EntityUtil.getLateProcess(entity) == null
+                || EntityUtil.getLateProcess(entity).getLateInputs() == null
+                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+                props.put("shouldRecord", "false");
+            } else {
+                props.put("shouldRecord", "true");
+            }
+        } catch (FalconException e) {
+            LOG.error("Unable to get Late Process for entity:" + entity, e);
+            throw new FalconRuntimException(e);
+        }
+        props.put("entityName", entity.getName());
+        props.put("entityType", entity.getEntityType().name().toLowerCase());
+        props.put(ARG.cluster.getPropName(), cluster.getName());
+        if (cluster.getProperties() != null) {
+            for (Property prop : cluster.getProperties().getProperties()) {
+                props.put(prop.getName(), prop.getValue());
+            }
+        }
+
+        props.put(MR_QUEUE_NAME, "default");
+        props.put(MR_JOB_PRIORITY, "NORMAL");
+        // props defined in the entity override the props set above
+        props.putAll(getEntityProperties());
+        return props;
+    }
+
+    protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
+        String value) {
+        org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
+            = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
+        prop.setName(name);
+        prop.setValue(value);
+        return prop;
+    }
+
+    protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
+        throws FalconException {
+        try {
+            Marshaller marshaller = jaxbContext.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            OutputStream out = fs.create(outPath);
+            try {
+                marshaller.marshal(jaxbElement, out);
+            } finally {
+                out.close();
+            }
+            if (LOG.isDebugEnabled()) {
+                StringWriter writer = new StringWriter();
+                marshaller.marshal(jaxbElement, writer);
+                LOG.debug("Writing definition to " + outPath + " on cluster " + cluster.getName());
+                LOG.debug(writer.getBuffer());
+            }
+
+            LOG.info("Marshalled " + jaxbElement.getDeclaredType() + " to " + outPath);
+        } catch (Exception e) {
+            throw new FalconException("Unable to marshall app object", e);
+        }
+    }
+
+    private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            Path logsDir = new Path(coordPath, "../../logs");
+            fs.mkdirs(logsDir);
+
+            // logs are copied within Oozie as the user during post-processing and hence need 777 permissions
+            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+            fs.setPermission(logsDir, permission);
+        } catch (Exception e) {
+            throw new FalconException("Unable to create temp dir in " + coordPath, e);
+        }
+    }
+
+    protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, String name) throws FalconException {
+        if (StringUtils.isEmpty(name)) {
+            name = "coordinator";
+        }
+        name = name + ".xml";
+        marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), OozieUtils.COORD_JAXB_CONTEXT,
+            new Path(outPath, name));
+        return name;
+    }
+
+    protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
+        marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
+            OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
+    }
+
+    protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+        marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+            OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+    }
+
+    protected String getStoragePath(Path path) {
+        if (path != null) {
+            return getStoragePath(path.toString());
+        }
+        return null;
+    }
+
+    protected String getStoragePath(String path) {
+        if (StringUtils.isNotEmpty(path)) {
+            if (new Path(path).toUri().getScheme() == null) {
+                path = "${nameNode}" + path;
+            }
+        }
+        return path;
+    }
+
+    protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
+            @SuppressWarnings("unchecked")
+            JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+                resourceAsStream);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException(e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+
+    protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
+            @SuppressWarnings("unchecked")
+            JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
+                unmarshaller.unmarshal(resourceAsStream);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException(e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+
+    protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
+        Cluster cluster, String prefix) throws IOException {
+        Configuration hiveConf = new Configuration(false);
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
+        hiveConf.set("hive.metastore.local", "false");
+
+        if (UserGroupInformation.isSecurityEnabled()) {
+            hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
+            hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+        }
+
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+            hiveConf.writeXml(out);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    protected void decorateWithOozieRetries(ACTION action) {
+        Properties props = RuntimeProperties.get();
+        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+    }
+
     protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
         Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
         Properties properties = new Properties();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ac8862e..d819e93 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -116,7 +116,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
         if (!schedClusters.isEmpty()) {
             WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
-            Map<String, Properties> newFlows = builder.newWorkflowSchedule(entity, schedClusters);
+            Map<String, Properties> newFlows = builder.newWorkflowSchedule(schedClusters.toArray(new
+                String[schedClusters.size()]));
             for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
                 String cluster = entry.getKey();
                 LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster);
@@ -380,7 +381,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
             Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
             List<Instance> runInstances = new ArrayList<Instance>();
-            String[] wfNames = builder.getWorkflowNames(entity);
+            String[] wfNames = builder.getWorkflowNames();
             List<String> coordNames = new ArrayList<String>();
             for (String wfName : wfNames) {
                 if (EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString().equals(wfName)) {
@@ -1059,11 +1060,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
         throws FalconException {
-        WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
-        Properties bundleProps = builder.newWorkflowSchedule(entity, startDate, cluster, user);
+        Entity clone = entity.copy();
+        EntityUtil.setStartDate(entity, cluster, startDate);
+        WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, clone);
+        Map<String, Properties> bundleProps = builder.newWorkflowSchedule(cluster);
         LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster + " with props " + bundleProps);
-        if (bundleProps != null) {
-            return scheduleEntity(cluster, bundleProps, entity);
+        if (bundleProps != null && bundleProps.size() > 0) {
+            return scheduleEntity(cluster, bundleProps.get(cluster), entity);
         } else {
             LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
             return null;
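
A note on the builder API change visible in the hunks above: newWorkflowSchedule no longer receives the entity or a start date; the builder is constructed around the entity (a clone with the adjusted start date in the update path) and the method takes only cluster names, returning the scheduling properties keyed by cluster. The two call sites suggest a varargs String signature; a rough sketch of the new shape ("clusterA"/"clusterB" are placeholder names):

    // inferred shape of the changed call: the builder already wraps the entity, so the
    // method takes cluster names and yields one Properties per cluster to be scheduled
    WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
    Map<String, Properties> newFlows = builder.newWorkflowSchedule("clusterA", "clusterB");
    for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
        scheduleEntity(entry.getKey(), entry.getValue(), entity); // schedule per cluster
    }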

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
deleted file mode 100644
index e638961..0000000
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ /dev/null
@@ -1,833 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.coordinator.CONTROLS;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.DATAIN;
-import org.apache.falcon.oozie.coordinator.DATAOUT;
-import org.apache.falcon.oozie.coordinator.DATASETS;
-import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DELETE;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.PREPARE;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.update.UpdateHelper;
-import org.apache.hadoop.fs.*;
-import org.apache.xerces.dom.ElementNSImpl;
-import org.w3c.dom.Document;
-
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.dom.DOMResult;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class maps the Falcon entities into Oozie artifacts.
- */
-public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
-    private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
-    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
-    public OozieProcessMapper(Process entity) {
-        super(entity);
-    }
-
-    @Override
-    protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-            Process process = getEntity();
-
-            //Copy user workflow and lib to staging dir
-            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getPath()),
-                    new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
-            if (process.getWorkflow().getLib() != null && fs.exists(new Path(process.getWorkflow().getLib()))) {
-                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getLib()),
-                        new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
-            }
-
-            writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
-
-        List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
-        apps.add(createDefaultCoordinator(cluster, bundlePath));
-
-        return apps;
-    }
-
-    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
-        try {
-            FSDataOutputStream stream = fs.create(path);
-            try {
-                for (Map.Entry<String, String> entry : checksums.entrySet()) {
-                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-                }
-            } finally {
-                stream.close();
-            }
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
-    }
-
-    private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-            Process process = getEntity();
-            Path wfPath = new Path(process.getWorkflow().getPath());
-            if (fs.isFile(wfPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
-            } else {
-                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get workflow path", e);
-        }
-    }
-
-    private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
-        try {
-            Process process = getEntity();
-            if (process.getWorkflow().getLib() == null) {
-                return null;
-            }
-            Path libPath = new Path(process.getWorkflow().getLib());
-
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {
-                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
-            } else {
-                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get user lib path", e);
-        }
-    }
-
-    /**
-     * Creates default oozie coordinator.
-     *
-     * @param cluster    - Cluster for which the coordinator app needs to be created
-     * @param bundlePath - bundle path
-     * @return COORDINATORAPP
-     * @throws FalconException on Error
-     */
-    public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-        Process process = getEntity();
-        if (process == null) {
-            return null;
-        }
-
-        COORDINATORAPP coord = new COORDINATORAPP();
-        String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString();
-        Path coordPath = getCoordPath(bundlePath, coordName);
-
-        // coord attributes
-        initializeCoordAttributes(cluster, process, coord, coordName);
-
-        CONTROLS controls = initializeControls(process); // controls
-        coord.setControls(controls);
-
-        // Configuration
-        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
-
-        initializeInputPaths(cluster, process, coord, props); // inputs
-        initializeOutputPaths(cluster, process, coord, props);  // outputs
-
-        Workflow processWorkflow = process.getWorkflow();
-        propagateUserWorkflowProperties(processWorkflow, props, process.getName());
-
-        // create parent wf
-        createWorkflow(cluster, process, processWorkflow, coordName, coordPath);
-
-        WORKFLOW wf = new WORKFLOW();
-        wf.setAppPath(getStoragePath(coordPath.toString()));
-        wf.setConfiguration(getCoordConfig(props));
-
-        // set coord action to parent wf
-        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
-        action.setWorkflow(wf);
-        coord.setAction(action);
-
-        return coord;
-    }
-
-    private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
-        coord.setName(coordName);
-        org.apache.falcon.entity.v0.process.Cluster processCluster =
-                ProcessHelper.getCluster(process, cluster.getName());
-        coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
-        coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
-        coord.setTimezone(process.getTimezone().getID());
-        coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
-    }
-
-    private CONTROLS initializeControls(Process process)
-        throws FalconException {
-        CONTROLS controls = new CONTROLS();
-        controls.setConcurrency(String.valueOf(process.getParallel()));
-        controls.setExecution(process.getOrder().name());
-
-        Frequency timeout = process.getTimeout();
-        long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
-        long timeoutInMillis;
-        if (timeout != null) {
-            timeoutInMillis = ExpressionHelper.get().
-                    evaluate(process.getTimeout().toString(), Long.class);
-        } else {
-            timeoutInMillis = frequencyInMillis * 6;
-            if (timeoutInMillis < THIRTY_MINUTES) {
-                timeoutInMillis = THIRTY_MINUTES;
-            }
-        }
-        controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-
-        if (timeoutInMillis / frequencyInMillis * 2 > 0) {
-            controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-        }
-
-        return controls;
-    }
-
-    private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-                                      Map<String, String> props) throws FalconException {
-        if (process.getInputs() == null) {
-            props.put("falconInputFeeds", "NONE");
-            props.put("falconInPaths", "IGNORE");
-            return;
-        }
-
-        List<String> inputFeeds = new ArrayList<String>();
-        List<String> inputPaths = new ArrayList<String>();
-        List<String> inputFeedStorageTypes = new ArrayList<String>();
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            if (!input.isOptional()) {
-                if (coord.getDatasets() == null) {
-                    coord.setDatasets(new DATASETS());
-                }
-                if (coord.getInputEvents() == null) {
-                    coord.setInputEvents(new INPUTEVENTS());
-                }
-
-                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
-                coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
-                DATAIN datain = createDataIn(input);
-                coord.getInputEvents().getDataIn().add(datain);
-            }
-
-            String inputExpr = null;
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
-                props.put(input.getName(), inputExpr);
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                inputExpr = "${coord:dataIn('" + input.getName() + "')}";
-                propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
-            }
-
-            inputFeeds.add(feed.getName());
-            inputPaths.add(inputExpr);
-            inputFeedStorageTypes.add(storage.getType().name());
-        }
-
-        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
-    }
-
-    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
-                                             List<String> inputFeedStorageTypes, Map<String, String> props) {
-        // populate late data handler - should-record action
-        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
-        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
-
-        // storage type for each corresponding feed sent as a param to LateDataHandler
-        // needed to compute usage based on storage type in LateDataHandler
-        props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
-    }
-
-    private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-                                       Map<String, String> props) throws FalconException {
-        if (process.getOutputs() == null) {
-            props.put(ARG.feedNames.getPropName(), "NONE");
-            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-            return;
-        }
-
-        if (coord.getDatasets() == null) {
-            coord.setDatasets(new DATASETS());
-        }
-
-        if (coord.getOutputEvents() == null) {
-            coord.setOutputEvents(new OUTPUTEVENTS());
-        }
-
-        List<String> outputFeeds = new ArrayList<String>();
-        List<String> outputPaths = new ArrayList<String>();
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
-            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
-            DATAOUT dataout = createDataOut(output);
-            coord.getOutputEvents().getDataOut().add(dataout);
-
-            String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
-            outputFeeds.add(feed.getName());
-            outputPaths.add(outputExpr);
-
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                props.put(output.getName(), outputExpr);
-
-                propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
-            }
-        }
-
-        // Output feed name and path for parent workflow
-        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
-        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
-    }
-
-    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
-                                      String datasetName, LocationType locationType) throws FalconException {
-
-        SYNCDATASET syncdataset = new SYNCDATASET();
-        syncdataset.setName(datasetName);
-        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
-        String uriTemplate = storage.getUriTemplate(locationType);
-        if (storage.getType() == Storage.TYPE.TABLE) {
-            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-        }
-        syncdataset.setUriTemplate(uriTemplate);
-
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
-        syncdataset.setTimezone(feed.getTimezone().getID());
-
-        if (feed.getAvailabilityFlag() == null) {
-            syncdataset.setDoneFlag("");
-        } else {
-            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
-        }
-
-        return syncdataset;
-    }
-
-    private DATAOUT createDataOut(Output output) {
-        DATAOUT dataout = new DATAOUT();
-        dataout.setName(output.getName());
-        dataout.setDataset(output.getName());
-        dataout.setInstance(getELExpression(output.getInstance()));
-        return dataout;
-    }
-
-    private DATAIN createDataIn(Input input) {
-        DATAIN datain = new DATAIN();
-        datain.setName(input.getName());
-        datain.setDataset(input.getName());
-        datain.setStartInstance(getELExpression(input.getStart()));
-        datain.setEndInstance(getELExpression(input.getEnd()));
-        return datain;
-    }
-
-    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
-                                               Storage storage, Map<String, String> props)
-        throws FalconException {
-
-        // stats and meta paths
-        createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
-        createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
-        createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
-    }
-
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
-                                   COORDINATORAPP coord, Map<String, String> props, Storage storage)
-        throws FalconException {
-
-        String name = output.getName();
-        String type = locType.name().toLowerCase();
-
-        SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
-        coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
-
-        DATAOUT dataout = new DATAOUT();
-        dataout.setName(name + type);
-        dataout.setDataset(name + type);
-        dataout.setInstance(getELExpression(output.getInstance()));
-
-        OUTPUTEVENTS outputEvents = coord.getOutputEvents();
-        if (outputEvents == null) {
-            outputEvents = new OUTPUTEVENTS();
-            coord.setOutputEvents(outputEvents);
-        }
-        outputEvents.getDataOut().add(dataout);
-
-        String outputExpr = "${coord:dataOut('" + name + type + "')}";
-        props.put(name + "." + type, outputExpr);
-    }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
-    private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
-                                                       Map<String, String> props, String prefix) {
-        props.put(prefix + "_storage_type", tableStorage.getType().name());
-        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
-        props.put(prefix + "_database", tableStorage.getDatabase());
-        props.put(prefix + "_table", tableStorage.getTable());
-    }
-
-    private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
-                                                 Map<String, String> props) {
-        String prefix = "falcon_" + input.getName();
-
-        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
-        props.put(prefix + "_partition_filter_pig",
-                "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
-        props.put(prefix + "_partition_filter_hive",
-                "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
-        props.put(prefix + "_partition_filter_java",
-                "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
-    }
-
-    private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
-                                                 Map<String, String> props) {
-        String prefix = "falcon_" + output.getName();
-
-        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
-        props.put(prefix + "_dataout_partitions",
-                "${coord:dataOutPartitions('" + output.getName() + "')}");
-        props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
-                + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
-    }
-
-    private String join(Iterator<String> itr, char sep) {
-        String joinedStr = StringUtils.join(itr, sep);
-        if (joinedStr.isEmpty()) {
-            joinedStr = "null";
-        }
-        return joinedStr;
-    }
-
-    private String getELExpression(String expr) {
-        if (expr != null) {
-            expr = "${" + expr + "}";
-        }
-        return expr;
-    }
-
-    @Override
-    protected Map<String, String> getEntityProperties() {
-        Process process = getEntity();
-        Map<String, String> props = new HashMap<String, String>();
-        if (process.getProperties() != null) {
-            for (Property prop : process.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-        return props;
-    }
-
-    private void propagateUserWorkflowProperties(Workflow processWorkflow,
-                                                 Map<String, String> props, String processName) {
-        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
-                processWorkflow.getName(), processName));
-        props.put("userWorkflowVersion", processWorkflow.getVersion());
-        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
-    }
-
-    protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
-                                  String wfName, Path parentWfPath) throws FalconException {
-        WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
-        wfApp.setName(wfName);
-        try {
-            addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
-        } catch (IOException e) {
-            throw new FalconException("Failed to add library extensions for the workflow", e);
-        }
-
-        String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
-        EngineType engineType = processWorkflow.getEngine();
-        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof ACTION)) {
-                continue;
-            }
-
-            ACTION action = (ACTION) object;
-            String actionName = action.getName();
-            if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
-                action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
-            } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
-            } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
-                decorateHiveAction(cluster, process, action, parentWfPath);
-            } else if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-            }
-        }
-
-        //Create parent workflow
-        marshal(cluster, wfApp, parentWfPath);
-    }
-
-    private void decoratePIGAction(Cluster cluster, Process process,
-                                   PIG pigAction, Path parentWfPath) throws FalconException {
-        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
-        pigAction.setScript("${nameNode}" + userWfPath.toString());
-
-        addPrepareDeleteOutputPath(process, pigAction);
-
-        final List<String> paramList = pigAction.getParam();
-        addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
-        addOutputFeedsAsParams(paramList, process, cluster);
-
-        propagateProcessProperties(pigAction, process);
-
-        Storage.TYPE storageType = getStorageType(cluster, process);
-        if (Storage.TYPE.TABLE == storageType) {
-            // adds hive-site.xml in pig classpath
-            setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
-            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
-        }
-
-        addArchiveForCustomJars(cluster, pigAction.getArchive(),
-                getUserLibPath(cluster, parentWfPath.getParent()));
-    }
-
-    private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
-                                    Path parentWfPath) throws FalconException {
-
-        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
-        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-
-        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
-        hiveAction.setScript("${nameNode}" + userWfPath.toString());
-
-        addPrepareDeleteOutputPath(process, hiveAction);
-
-        final List<String> paramList = hiveAction.getParam();
-        addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
-        addOutputFeedsAsParams(paramList, process, cluster);
-
-        propagateProcessProperties(hiveAction, process);
-
-        setupHiveConfiguration(cluster, parentWfPath, "falcon-");
-
-        addArchiveForCustomJars(cluster, hiveAction.getArchive(),
-                getUserLibPath(cluster, parentWfPath.getParent()));
-
-        marshalHiveAction(wfAction, actionJaxbElement);
-    }
-
-    private void addPrepareDeleteOutputPath(Process process,
-                                            PIG pigAction) throws FalconException {
-        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
-        if (deleteOutputPathList.isEmpty()) {
-            return;
-        }
-
-        final PREPARE prepare = new PREPARE();
-        final List<DELETE> deleteList = prepare.getDelete();
-
-        for (String deletePath : deleteOutputPathList) {
-            final DELETE delete = new DELETE();
-            delete.setPath(deletePath);
-            deleteList.add(delete);
-        }
-
-        if (!deleteList.isEmpty()) {
-            pigAction.setPrepare(prepare);
-        }
-    }
-
-    private void addPrepareDeleteOutputPath(Process process,
-                                            org.apache.falcon.oozie.hive.ACTION hiveAction)
-        throws FalconException {
-
-        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
-        if (deleteOutputPathList.isEmpty()) {
-            return;
-        }
-
-        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
-        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
-
-        for (String deletePath : deleteOutputPathList) {
-            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
-            delete.setPath(deletePath);
-            deleteList.add(delete);
-        }
-
-        if (!deleteList.isEmpty()) {
-            hiveAction.setPrepare(prepare);
-        }
-    }
-
-    private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
-        final List<String> deleteList = new ArrayList<String>();
-        if (process.getOutputs() == null) {
-            return deleteList;
-        }
-
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-
-            if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
-                continue; // prepare delete only applies to FileSystem storage
-            }
-
-            deleteList.add("${wf:conf('" + output.getName() + "')}");
-        }
-
-        return deleteList;
-    }
-
-    private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
-                                       String engineType) throws FalconException {
-        if (process.getInputs() == null) {
-            return;
-        }
-
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            final String inputName = input.getName();
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
-                Map<String, String> props = new HashMap<String, String>();
-                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
-                for (String key : props.keySet()) {
-                    paramList.add(key + "=${wf:conf('" + key + "')}");
-                }
-
-                paramList.add(paramName + "_filter=${wf:conf('"
-                        + paramName + "_partition_filter_" + engineType + "')}");
-            }
-        }
-    }
-
-    private void addOutputFeedsAsParams(List<String> paramList, Process process,
-                                        Cluster cluster) throws FalconException {
-        if (process.getOutputs() == null) {
-            return;
-        }
-
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                final String outputName = output.getName();  // no prefix for backwards compatibility
-                paramList.add(outputName + "=${" + outputName + "}");
-            } else if (storage.getType() == Storage.TYPE.TABLE) {
-                Map<String, String> props = new HashMap<String, String>();
-                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
-                for (String key : props.keySet()) {
-                    paramList.add(key + "=${wf:conf('" + key + "')}");
-                }
-            }
-        }
-    }
-
-    private void propagateProcessProperties(PIG pigAction, Process process) {
-        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
-        if (processProperties == null) {
-            return;
-        }
-
-        // Propagate user defined properties to job configuration
-        final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
-                pigAction.getConfiguration().getProperty();
-
-        // Propagate user defined properties to pig script as macros
-        // passed as parameters -p name=value that can be accessed as $name
-        final List<String> paramList = pigAction.getParam();
-
-        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
-            org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
-                    new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
-            configProperty.setName(property.getName());
-            configProperty.setValue(property.getValue());
-            configuration.add(configProperty);
-
-            paramList.add(property.getName() + "=" + property.getValue());
-        }
-    }
-
-    private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
-        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
-        if (processProperties == null) {
-            return;
-        }
-
-        // Propagate user defined properties to job configuration
-        final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
-                hiveAction.getConfiguration().getProperty();
-
-        // Propagate user defined properties to the hive script as parameters
-        // passed as name=value pairs that the script can reference by name
-        final List<String> paramList = hiveAction.getParam();
-
-        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
-            org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
-                    new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
-            configProperty.setName(property.getName());
-            configProperty.setValue(property.getValue());
-            configuration.add(configProperty);
-
-            paramList.add(property.getName() + "=" + property.getValue());
-        }
-    }
-
-    private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
-        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
-        if (process.getInputs() == null) {
-            return storageType;
-        }
-
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            storageType = FeedHelper.getStorageType(feed, cluster);
-            if (Storage.TYPE.TABLE == storageType) {
-                break;
-            }
-        }
-
-        return storageType;
-    }
-
-    // creates hive-site.xml configuration in conf dir.
-    private void setupHiveConfiguration(Cluster cluster, Path wfPath,
-                                        String prefix) throws FalconException {
-        String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-            Path confPath = new Path(wfPath, "conf");
-            createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
-    private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
-                                         Path libPath) throws FalconException {
-        if (libPath == null) {
-            return;
-        }
-
-        try {
-            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {  // File, not a Dir
-                archiveList.add(libPath.toString());
-                return;
-            }
-
-            // lib path is a directory, add each file under the lib dir to archive
-            final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
-                @Override
-                public boolean accept(Path path) {
-                    try {
-                        return fs.isFile(path) && path.getName().endsWith(".jar");
-                    } catch (IOException ignore) {
-                        return false;
-                    }
-                }
-            });
-
-            for (FileStatus fileStatus : fileStatuses) {
-                archiveList.add(fileStatus.getPath().toString());
-            }
-        } catch (IOException e) {
-            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    protected JAXBElement<org.apache.falcon.oozie.hive.ACTION> unMarshalHiveAction(ACTION wfAction) {
-        try {
-            Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
-            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
-            return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
-                    unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to unmarshall hive action.", e);
-        }
-    }
-
-    protected void marshalHiveAction(ACTION wfAction,
-                                     JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
-        try {
-            DOMResult hiveActionDOM = new DOMResult();
-            Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
-            marshaller.marshal(actionjaxbElement, hiveActionDOM);
-            wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to marshall hive action.", e);
-        }
-    }
-}
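
The least self-documenting part of the removed OozieProcessMapper above is initializeControls(), which derives the Oozie coordinator timeout and throttle from the process frequency. The standalone sketch below restates only that arithmetic, on the assumption that the merged OozieProcessWorkflowBuilder keeps equivalent behaviour: the timeout defaults to six times the frequency with a thirty-minute floor, and the throttle is twice the number of whole frequency intervals that fit into the timeout. The class and method names in the sketch are invented for illustration and are not part of Falcon.

// A minimal, standalone restatement of the coordinator-controls defaulting from the
// removed initializeControls() above. ControlsDefaults and its method names are
// hypothetical, illustration-only names, not Falcon classes.
public final class ControlsDefaults {
    private static final int THIRTY_MINUTES = 30 * 60 * 1000;

    // Effective timeout in millis: the explicit timeout if present, otherwise
    // six times the process frequency, floored at thirty minutes.
    static long effectiveTimeoutMillis(long frequencyInMillis, Long timeoutInMillis) {
        if (timeoutInMillis != null) {
            return timeoutInMillis;
        }
        long defaulted = frequencyInMillis * 6;
        return defaulted < THIRTY_MINUTES ? THIRTY_MINUTES : defaulted;
    }

    // Throttle: twice the number of whole frequency intervals that fit into the
    // timeout. The removed code only set this on the coordinator when it was > 0.
    static long throttle(long frequencyInMillis, long effectiveTimeoutMillis) {
        return effectiveTimeoutMillis / frequencyInMillis * 2;
    }

    public static void main(String[] args) {
        long hourly = 60 * 60 * 1000L;                        // hourly process, no explicit timeout
        long timeout = effectiveTimeoutMillis(hourly, null);  // 6 hours
        System.out.println(timeout / (1000 * 60));            // 360 (the coordinator timeout is set in minutes)
        System.out.println(throttle(hourly, timeout));        // 12
    }
}
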


[4/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
deleted file mode 100644
index e610df2..0000000
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
-import org.apache.falcon.oozie.workflow.JAVA;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests for Oozie workflow definition for feed replication & retention.
- */
-public class OozieFeedMapperTest {
-    private EmbeddedCluster srcMiniDFS;
-    private EmbeddedCluster trgMiniDFS;
-    private final ConfigurationStore store = ConfigurationStore.get();
-    private Cluster srcCluster;
-    private Cluster trgCluster;
-    private Cluster alphaTrgCluster;
-    private Cluster betaTrgCluster;
-    private Feed feed;
-    private Feed tableFeed;
-    private Feed fsReplFeed;
-
-    private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
-    private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
-    private static final String FEED = "/feed.xml";
-    private static final String TABLE_FEED = "/table-replication-feed.xml";
-    private static final String FS_REPLICATION_FEED = "/fs-replication-feed.xml";
-
-    @BeforeClass
-    public void setUpDFS() throws Exception {
-        CurrentUser.authenticate("falcon");
-
-        srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
-        String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
-
-        trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
-        String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
-
-        cleanupStore();
-
-        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
-        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
-        alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
-        betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
-
-        feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
-        fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED, null);
-        tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED, null);
-    }
-
-    protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
-        Unmarshaller unmarshaller = type.getUnmarshaller();
-        Entity entity = (Entity) unmarshaller
-                .unmarshal(OozieFeedMapperTest.class.getResource(template));
-        store.publish(type, entity);
-
-        if (type == EntityType.CLUSTER) {
-            Cluster cluster = (Cluster) entity;
-            ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
-            FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
-            fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
-            fs.create(
-                    new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
-        }
-        return entity;
-    }
-
-    protected void cleanupStore() throws FalconException {
-        for (EntityType type : EntityType.values()) {
-            Collection<String> entities = store.getEntities(type);
-            for (String entity : entities) {
-                store.remove(type, entity);
-            }
-        }
-    }
-
-    @AfterClass
-    public void stopDFS() {
-        srcMiniDFS.shutdown();
-        trgMiniDFS.shutdown();
-    }
-
-    @Test
-    public void testReplicationCoordsForFSStorage() throws Exception {
-        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
-        List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster,
-                new Path("/projects/falcon/"));
-        //Assert retention coord
-        COORDINATORAPP coord = coords.get(0);
-        assertLibExtensions(coord, "retention");
-
-        //Assert replication coord
-        coord = coords.get(1);
-        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
-        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
-                .getAction().getWorkflow().getAppPath());
-        Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
-                + srcCluster.getName(), coord.getName());
-        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
-        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(0);
-        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(1);
-
-        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
-        Assert.assertEquals("input-dataset", inputDataset.getName());
-        Assert.assertEquals(
-                ClusterHelper.getReadOnlyStorageUrl(srcCluster)
-                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
-                inputDataset.getUriTemplate());
-
-        Assert.assertEquals("${coord:minutes(20)}",
-                outputDataset.getFrequency());
-        Assert.assertEquals("output-dataset", outputDataset.getName());
-        Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
-                + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
-                        outputDataset.getUriTemplate());
-        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
-        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
-        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
-        Assert.assertEquals("input", inEventName);
-        Assert.assertEquals("input-dataset", inEventDataset);
-        Assert.assertEquals("${now(0,-40)}", inEventInstance);
-
-        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
-        Assert.assertEquals("${now(0,-40)}", outEventInstance);
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        // verify the replication param that feed replicator depends on
-        String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
-        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
-
-        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
-        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
-
-        // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), feed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
-
-        // verify workflow params
-        Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
-        Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
-        Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
-
-        // verify default params
-        Assert.assertEquals(props.get("queueName"), "default");
-        Assert.assertEquals(props.get("jobPriority"), "NORMAL");
-        Assert.assertEquals(props.get("maxMaps"), "5");
-
-        assertLibExtensions(coord, "replication");
-        assertWorkflowRetries(coord);
-    }
-
-    private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
-        WORKFLOWAPP wf = getWorkflowapp(coord);
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            String actionName = action.getName();
-            if (AbstractOozieEntityMapper.FALCON_ACTIONS.contains(actionName)) {
-                Assert.assertEquals(action.getRetryMax(), "3");
-                Assert.assertEquals(action.getRetryInterval(), "1");
-            }
-        }
-    }
-
-    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
-        WORKFLOWAPP wf = getWorkflowapp(coord);
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            List<String> files = null;
-            if (action.getJava() != null) {
-                files = action.getJava().getFile();
-            } else if (action.getPig() != null) {
-                files = action.getPig().getFile();
-            } else if (action.getMapReduce() != null) {
-                files = action.getMapReduce().getFile();
-            }
-            if (files != null) {
-                Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
-                        + lifecycle + "/ext.jar"));
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
-                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
-    }
-
-    @Test
-    public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
-        OozieFeedMapper feedMapper = new OozieFeedMapper(fsReplFeed);
-
-        List<COORDINATORAPP> alphaCoords = feedMapper.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
-        final COORDINATORAPP alphaCoord = alphaCoords.get(0);
-        Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
-        Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
-
-        String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
-        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
-
-        List<COORDINATORAPP> betaCoords = feedMapper.getCoordinators(betaTrgCluster, new Path("/beta/falcon/"));
-        final COORDINATORAPP betaCoord = betaCoords.get(0);
-        Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
-        Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
-
-        pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
-        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
-    }
-
-    private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
-                                          Feed aFeed) throws FalconException {
-        String srcPart = FeedHelper.normalizePartitionExpression(
-                FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
-        srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
-        String targetPart = FeedHelper.normalizePartitionExpression(
-                FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
-        targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
-
-        StringBuilder pathsWithPartitions = new StringBuilder();
-        pathsWithPartitions.append("${coord:dataIn('input')}/")
-                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
-        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
-        parts = StringUtils.stripEnd(parts, "/");
-        return parts;
-    }
-
-    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
-                                 String pathsWithPartitions) throws JAXBException, IOException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
-        Date startDate = feedCluster.getValidity().getStart();
-        Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
-
-        Date endDate = feedCluster.getValidity().getEnd();
-        Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
-
-        WORKFLOWAPP workflow = getWorkflowapp(coord);
-        assertWorkflowDefinition(fsReplFeed, workflow);
-
-        List<Object> actions = workflow.getDecisionOrForkOrJoin();
-        System.out.println("actions = " + actions);
-
-        ACTION replicationActionNode = (ACTION) actions.get(4);
-        Assert.assertEquals(replicationActionNode.getName(), "replication");
-
-        JAVA replication = replicationActionNode.getJava();
-        List<String> args = replication.getArg();
-        Assert.assertEquals(args.size(), 11);
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
-        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
-        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
-        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
-        Assert.assertEquals(props.get("maxMaps"), "33");
-    }
-
-    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
-        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
-        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
-        Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
-        Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
-        Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
-    }
-
-    @Test
-    public void testReplicationCoordsForTableStorage() throws Exception {
-        OozieFeedMapper feedMapper = new OozieFeedMapper(tableFeed);
-        List<COORDINATORAPP> coords = feedMapper.getCoordinators(
-                trgCluster, new Path("/projects/falcon/"));
-        COORDINATORAPP coord = coords.get(0);
-
-        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
-        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
-                coord.getAction().getWorkflow().getAppPath());
-        Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
-                + srcCluster.getName(), coord.getName());
-        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
-
-        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(0);
-        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
-        Assert.assertEquals("input-dataset", inputDataset.getName());
-
-        String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
-        sourceRegistry = sourceRegistry.replace("thrift", "hcat");
-        Assert.assertEquals(inputDataset.getUriTemplate(),
-                sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
-
-        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
-                .getDatasetOrAsyncDataset().get(1);
-        Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
-        Assert.assertEquals("output-dataset", outputDataset.getName());
-
-        String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
-        targetRegistry = targetRegistry.replace("thrift", "hcat");
-        Assert.assertEquals(outputDataset.getUriTemplate(),
-                targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
-
-        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
-        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
-        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
-        Assert.assertEquals("input", inEventName);
-        Assert.assertEquals("input-dataset", inEventDataset);
-        Assert.assertEquals("${now(0,-40)}", inEventInstance);
-
-        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
-        Assert.assertEquals("${now(0,-40)}", outEventInstance);
-
-        // assert FS staging area
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        final FileSystem fs = trgMiniDFS.getFileSystem();
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
-
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
-        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
-
-        // verify the replication param that feed replicator depends on
-        Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
-
-        Assert.assertTrue(props.containsKey("distcpSourcePaths"));
-        Assert.assertEquals(props.get("distcpSourcePaths"),
-                FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
-                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
-                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
-
-        Assert.assertTrue(props.containsKey("distcpTargetPaths"));
-        Assert.assertEquals(props.get("distcpTargetPaths"),
-                FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
-                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
-                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
-
-        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
-
-        // verify table props
-        assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
-        assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
-
-        // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
-    }
-
-    private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
-                                              Map<String, String> props, String prefix) {
-        Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
-        Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
-        Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
-
-        Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
-        Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
-        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
-    }
-
-    @Test
-    public void testRetentionCoords() throws FalconException, JAXBException, IOException {
-        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
-        final Calendar instance = Calendar.getInstance();
-        instance.roll(Calendar.YEAR, 1);
-        cluster.getValidity().setEnd(instance.getTime());
-
-        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
-        List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
-        COORDINATORAPP coord = coords.get(0);
-
-        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
-        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
-        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        String feedDataPath = props.get("feedDataPath");
-        String storageType = props.get("falconFeedStorageType");
-
-        // verify the param that feed evictor depends on
-        Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
-
-        final Storage storage = FeedHelper.createStorage(cluster, feed);
-        if (feedDataPath != null) {
-            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
-                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-        }
-
-        if (storageType != null) {
-            Assert.assertEquals(storageType, storage.getType().name());
-        }
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), feed.getName());
-        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
-
-        assertWorkflowRetries(coord);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
new file mode 100644
index 0000000..182d9cb
--- /dev/null
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.converter;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.JAVA;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.workflow.OozieFeedWorkflowBuilder;
+import org.apache.falcon.workflow.OozieWorkflowBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for Oozie workflow definition for feed replication & retention.
+ */
+public class OozieFeedWorkflowBuilderTest {
+    private EmbeddedCluster srcMiniDFS;
+    private EmbeddedCluster trgMiniDFS;
+    private final ConfigurationStore store = ConfigurationStore.get();
+    private Cluster srcCluster;
+    private Cluster trgCluster;
+    private Cluster alphaTrgCluster;
+    private Cluster betaTrgCluster;
+    private Feed feed;
+    private Feed tableFeed;
+    private Feed fsReplFeed;
+
+    private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
+    private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
+    private static final String FEED = "/feed.xml";
+    private static final String TABLE_FEED = "/table-replication-feed.xml";
+    private static final String FS_REPLICATION_FEED = "/fs-replication-feed.xml";
+
+    @BeforeClass
+    public void setUpDFS() throws Exception {
+        CurrentUser.authenticate("falcon");
+
+        srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
+        String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
+
+        trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
+        String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
+
+        cleanupStore();
+
+        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
+        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
+        alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
+        betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
+
+        feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
+        fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED, null);
+        tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED, null);
+    }
+
+    protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
+        Unmarshaller unmarshaller = type.getUnmarshaller();
+        Entity entity = (Entity) unmarshaller
+                .unmarshal(OozieFeedWorkflowBuilderTest.class.getResource(template));
+        store.publish(type, entity);
+
+        if (type == EntityType.CLUSTER) {
+            Cluster cluster = (Cluster) entity;
+            ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
+            FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
+            fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
+            fs.create(
+                    new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
+        }
+        return entity;
+    }
+
+    protected void cleanupStore() throws FalconException {
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = store.getEntities(type);
+            for (String entity : entities) {
+                store.remove(type, entity);
+            }
+        }
+    }
+
+    @AfterClass
+    public void stopDFS() {
+        srcMiniDFS.shutdown();
+        trgMiniDFS.shutdown();
+    }
+
+    @Test
+    public void testReplicationCoordsForFSStorage() throws Exception {
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+        List<COORDINATORAPP> coords = builder.getCoordinators(trgCluster, new Path("/projects/falcon/"));
+        //Assert retention coord
+        COORDINATORAPP coord = coords.get(0);
+        assertLibExtensions(coord, "retention");
+
+        //Assert replication coord
+        coord = coords.get(1);
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
+                .getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+        Assert.assertEquals(
+                ClusterHelper.getReadOnlyStorageUrl(srcCluster)
+                        + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                inputDataset.getUriTemplate());
+
+        Assert.assertEquals("${coord:minutes(20)}",
+                outputDataset.getFrequency());
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+        Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
+                + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+                        outputDataset.getUriTemplate());
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify the replication param that feed replicator depends on
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        // verify workflow params
+        Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
+        Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
+        Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
+
+        // verify default params
+        Assert.assertEquals(props.get("queueName"), "default");
+        Assert.assertEquals(props.get("jobPriority"), "NORMAL");
+        Assert.assertEquals(props.get("maxMaps"), "5");
+
+        assertLibExtensions(coord, "replication");
+        assertWorkflowRetries(coord);
+    }
+
+    private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+            if (OozieWorkflowBuilder.FALCON_ACTIONS.contains(actionName)) {
+                Assert.assertEquals(action.getRetryMax(), "3");
+                Assert.assertEquals(action.getRetryInterval(), "1");
+            }
+        }
+    }
+
+    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
+                        + lifecycle + "/ext.jar"));
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
+    }
+
+    @Test
+    public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(fsReplFeed);
+
+        List<COORDINATORAPP> alphaCoords = builder.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
+        final COORDINATORAPP alphaCoord = alphaCoords.get(0);
+        Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
+        Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
+
+        String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
+        assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
+
+        List<COORDINATORAPP> betaCoords = builder.getCoordinators(betaTrgCluster, new Path("/beta/falcon/"));
+        final COORDINATORAPP betaCoord = betaCoords.get(0);
+        Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
+        Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
+
+        pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
+        assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
+    }
+
+    private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
+                                          Feed aFeed) throws FalconException {
+        String srcPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
+        srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
+        String targetPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
+        targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
+
+        StringBuilder pathsWithPartitions = new StringBuilder();
+        pathsWithPartitions.append("${coord:dataIn('input')}/")
+                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+        parts = StringUtils.stripEnd(parts, "/");
+        return parts;
+    }
+
+    private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
+                                 String pathsWithPartitions) throws JAXBException, IOException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
+        Date startDate = feedCluster.getValidity().getStart();
+        Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
+
+        Date endDate = feedCluster.getValidity().getEnd();
+        Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
+
+        WORKFLOWAPP workflow = getWorkflowapp(coord);
+        assertWorkflowDefinition(fsReplFeed, workflow);
+
+        List<Object> actions = workflow.getDecisionOrForkOrJoin();
+        System.out.println("actions = " + actions);
+
+        ACTION replicationActionNode = (ACTION) actions.get(4);
+        Assert.assertEquals(replicationActionNode.getName(), "replication");
+
+        JAVA replication = replicationActionNode.getJava();
+        List<String> args = replication.getArg();
+        Assert.assertEquals(args.size(), 11);
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+        Assert.assertEquals(props.get("maxMaps"), "33");
+    }
+
+    public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
+        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+        Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+        Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+    }
+
+    @Test
+    public void testReplicationCoordsForTableStorage() throws Exception {
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(tableFeed);
+        List<COORDINATORAPP> coords = builder.getCoordinators(
+                trgCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
+                coord.getAction().getWorkflow().getAppPath());
+        Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
+                + srcCluster.getName(), coord.getName());
+        Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+
+        SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(0);
+        Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+        Assert.assertEquals("input-dataset", inputDataset.getName());
+
+        String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
+        sourceRegistry = sourceRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(inputDataset.getUriTemplate(),
+                sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+                .getDatasetOrAsyncDataset().get(1);
+        Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
+        Assert.assertEquals("output-dataset", outputDataset.getName());
+
+        String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
+        targetRegistry = targetRegistry.replace("thrift", "hcat");
+        Assert.assertEquals(outputDataset.getUriTemplate(),
+                targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+        String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+        Assert.assertEquals("input", inEventName);
+        Assert.assertEquals("input-dataset", inEventDataset);
+        Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+        String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+        Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+        // assert FS staging area
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        final FileSystem fs = trgMiniDFS.getFileSystem();
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
+
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
+        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
+        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+
+        // verify the replication param that feed replicator depends on
+        Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
+
+        Assert.assertTrue(props.containsKey("distcpSourcePaths"));
+        Assert.assertEquals(props.get("distcpSourcePaths"),
+                FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertTrue(props.containsKey("distcpTargetPaths"));
+        Assert.assertEquals(props.get("distcpTargetPaths"),
+                FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
+
+        // verify table props
+        assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
+        assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+    }
+
+    private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+                                              Map<String, String> props, String prefix) {
+        Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
+        Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
+        Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
+
+        Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
+        Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
+        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
+    }
+
+    @Test
+    public void testRetentionCoords() throws FalconException, JAXBException, IOException {
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+        List<COORDINATORAPP> coords = builder.getCoordinators(srcCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String feedDataPath = props.get("feedDataPath");
+        String storageType = props.get("falconFeedStorageType");
+
+        // verify the param that feed evictor depends on
+        Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
+
+        final Storage storage = FeedHelper.createStorage(cluster, feed);
+        if (feedDataPath != null) {
+            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
+                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+        }
+
+        if (storageType != null) {
+            Assert.assertEquals(storageType, storage.getType().name());
+        }
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+
+        assertWorkflowRetries(coord);
+    }
+}
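
The tests above exercise the builder through its public surface only: construct it with a feed entity, then ask for the per-cluster coordinators. A minimal sketch of that call sequence, assuming the feed and target cluster are already registered as in the test fixture:

    OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
    List<COORDINATORAPP> coords = builder.getCoordinators(trgCluster, new Path("/projects/falcon/"));
    for (COORDINATORAPP coord : coords) {
        // per the assertions above, names follow FALCON_FEED_RETENTION_<feed>
        // and FALCON_FEED_REPLICATION_<feed>_<srcCluster>
        System.out.println(coord.getName() + " -> " + coord.getAction().getWorkflow().getAppPath());
    }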

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
deleted file mode 100644
index f443939..0000000
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconRuntimException;
-import org.apache.falcon.Tag;
-import org.apache.commons.io.IOUtils;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.bundle.COORDINATOR;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.ObjectFactory;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.service.FalconPathFilter;
-import org.apache.falcon.service.SharedLibraryHostingService;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-
-import javax.xml.bind.*;
-import java.io.*;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * Entity mapper base class that allows an entity to be mapped to oozie bundle.
- * @param <T>
- */
-public abstract class AbstractOozieEntityMapper<T extends Entity> {
-
-    private static final Logger LOG = Logger.getLogger(AbstractOozieEntityMapper.class);
-
-    protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
-
-    protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
-    protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
-    protected static final String MR_QUEUE_NAME = "queueName";
-    protected static final String MR_JOB_PRIORITY = "jobPriority";
-
-    protected static final JAXBContext WORKFLOW_JAXB_CONTEXT;
-    protected static final JAXBContext COORD_JAXB_CONTEXT;
-    protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
-    protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
-    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(new String[] { "recordsize",
-        "succeeded-post-processing", "failed-post-processing", }));
-
-    protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
-        @Override
-        public boolean accept(Path path) {
-            return path.getName().startsWith("falcon");
-        }
-
-        @Override
-        public String getJarName(Path path) {
-            String name = path.getName();
-            if (name.endsWith(".jar")) {
-                name = name.substring(0, name.indexOf(".jar"));
-            }
-            return name;
-        }
-    };
-
-    static {
-        try {
-            WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
-            COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
-            BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
-            HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
-                    org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to create JAXB context", e);
-        }
-    }
-
-    private final T entity;
-
-    protected AbstractOozieEntityMapper(T entity) {
-        this.entity = entity;
-    }
-
-    protected T getEntity() {
-        return entity;
-    }
-
-    protected Path getCoordPath(Path bundlePath, String coordName) {
-        Tag tag = EntityUtil.getWorkflowNameTag(coordName, getEntity());
-        return new Path(bundlePath, tag.name());
-    }
-
-    protected abstract Map<String, String> getEntityProperties();
-
-    public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
-        BUNDLEAPP bundleApp = new BUNDLEAPP();
-        bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
-        // all the properties are set prior to bundle and coordinators creation
-
-        List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
-        if (coordinators.size() == 0) {
-            return false;
-        }
-        for (COORDINATORAPP coordinatorapp : coordinators) {
-            Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
-            String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
-                    EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
-            createLogsDir(cluster, coordPath);
-            COORDINATOR bundleCoord = new COORDINATOR();
-            bundleCoord.setName(coordinatorapp.getName());
-            bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
-            bundleApp.getCoordinator().add(bundleCoord);
-
-            copySharedLibs(cluster, coordPath);
-        }
-
-        marshal(cluster, bundleApp, bundlePath);
-        return true;
-    }
-
-    private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
-        FileStatus[] libs = null;
-        try {
-            libs = fs.listStatus(path);
-        } catch(FileNotFoundException ignore) {
-            //Ok if the libext is not configured
-        }
-
-        if (libs == null) {
-            return;
-        }
-
-        for(FileStatus lib : libs) {
-            if (lib.isDir()) {
-                continue;
-            }
-
-            for(Object obj: wf.getDecisionOrForkOrJoin()) {
-                if (!(obj instanceof ACTION)) {
-                    continue;
-                }
-                ACTION action = (ACTION) obj;
-                List<String> files = null;
-                if (action.getJava() != null) {
-                    files = action.getJava().getFile();
-                } else if (action.getPig() != null) {
-                    files = action.getPig().getFile();
-                } else if (action.getMapReduce() != null) {
-                    files = action.getMapReduce().getFile();
-                }
-                if (files != null) {
-                    files.add(lib.getPath().toString());
-                }
-            }
-        }
-    }
-
-    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
-        throws IOException, FalconException {
-        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
-        addExtensionJars(fs, new Path(libext), wf);
-        addExtensionJars(fs, new Path(libext, type.name()), wf);
-        if (StringUtils.isNotEmpty(lifecycle)) {
-            addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
-        }
-    }
-
-    private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
-        try {
-            Path libPath = new Path(coordPath, "lib");
-            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
-                    libPath, cluster, FALCON_JAR_FILTER);
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
-        }
-    }
-
-    protected abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
-
-    protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
-        org.apache.falcon.oozie.coordinator.CONFIGURATION conf
-            = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
-        List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
-        for (Entry<String, String> prop : propMap.entrySet()) {
-            props.add(createCoordProperty(prop.getKey(), prop.getValue()));
-        }
-        return conf;
-    }
-
-    protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
-        Map<String, String> props = new HashMap<String, String>();
-        props.put(ARG.entityName.getPropName(), entity.getName());
-        props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
-        props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
-        props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
-        props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
-        String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
-                "tcp://localhost:61616?daemon=true");
-        props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
-        String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
-                ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
-        props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
-        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
-                DEFAULT_BROKER_MSG_TTL.toString());
-        props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
-        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
-        props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
-        props.put(OozieClient.EXTERNAL_ID,
-                new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
-                        "${coord:nominalTime()}").getId());
-        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
-        try {
-            if (EntityUtil.getLateProcess(entity) == null
-                    || EntityUtil.getLateProcess(entity).getLateInputs() == null
-                    || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
-                props.put("shouldRecord", "false");
-            } else {
-                props.put("shouldRecord", "true");
-            }
-        } catch (FalconException e) {
-            LOG.error("Unable to get Late Process for entity:" + entity, e);
-            throw new FalconRuntimException(e);
-        }
-        props.put("entityName", entity.getName());
-        props.put("entityType", entity.getEntityType().name().toLowerCase());
-        props.put(ARG.cluster.getPropName(), cluster.getName());
-        if (cluster.getProperties() != null) {
-            for (Property prop : cluster.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-
-        props.put(MR_QUEUE_NAME, "default");
-        props.put(MR_JOB_PRIORITY, "NORMAL");
-        //props in entity override the set props.
-        props.putAll(getEntityProperties());
-        return props;
-    }
-
-    protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
-                                                                                             String value) {
-        org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
-            = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
-        prop.setName(name);
-        prop.setValue(value);
-        return prop;
-    }
-
-    protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
-        throws FalconException {
-        try {
-            Marshaller marshaller = jaxbContext.createMarshaller();
-            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
-            OutputStream out = fs.create(outPath);
-            try {
-                marshaller.marshal(jaxbElement, out);
-            } finally {
-                out.close();
-            }
-            if (LOG.isDebugEnabled()) {
-                StringWriter writer = new StringWriter();
-                marshaller.marshal(jaxbElement, writer);
-                LOG.debug("Writing definition to " + outPath + " on cluster " + cluster.getName());
-                LOG.debug(writer.getBuffer());
-            }
-
-            LOG.info("Marshalled " + jaxbElement.getDeclaredType() + " to " + outPath);
-        } catch (Exception e) {
-            throw new FalconException("Unable to marshall app object", e);
-        }
-    }
-
-    private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                    coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
-            Path logsDir = new Path(coordPath, "../../logs");
-            fs.mkdirs(logsDir);
-
-            // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
-            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-            fs.setPermission(logsDir, permission);
-        } catch (Exception e) {
-            throw new FalconException("Unable to create temp dir in " + coordPath, e);
-        }
-    }
-
-    protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, String name) throws FalconException {
-        if (StringUtils.isEmpty(name)) {
-            name = "coordinator";
-        }
-        name = name + ".xml";
-        marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), COORD_JAXB_CONTEXT, new Path(outPath, name));
-        return name;
-    }
-
-    protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
-
-        marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
-                BUNDLE_JAXB_CONTEXT,
-                new Path(outPath, "bundle.xml"));
-    }
-
-    protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
-
-        marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
-                WORKFLOW_JAXB_CONTEXT,
-                new Path(outPath, "workflow.xml"));
-    }
-
-    protected String getStoragePath(Path path) {
-        if (path != null) {
-            return getStoragePath(path.toString());
-        }
-        return null;
-    }
-
-    protected String getStoragePath(String path) {
-        if (StringUtils.isNotEmpty(path)) {
-            if (new Path(path).toUri().getScheme() == null) {
-                path = "${nameNode}" + path;
-            }
-        }
-        return path;
-    }
-
-    protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked")
-            JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
-                    resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
-        }
-    }
-
-    protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
-        InputStream resourceAsStream = null;
-        try {
-            resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
-            Unmarshaller unmarshaller = COORD_JAXB_CONTEXT.createUnmarshaller();
-            @SuppressWarnings("unchecked")
-            JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
-                    unmarshaller.unmarshal(resourceAsStream);
-            return jaxbElement.getValue();
-        } catch (JAXBException e) {
-            throw new FalconException(e);
-        } finally {
-            IOUtils.closeQuietly(resourceAsStream);
-        }
-    }
-
-    protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
-                                  Cluster cluster, String prefix) throws IOException {
-        Configuration hiveConf = new Configuration(false);
-        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
-        hiveConf.set("hive.metastore.local", "false");
-
-        if (UserGroupInformation.isSecurityEnabled()) {
-            hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                    ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
-            hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
-        }
-
-        OutputStream out = null;
-        try {
-            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
-            hiveConf.writeXml(out);
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-    }
-
-    protected void decorateWithOozieRetries(ACTION action) {
-        Properties props = RuntimeProperties.get();
-        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
-        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 2f53370..9e1c82d 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -17,8 +17,20 @@
  */
 package org.apache.falcon.util;
 
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.hive.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.xerces.dom.ElementNSImpl;
+import org.w3c.dom.Document;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.dom.DOMResult;
 import java.io.ByteArrayInputStream;
 import java.util.Map;
 import java.util.Properties;
@@ -27,6 +39,23 @@ import java.util.Properties;
  * Help methods relating to oozie configuration.
  */
 public final class OozieUtils {
+    public static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+    public static final JAXBContext COORD_JAXB_CONTEXT;
+    public static final JAXBContext BUNDLE_JAXB_CONTEXT;
+    protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
+
+    static {
+        try {
+            WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+            COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
+            BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
+            HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
+                org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXB context", e);
+        }
+    }
+
 
     private OozieUtils() {}
 
@@ -39,4 +68,29 @@ public final class OozieUtils {
         }
         return jobprops;
     }
+
+    @SuppressWarnings("unchecked")
+    public static  JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) {
+        try {
+            Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
+            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+            return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
+                unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to unmarshall hive action.", e);
+        }
+    }
+
+    public static  void marshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction,
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
+        try {
+            DOMResult hiveActionDOM = new DOMResult();
+            Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(actionjaxbElement, hiveActionDOM);
+            wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to marshall hive action.", e);
+        }
+    }
+
 }
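
The helpers added above wrap the JAXB plumbing needed to read and rewrite a hive action that is embedded in a workflow ACTION via xsd:any. A rough usage sketch, not part of this patch, assuming wfAction wraps a hive action and that the generated hive ACTION type exposes a script setter:

    // unmarshal the embedded hive action, adjust it, and write it back
    JAXBElement<org.apache.falcon.oozie.hive.ACTION> hiveElement = OozieUtils.unMarshalHiveAction(wfAction);
    org.apache.falcon.oozie.hive.ACTION hiveAction = hiveElement.getValue();
    // illustrative value only
    hiveAction.setScript("${wf:appPath()}/scripts/falcon-table-import.hql");
    OozieUtils.marshalHiveAction(wfAction, hiveElement);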


[5/5] git commit: FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS

Posted by sh...@apache.org.
FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e2545b08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e2545b08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e2545b08

Branch: refs/heads/master
Commit: e2545b0874d206f3be88d4b3ac7003eae1161c44
Parents: 5e43521
Author: Shwetha GS <sh...@gmail.com>
Authored: Tue Mar 18 17:10:54 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Tue Mar 18 17:10:54 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/falcon/util/ReflectionUtils.java |  33 +-
 .../apache/falcon/workflow/WorkflowBuilder.java |  20 +-
 .../apache/falcon/util/ReflectionUtilsTest.java |  49 ++
 .../falcon/converter/OozieFeedMapper.java       | 596 -------------
 .../workflow/OozieFeedWorkflowBuilder.java      | 594 ++++++++++++-
 .../falcon/converter/OozieFeedMapperTest.java   | 505 -----------
 .../converter/OozieFeedWorkflowBuilderTest.java | 505 +++++++++++
 .../converter/AbstractOozieEntityMapper.java    | 428 ----------
 .../java/org/apache/falcon/util/OozieUtils.java |  54 ++
 .../falcon/workflow/OozieWorkflowBuilder.java   | 370 ++++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  15 +-
 .../falcon/converter/OozieProcessMapper.java    | 833 -------------------
 .../workflow/OozieProcessWorkflowBuilder.java   | 822 +++++++++++++++++-
 .../OozieProcessMapperLateProcessTest.java      |  96 ---
 .../converter/OozieProcessMapperTest.java       | 557 -------------
 .../OozieProcessWorkflowBuilderTest.java        | 559 +++++++++++++
 .../falcon/retention/FeedEvictorTest.java       |   2 +-
 18 files changed, 2936 insertions(+), 3104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5748d27..f3ddf96 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@ Trunk (Unreleased)
     (Venkatesh Seetharam)
    
   IMPROVEMENTS
+    FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. (Shwetha GS)
+
     FALCON-355 Remove SLAMonitoringService. (Shwetha GS)
 
     FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
index 4a00fa9..80022e0 100644
--- a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
+++ b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.falcon.util;
 
 import org.apache.falcon.FalconException;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 
 /**
@@ -30,12 +31,11 @@ public final class ReflectionUtils {
     private ReflectionUtils() {}
 
     public static <T> T getInstance(String classKey) throws FalconException {
-        String clazzName = StartupProperties.get().getProperty(classKey);
-        try {
-            return ReflectionUtils.<T>getInstanceByClassName(clazzName);
-        } catch (FalconException e) {
-            throw new FalconException("Unable to get instance for key: " + classKey, e);
-        }
+        return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey));
+    }
+
+    public static <T> T getInstance(String classKey, Class<?> argCls, Object arg) throws FalconException {
+        return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey), argCls, arg);
     }
 
     @SuppressWarnings("unchecked")
@@ -52,4 +52,25 @@ public final class ReflectionUtils {
             throw new FalconException("Unable to get instance for " + clazzName, e);
         }
     }
+
+    /**
+     * Invokes a constructor with a single argument.
+     * @param clazzName - class name
+     * @param argCls - class of the constructor argument
+     * @param arg - constructor argument
+     * @param <T> - instance type
+     * @return class instance
+     * @throws FalconException if the class cannot be loaded or instantiated
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getInstanceByClassName(String clazzName, Class<?> argCls, Object arg) throws
+        FalconException {
+        try {
+            Class<T> clazz = (Class<T>) ReflectionUtils.class.getClassLoader().loadClass(clazzName);
+            Constructor<T> constructor = clazz.getConstructor(argCls);
+            return constructor.newInstance(arg);
+        } catch (Exception e) {
+            throw new FalconException("Unable to get instance for " + clazzName, e);
+        }
+    }
 }
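
The new three-argument overload loads the named class and invokes the constructor whose single parameter type is argCls, wrapping any failure in a FalconException. A short sketch of a caller follows; it is not part of this commit and reuses FalconException(Throwable) because that is the constructor the new ReflectionUtilsTest exercises:

    import org.apache.falcon.FalconException;
    import org.apache.falcon.util.ReflectionUtils;

    public final class ReflectionExample {
        private ReflectionExample() {}

        // Loads org.apache.falcon.FalconException, resolves its (Throwable) constructor,
        // and invokes it with the supplied cause.
        public static FalconException wrap(Throwable cause) throws FalconException {
            FalconException wrapped = ReflectionUtils.getInstanceByClassName(
                "org.apache.falcon.FalconException", Throwable.class, cause);
            return wrapped;
        }
    }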

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
index 26243e7..1f9a8c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
@@ -22,8 +22,6 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.util.ReflectionUtils;
 
-import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -32,16 +30,22 @@ import java.util.Properties;
  * @param <T>
  */
 public abstract class WorkflowBuilder<T extends Entity> {
+    protected T entity;
+
+    protected WorkflowBuilder(T entity) {
+        this.entity = entity;
+    }
+
+    public T getEntity() {
+        return entity;
+    }
 
     public static WorkflowBuilder<Entity> getBuilder(String engine, Entity entity) throws FalconException {
         String classKey = engine + "." + entity.getEntityType().name().toLowerCase() + ".workflow.builder";
-        return ReflectionUtils.getInstance(classKey);
+        return ReflectionUtils.getInstance(classKey, entity.getEntityType().getEntityClass(), entity);
     }
 
-    public abstract Map<String, Properties> newWorkflowSchedule(T entity, List<String> clusters) throws FalconException;
-
-    public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user)
-        throws FalconException;
+    public abstract Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException;
 
-    public abstract String[] getWorkflowNames(T entity);
+    public abstract String[] getWorkflowNames();
 }
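
With the entity now held by the builder itself, a caller obtains one builder per entity and no longer passes the entity, start date, or user into every call. getBuilder() derives the startup-property key <engine>.<entitytype>.workflow.builder (for example oozie.feed.workflow.builder) and instantiates the configured class through its new single-argument constructor. A sketch of the calling pattern, not part of this commit, with a placeholder cluster name:

    import java.util.Map;
    import java.util.Properties;

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.workflow.WorkflowBuilder;

    public final class ScheduleSketch {
        private ScheduleSketch() {}

        public static Map<String, Properties> schedule(Entity entity) throws FalconException {
            // Resolves "oozie.<entity type>.workflow.builder" from startup properties and
            // invokes the builder's one-argument constructor with the entity.
            WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder("oozie", entity);

            // "corp-cluster" is a hypothetical cluster name used only for illustration.
            return builder.newWorkflowSchedule("corp-cluster");
        }
    }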

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
new file mode 100644
index 0000000..bc0bce0
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests ReflectionUtils.
+ */
+@Test
+public class ReflectionUtilsTest {
+    public void testGetInstance() throws FalconException {
+        //with 1 arg constructor, arg null
+        Object e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class, null);
+        Assert.assertTrue(e instanceof  FalconException);
+
+        //with 1 arg constructor, arg not null
+        e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class,
+            new Throwable());
+        Assert.assertTrue(e instanceof  FalconException);
+
+        //no constructor, using get() method
+        e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.util.StartupProperties");
+        Assert.assertTrue(e instanceof  StartupProperties);
+
+        //with empty constructor
+        e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.entity.parser.ClusterEntityParser");
+        Assert.assertTrue(e instanceof ClusterEntityParser);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
deleted file mode 100644
index 2b3315f..0000000
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
-import org.apache.falcon.oozie.coordinator.ACTION;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.*;
-
-/**
- * Mapper which maps feed definition to oozie workflow definitions for
- * replication & retention.
- */
-public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
-
-    private static final Logger LOG = Logger.getLogger(OozieFeedMapper.class);
-
-    private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
-    private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
-
-    public OozieFeedMapper(Feed feed) {
-        super(feed);
-    }
-
-    @Override
-    protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
-        List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
-        COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
-        if (retentionCoord != null) {
-            coords.add(retentionCoord);
-        }
-        List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
-        coords.addAll(replicationCoords);
-        return coords;
-    }
-
-    private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-
-        Feed feed = getEntity();
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-
-        if (feedCluster.getValidity().getEnd().before(new Date())) {
-            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
-                    + " is not in the future");
-            return null;
-        }
-
-        return retentionMapper.getRetentionCoordinator(cluster, bundlePath, feed, feedCluster);
-    }
-
-    private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
-        throws FalconException {
-
-        Feed feed = getEntity();
-        List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
-
-        if (FeedHelper.getCluster(feed, targetCluster.getName()).getType() == ClusterType.TARGET) {
-            String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, feed).toString();
-            Path basePath = getCoordPath(bundlePath, coordName);
-            replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
-
-            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-                if (feedCluster.getType() == ClusterType.SOURCE) {
-                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(feed,
-                            (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
-                            targetCluster, bundlePath);
-
-                    if (coord != null) {
-                        replicationCoords.add(coord);
-                    }
-                }
-            }
-        }
-
-        return replicationCoords;
-    }
-
-    @Override
-    protected Map<String, String> getEntityProperties() {
-        Feed feed = getEntity();
-        Map<String, String> props = new HashMap<String, String>();
-        if (feed.getProperties() != null) {
-            for (Property prop : feed.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-        return props;
-    }
-
-    private final class RetentionOozieWorkflowMapper {
-
-        private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
-
-        private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
-                                                       org.apache.falcon.entity.v0.feed.Cluster feedCluster)
-            throws FalconException {
-
-            COORDINATORAPP retentionApp = new COORDINATORAPP();
-            String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
-            retentionApp.setName(coordName);
-            retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
-            retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
-            retentionApp.setTimezone(feed.getTimezone().getID());
-            TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
-            if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
-                retentionApp.setFrequency("${coord:hours(6)}");
-            } else {
-                retentionApp.setFrequency("${coord:days(1)}");
-            }
-
-            Path wfPath = getCoordPath(bundlePath, coordName);
-            retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
-            return retentionApp;
-        }
-
-        private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
-            throws FalconException {
-            Feed feed = getEntity();
-            ACTION retentionAction = new ACTION();
-            WORKFLOW retentionWorkflow = new WORKFLOW();
-            createRetentionWorkflow(cluster, wfPath, wfName);
-            retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
-
-            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
-            props.put("timeZone", feed.getTimezone().getID());
-            props.put("frequency", feed.getFrequency().getTimeUnit().name());
-
-            final Storage storage = FeedHelper.createStorage(cluster, feed);
-            props.put("falconFeedStorageType", storage.getType().name());
-
-            String feedDataPath = storage.getUriTemplate();
-            props.put("feedDataPath",
-                    feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                    FeedHelper.getCluster(feed, cluster.getName());
-            props.put("limit", feedCluster.getRetention().getLimit().toString());
-
-            props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
-            props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-
-            props.put("falconInputFeeds", feed.getName());
-            props.put("falconInPaths", "IGNORE");
-
-            propagateUserWorkflowProperties(props, "eviction");
-
-            retentionWorkflow.setConfiguration(getCoordConfig(props));
-            retentionAction.setWorkflow(retentionWorkflow);
-
-            return retentionAction;
-        }
-
-        private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
-            try {
-                WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
-                retWfApp.setName(wfName);
-                addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
-                addOozieRetries(retWfApp);
-                marshal(cluster, retWfApp, wfPath);
-            } catch(IOException e) {
-                throw new FalconException("Unable to create retention workflow", e);
-            }
-        }
-    }
-
-    private class ReplicationOozieWorkflowMapper {
-        private static final String MR_MAX_MAPS = "maxMaps";
-
-        private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
-        private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
-        private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
-
-        private static final String TIMEOUT = "timeout";
-        private static final String PARALLEL = "parallel";
-
-        private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
-            throws FalconException {
-            try {
-                WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
-                repWFapp.setName(wfName);
-                addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
-                addOozieRetries(repWFapp);
-                marshal(cluster, repWFapp, wfPath);
-            } catch(IOException e) {
-                throw new FalconException("Unable to create replication workflow", e);
-            }
-
-        }
-
-        private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
-                                                 Path bundlePath) throws FalconException {
-            long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
-            Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
-            Date sourceEndDate = getEndDate(feed, srcCluster);
-
-            Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
-            Date targetEndDate = getEndDate(feed, trgCluster);
-
-            if (noOverlapExists(sourceStartDate, sourceEndDate,
-                    targetStartDate, targetEndDate)) {
-                LOG.warn("Not creating replication coordinator, as the source cluster:"
-                        + srcCluster.getName()
-                        + " and target cluster: "
-                        + trgCluster.getName()
-                        + " do not have overlapping dates");
-                return null;
-            }
-
-            COORDINATORAPP replicationCoord;
-            try {
-                replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
-            } catch (FalconException e) {
-                throw new FalconException("Cannot unmarshall replication coordinator template", e);
-            }
-
-            String coordName = EntityUtil.getWorkflowName(
-                    Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
-            String start = sourceStartDate.after(targetStartDate)
-                    ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
-            String end = sourceEndDate.before(targetEndDate)
-                    ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
-
-            initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
-            setCoordControls(feed, replicationCoord);
-
-            final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
-            initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
-
-            final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
-            initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
-
-            Path wfPath = getCoordPath(bundlePath, coordName);
-            ACTION replicationWorkflowAction = getReplicationWorkflowAction(
-                    srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
-            replicationCoord.setAction(replicationWorkflowAction);
-
-            return replicationCoord;
-        }
-
-        private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
-            Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
-            return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
-        }
-
-        private Date getEndDate(Feed feed, Cluster cluster) {
-            return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
-        }
-
-        private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
-                                        Date targetStartDate, Date targetEndDate) {
-            return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
-        }
-
-        private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
-                                               Feed feed, String start, String end, long delayInMillis) {
-            replicationCoord.setName(coordName);
-            replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
-            if (delayInMillis > 0) {
-                long delayInMins = -1 * delayInMillis / (1000 * 60);
-                String elExp = "${now(0," + delayInMins + ")}";
-
-                replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
-                replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
-            }
-
-            replicationCoord.setStart(start);
-            replicationCoord.setEnd(end);
-            replicationCoord.setTimezone(feed.getTimezone().getID());
-        }
-
-        private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
-            Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
-            long delayInMillis=0;
-            if (replicationDelay != null) {
-                delayInMillis = ExpressionHelper.get().evaluate(
-                        replicationDelay.toString(), Long.class);
-            }
-
-            return delayInMillis;
-        }
-
-        private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
-            long frequencyInMillis = ExpressionHelper.get().evaluate(
-                    feed.getFrequency().toString(), Long.class);
-            long timeoutInMillis = frequencyInMillis * 6;
-            if (timeoutInMillis < THIRTY_MINUTES) {
-                timeoutInMillis = THIRTY_MINUTES;
-            }
-
-            Map<String, String> props = getEntityProperties();
-            String timeout = props.get(TIMEOUT);
-            if (timeout!=null) {
-                try{
-                    timeoutInMillis= ExpressionHelper.get().evaluate(timeout, Long.class);
-                } catch (Exception ignore) {
-                    LOG.error("Unable to evaluate timeout:", ignore);
-                }
-            }
-            replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-            replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-
-            String parallelProp = props.get(PARALLEL);
-            int parallel = 1;
-            if (parallelProp != null) {
-                try {
-                    parallel = Integer.parseInt(parallelProp);
-                } catch (NumberFormatException ignore) {
-                    LOG.error("Unable to parse parallel:", ignore);
-                }
-            }
-            replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
-        }
-
-        private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
-                                            Storage sourceStorage) throws FalconException {
-            SYNCDATASET inputDataset = (SYNCDATASET)
-                    replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
-            String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
-            if (sourceStorage.getType() == Storage.TYPE.TABLE) {
-                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-            }
-            inputDataset.setUriTemplate(uriTemplate);
-
-            setDatasetValues(inputDataset, feed, srcCluster);
-
-            if (feed.getAvailabilityFlag() == null) {
-                inputDataset.setDoneFlag("");
-            } else {
-                inputDataset.setDoneFlag(feed.getAvailabilityFlag());
-            }
-        }
-
-        private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
-                                             Storage targetStorage) throws FalconException {
-            SYNCDATASET outputDataset = (SYNCDATASET)
-                    replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
-
-            String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
-            if (targetStorage.getType() == Storage.TYPE.TABLE) {
-                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-            }
-            outputDataset.setUriTemplate(uriTemplate);
-
-            setDatasetValues(outputDataset, feed, targetCluster);
-        }
-
-        private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
-            dataset.setInitialInstance(SchemaHelper.formatDateUTC(
-                    FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
-            dataset.setTimezone(feed.getTimezone().getID());
-            dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-        }
-
-        private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
-                                                    String wfName, Storage sourceStorage,
-                                                    Storage targetStorage) throws FalconException {
-            ACTION replicationAction = new ACTION();
-            WORKFLOW replicationWF = new WORKFLOW();
-            try {
-                replicationWF.setAppPath(getStoragePath(wfPath.toString()));
-                Feed feed = getEntity();
-
-                Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
-                props.put("srcClusterName", srcCluster.getName());
-                props.put("srcClusterColo", srcCluster.getColo());
-                if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
-                    props.put(MR_MAX_MAPS, getDefaultMaxMaps());
-                }
-
-                // the storage type is uniform across source and target feeds for replication
-                props.put("falconFeedStorageType", sourceStorage.getType().name());
-
-                String instancePaths = null;
-                if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
-                    String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
-                    instancePaths = pathsWithPartitions;
-
-                    propagateFileSystemCopyProperties(pathsWithPartitions, props);
-                } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
-                    instancePaths = "${coord:dataIn('input')}";
-
-                    final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
-                    propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
-                    final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
-                    propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
-                    propagateTableCopyProperties(srcCluster, sourceTableStorage,
-                            trgCluster, targetTableStorage, props);
-                    setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
-                }
-
-                propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
-                propagateUserWorkflowProperties(props, "replication");
-
-                replicationWF.setConfiguration(getCoordConfig(props));
-                replicationAction.setWorkflow(replicationWF);
-
-            } catch (Exception e) {
-                throw new FalconException("Unable to create replication workflow", e);
-            }
-
-            return replicationAction;
-        }
-
-        private String getDefaultMaxMaps() {
-            return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
-        }
-
-        private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
-                                              Feed feed) throws FalconException {
-            String srcPart = FeedHelper.normalizePartitionExpression(
-                    FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
-            srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
-
-            String targetPart = FeedHelper.normalizePartitionExpression(
-                    FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
-            targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
-
-            StringBuilder pathsWithPartitions = new StringBuilder();
-            pathsWithPartitions.append("${coord:dataIn('input')}/")
-                    .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
-            String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
-            parts = StringUtils.stripEnd(parts, "/");
-            return parts;
-        }
-
-        private void propagateFileSystemCopyProperties(String pathsWithPartitions,
-                                                       Map<String, String> props) throws FalconException {
-            props.put("sourceRelativePaths", pathsWithPartitions);
-
-            props.put("distcpSourcePaths", "${coord:dataIn('input')}");
-            props.put("distcpTargetPaths", "${coord:dataOut('output')}");
-        }
-
-        private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
-                                                     Map<String, String> props, String prefix) {
-            props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
-            props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
-            props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
-
-            props.put(prefix + "Database", tableStorage.getDatabase());
-            props.put(prefix + "Table", tableStorage.getTable());
-            props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
-        }
-
-        private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
-                                            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
-            throws IOException, FalconException {
-            Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
-
-            // copy import export scripts to stagingDir
-            Path scriptPath = new Path(wfPath, "scripts");
-            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
-            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
-
-            // create hive conf to stagingDir
-            Path confPath = new Path(wfPath + "/conf");
-            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
-            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
-        }
-
-        private void copyHiveScript(FileSystem fs, Path scriptPath,
-                                    String localScriptPath, String scriptName) throws IOException {
-            OutputStream out = null;
-            InputStream in = null;
-            try {
-                out = fs.create(new Path(scriptPath, scriptName));
-                in = OozieFeedMapper.class.getResourceAsStream(localScriptPath + scriptName);
-                IOUtils.copy(in, out);
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-        }
-
-        private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
-                                                  Cluster trgCluster, CatalogStorage targetStorage,
-                                                  Map<String, String> props) {
-            // create staging dirs for export at source & set it as distcpSourcePaths
-            String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
-            String sourceStagingDir =
-                    FeedHelper.getStagingDir(srcCluster, getEntity(), sourceStorage, Tag.REPLICATION)
-                    + "/" + sourceDatedPartitionKey
-                    + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
-            props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
-            // create staging dirs for import at target & set it as distcpTargetPaths
-            String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
-            String targetStagingDir =
-                    FeedHelper.getStagingDir(trgCluster, getEntity(), targetStorage, Tag.REPLICATION)
-                    + "/" + targetDatedPartitionKey
-                    + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
-            props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
-            props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
-        }
-
-        private void propagateLateDataProperties(Feed feed, String instancePaths,
-                                                 String falconFeedStorageType, Map<String, String> props) {
-            // todo these pairs are the same but used in different context
-            // late data handler - should-record action
-            props.put("falconInputFeeds", feed.getName());
-            props.put("falconInPaths", instancePaths);
-
-            // storage type for each corresponding feed - in this case only one feed is involved
-            // needed to compute usage based on storage type in LateDataHandler
-            props.put("falconInputFeedStorageTypes", falconFeedStorageType);
-
-            // falcon post processing
-            props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
-        }
-    }
-
-    private void addOozieRetries(WORKFLOWAPP workflow) {
-        for (Object object : workflow.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
-                continue;
-            }
-            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
-            String actionName = action.getName();
-            if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-            }
-        }
-    }
-
-    private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
-        props.put("userWorkflowName", policy + "-policy");
-        props.put("userWorkflowEngine", "falcon");
-
-        String version;
-        try {
-            version = BuildProperties.get().getProperty("build.version");
-        } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
-            version = "0.5";
-        }
-        props.put("userWorkflowVersion", version);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 5e3a30e..2008c2d 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -18,60 +18,83 @@
 
 package org.apache.falcon.workflow;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
-import org.apache.falcon.converter.AbstractOozieEntityMapper;
-import org.apache.falcon.converter.OozieFeedMapper;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 
-import java.util.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * Workflow definition builder for feed replication & retention.
  */
 public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
+    private static final Logger LOG = Logger.getLogger(OozieFeedWorkflowBuilder.class);
+
+    public OozieFeedWorkflowBuilder(Feed entity) {
+        super(entity);
+    }
 
     @Override
-    public Map<String, Properties> newWorkflowSchedule(Feed feed, List<String> clusters) throws FalconException {
+    public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
         Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
 
         for (String clusterName : clusters) {
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
-            Properties properties = newWorkflowSchedule(feed, feedCluster.getValidity().getStart(), clusterName,
-                    CurrentUser.getUser());
-            if (properties == null) {
-                continue;
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, clusterName);
+            if (!feedCluster.getValidity().getStart().before(feedCluster.getValidity().getEnd())) {
+                LOG.info("feed validity start <= end for cluster " + clusterName + ". Skipping schedule");
+                break;
             }
-            propertiesMap.put(clusterName, properties);
-        }
-        return propertiesMap;
-    }
 
-    @Override
-    public Properties newWorkflowSchedule(Feed feed, Date startDate, String clusterName, String user)
-        throws FalconException {
-
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
-        if (!startDate.before(feedCluster.getValidity().getEnd())) {
-            return null;
-        }
+            Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
+            Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
 
-        Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
-        Path bundlePath = EntityUtil.getNewStagingPath(cluster, feed);
-        Feed feedClone = (Feed) feed.copy();
-        EntityUtil.setStartDate(feedClone, clusterName, startDate);
-
-        AbstractOozieEntityMapper<Feed> mapper = new OozieFeedMapper(feedClone);
-        if (!mapper.map(cluster, bundlePath)) {
-            return null;
+            if (!map(cluster, bundlePath)) {
+                break;
+            }
+            propertiesMap.put(clusterName, createAppProperties(clusterName, bundlePath, CurrentUser.getUser()));
         }
-        return createAppProperties(clusterName, bundlePath, user);
+        return propertiesMap;
     }
 
     @Override
@@ -82,9 +105,518 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
     }
 
     @Override
-    public String[] getWorkflowNames(Feed entity) {
+    public String[] getWorkflowNames() {
         return new String[]{
                 EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString(),
                 EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString(), };
     }
+
+    private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
+    private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
+
+    @Override
+    public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+        List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
+        COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
+        if (retentionCoord != null) {
+            coords.add(retentionCoord);
+        }
+        List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
+        coords.addAll(replicationCoords);
+        return coords;
+    }
+
+    private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+
+        if (feedCluster.getValidity().getEnd().before(new Date())) {
+            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
+                + " is not in the future");
+            return null;
+        }
+
+        return retentionMapper.getRetentionCoordinator(cluster, bundlePath, entity, feedCluster);
+    }
+
+    private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
+        throws FalconException {
+        List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
+
+        if (FeedHelper.getCluster(entity, targetCluster.getName()).getType() == ClusterType.TARGET) {
+            String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
+            Path basePath = getCoordPath(bundlePath, coordName);
+            replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
+
+            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
+                if (feedCluster.getType() == ClusterType.SOURCE) {
+                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(entity,
+                        (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
+                        targetCluster, bundlePath);
+
+                    if (coord != null) {
+                        replicationCoords.add(coord);
+                    }
+                }
+            }
+        }
+
+        return replicationCoords;
+    }
+
+    @Override
+    protected Map<String, String> getEntityProperties() {
+        Map<String, String> props = new HashMap<String, String>();
+        if (entity.getProperties() != null) {
+            for (Property prop : entity.getProperties().getProperties()) {
+                props.put(prop.getName(), prop.getValue());
+            }
+        }
+        return props;
+    }
+
+    private final class RetentionOozieWorkflowMapper {
+
+        private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
+
+        private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster) throws FalconException {
+            COORDINATORAPP retentionApp = new COORDINATORAPP();
+            String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
+            retentionApp.setName(coordName);
+            retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+            retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
+            retentionApp.setTimezone(feed.getTimezone().getID());
+            TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
+            if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
+                retentionApp.setFrequency("${coord:hours(6)}");
+            } else {
+                retentionApp.setFrequency("${coord:days(1)}");
+            }
+
+            Path wfPath = getCoordPath(bundlePath, coordName);
+            retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
+            return retentionApp;
+        }
+
+        private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
+            throws FalconException {
+            ACTION retentionAction = new ACTION();
+            WORKFLOW retentionWorkflow = new WORKFLOW();
+            createRetentionWorkflow(cluster, wfPath, wfName);
+            retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
+
+            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+            props.put("timeZone", entity.getTimezone().getID());
+            props.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+            final Storage storage = FeedHelper.createStorage(cluster, entity);
+            props.put("falconFeedStorageType", storage.getType().name());
+
+            String feedDataPath = storage.getUriTemplate();
+            props.put("feedDataPath",
+                feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+            props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+            props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+            props.put(ARG.feedNames.getPropName(), entity.getName());
+            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+
+            props.put("falconInputFeeds", entity.getName());
+            props.put("falconInPaths", "IGNORE");
+
+            propagateUserWorkflowProperties(props, "eviction");
+
+            retentionWorkflow.setConfiguration(getCoordConfig(props));
+            retentionAction.setWorkflow(retentionWorkflow);
+            return retentionAction;
+        }
+
+        private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
+            try {
+                WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
+                retWfApp.setName(wfName);
+                addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
+                addOozieRetries(retWfApp);
+                marshal(cluster, retWfApp, wfPath);
+            } catch(IOException e) {
+                throw new FalconException("Unable to create retention workflow", e);
+            }
+        }
+    }
+
+    private class ReplicationOozieWorkflowMapper {
+        private static final String MR_MAX_MAPS = "maxMaps";
+
+        private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+        private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
+        private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
+
+        private static final String TIMEOUT = "timeout";
+        private static final String PARALLEL = "parallel";
+
+        private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
+            throws FalconException {
+            try {
+                WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
+                repWFapp.setName(wfName);
+                addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
+                addOozieRetries(repWFapp);
+                marshal(cluster, repWFapp, wfPath);
+            } catch(IOException e) {
+                throw new FalconException("Unable to create replication workflow", e);
+            }
+
+        }
+
+        private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
+            Path bundlePath) throws FalconException {
+            long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
+            Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
+            Date sourceEndDate = getEndDate(feed, srcCluster);
+
+            Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
+            Date targetEndDate = getEndDate(feed, trgCluster);
+
+            if (noOverlapExists(sourceStartDate, sourceEndDate,
+                targetStartDate, targetEndDate)) {
+                LOG.warn("Not creating replication coordinator, as the source cluster:" + srcCluster.getName()
+                    + "and target cluster: " + trgCluster.getName() + " do not have overlapping dates");
+                return null;
+            }
+
+            COORDINATORAPP replicationCoord;
+            try {
+                replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+            } catch (FalconException e) {
+                throw new FalconException("Cannot unmarshall replication coordinator template", e);
+            }
+
+            String coordName = EntityUtil.getWorkflowName(
+                Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
+            String start = sourceStartDate.after(targetStartDate)
+                ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
+            String end = sourceEndDate.before(targetEndDate)
+                ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
+
+            initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
+            setCoordControls(feed, replicationCoord);
+
+            final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
+            initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
+
+            final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
+            initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
+
+            Path wfPath = getCoordPath(bundlePath, coordName);
+            ACTION replicationWorkflowAction = getReplicationWorkflowAction(
+                srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
+            replicationCoord.setAction(replicationWorkflowAction);
+
+            return replicationCoord;
+        }
+
+        private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
+            Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
+            return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+        }
+
+        private Date getEndDate(Feed feed, Cluster cluster) {
+            return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
+        }
+
+        private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
+            Date targetStartDate, Date targetEndDate) {
+            return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
+        }
+
+        private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
+            Feed feed, String start, String end, long delayInMillis) {
+            replicationCoord.setName(coordName);
+            replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+            if (delayInMillis > 0) {
+                long delayInMins = -1 * delayInMillis / (1000 * 60);
+                String elExp = "${now(0," + delayInMins + ")}";
+
+                replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+                replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+            }
+
+            replicationCoord.setStart(start);
+            replicationCoord.setEnd(end);
+            replicationCoord.setTimezone(feed.getTimezone().getID());
+        }
+
+        private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
+            Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
+            long delayInMillis=0;
+            if (replicationDelay != null) {
+                delayInMillis = ExpressionHelper.get().evaluate(
+                    replicationDelay.toString(), Long.class);
+            }
+
+            return delayInMillis;
+        }
+
+        private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
+            long frequencyInMillis = ExpressionHelper.get().evaluate(
+                feed.getFrequency().toString(), Long.class);
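+            // default timeout is six times the feed frequency, floored at thirty minutes; the timeout property overrides it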
+            long timeoutInMillis = frequencyInMillis * 6;
+            if (timeoutInMillis < THIRTY_MINUTES) {
+                timeoutInMillis = THIRTY_MINUTES;
+            }
+
+            Map<String, String> props = getEntityProperties();
+            String timeout = props.get(TIMEOUT);
+            if (timeout != null) {
+                try {
+                    timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
+                } catch (Exception ignore) {
+                    LOG.error("Unable to evaluate timeout:", ignore);
+                }
+            }
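+            // Oozie expects the timeout in minutes; throttle is derived as twice the timeout-to-frequency ratio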
+            replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+            replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+
+            String parallelProp = props.get(PARALLEL);
+            int parallel = 1;
+            if (parallelProp != null) {
+                try {
+                    parallel = Integer.parseInt(parallelProp);
+                } catch (NumberFormatException ignore) {
+                    LOG.error("Unable to parse parallel:", ignore);
+                }
+            }
+            replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
+        }
+
+        private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
+            Storage sourceStorage) throws FalconException {
+            SYNCDATASET inputDataset = (SYNCDATASET)
+                replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+            String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
+            if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+            }
+            inputDataset.setUriTemplate(uriTemplate);
+
+            setDatasetValues(inputDataset, feed, srcCluster);
+
+            if (feed.getAvailabilityFlag() == null) {
+                inputDataset.setDoneFlag("");
+            } else {
+                inputDataset.setDoneFlag(feed.getAvailabilityFlag());
+            }
+        }
+
+        private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
+            Storage targetStorage) throws FalconException {
+            SYNCDATASET outputDataset = (SYNCDATASET)
+                replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
+
+            String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
+            if (targetStorage.getType() == Storage.TYPE.TABLE) {
+                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+            }
+            outputDataset.setUriTemplate(uriTemplate);
+
+            setDatasetValues(outputDataset, feed, targetCluster);
+        }
+
+        private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
+            dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+                FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
+            dataset.setTimezone(feed.getTimezone().getID());
+            dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+        }
+
+        private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
+            String wfName, Storage sourceStorage,
+            Storage targetStorage) throws FalconException {
+            ACTION replicationAction = new ACTION();
+            WORKFLOW replicationWF = new WORKFLOW();
+            try {
+                replicationWF.setAppPath(getStoragePath(wfPath.toString()));
+                Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+                props.put("srcClusterName", srcCluster.getName());
+                props.put("srcClusterColo", srcCluster.getColo());
+                if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
+                    props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+                }
+
+                // the storage type is uniform across source and target feeds for replication
+                props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+                String instancePaths = null;
+                if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+                    String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, entity);
+                    instancePaths = pathsWithPartitions;
+
+                    propagateFileSystemCopyProperties(pathsWithPartitions, props);
+                } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+                    instancePaths = "${coord:dataIn('input')}";
+
+                    final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
+                    propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
+                    final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
+                    propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
+                    propagateTableCopyProperties(srcCluster, sourceTableStorage,
+                        trgCluster, targetTableStorage, props);
+                    setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
+                }
+
+                propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
+                propagateUserWorkflowProperties(props, "replication");
+
+                replicationWF.setConfiguration(getCoordConfig(props));
+                replicationAction.setWorkflow(replicationWF);
+
+            } catch (Exception e) {
+                throw new FalconException("Unable to create replication workflow", e);
+            }
+
+            return replicationAction;
+        }
+
+        private String getDefaultMaxMaps() {
+            return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+        }
+
+        private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
+            Feed feed) throws FalconException {
+            String srcPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
+            srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
+
+            String targetPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
+            targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
+
+            StringBuilder pathsWithPartitions = new StringBuilder();
+            pathsWithPartitions.append("${coord:dataIn('input')}/")
+                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
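+            // collapse duplicate slashes and drop any trailing slash from the computed path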
+            String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+            parts = StringUtils.stripEnd(parts, "/");
+            return parts;
+        }
+
+        private void propagateFileSystemCopyProperties(String pathsWithPartitions,
+            Map<String, String> props) throws FalconException {
+            props.put("sourceRelativePaths", pathsWithPartitions);
+
+            props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+            props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+        }
+
+        private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+            Map<String, String> props, String prefix) {
+            props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+            props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+            props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
+
+            props.put(prefix + "Database", tableStorage.getDatabase());
+            props.put(prefix + "Table", tableStorage.getTable());
+            props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
+        }
+
+        private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
+            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
+            throws IOException, FalconException {
+            Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+
+            // copy import export scripts to stagingDir
+            Path scriptPath = new Path(wfPath, "scripts");
+            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
+            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
+
+            // create hive conf to stagingDir
+            Path confPath = new Path(wfPath + "/conf");
+            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
+            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
+        }
+
+        private void copyHiveScript(FileSystem fs, Path scriptPath,
+            String localScriptPath, String scriptName) throws IOException {
+            OutputStream out = null;
+            InputStream in = null;
+            try {
+                out = fs.create(new Path(scriptPath, scriptName));
+                in = OozieFeedWorkflowBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
+            }
+        }
+
+        private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+            Cluster trgCluster, CatalogStorage targetStorage,
+            Map<String, String> props) {
+            // create staging dirs for export at source & set it as distcpSourcePaths
+            String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
+            String sourceStagingDir =
+                FeedHelper.getStagingDir(srcCluster, entity, sourceStorage, Tag.REPLICATION)
+                    + "/" + sourceDatedPartitionKey
+                    + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
+            props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+            // create staging dirs for import at target & set it as distcpTargetPaths
+            String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
+            String targetStagingDir =
+                FeedHelper.getStagingDir(trgCluster, entity, targetStorage, Tag.REPLICATION)
+                    + "/" + targetDatedPartitionKey
+                    + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
+            props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+            props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
+        }
+
+        private void propagateLateDataProperties(Feed feed, String instancePaths,
+            String falconFeedStorageType, Map<String, String> props) {
+            // todo these pairs are the same but used in different contexts
+            // late data handler - should-record action
+            props.put("falconInputFeeds", feed.getName());
+            props.put("falconInPaths", instancePaths);
+
+            // storage type for each corresponding feed - in this case only one feed is involved
+            // needed to compute usage based on storage type in LateDataHandler
+            props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+            // falcon post processing
+            props.put(ARG.feedNames.getPropName(), feed.getName());
+            props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+        }
+    }
+
+    private void addOozieRetries(WORKFLOWAPP workflow) {
+        for (Object object : workflow.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if (FALCON_ACTIONS.contains(actionName)) {
+                decorateWithOozieRetries(action);
+            }
+        }
+    }
+
+    private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
+        props.put("userWorkflowName", policy + "-policy");
+        props.put("userWorkflowEngine", "falcon");
+
+        String version;
+        try {
+            version = BuildProperties.get().getProperty("build.version");
+        } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
+            version = "0.5";
+        }
+        props.put("userWorkflowVersion", version);
+    }
 }


[2/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 4e5e8c6..c31842b 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -21,41 +21,106 @@ package org.apache.falcon.workflow;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
-import org.apache.falcon.converter.OozieProcessMapper;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DELETE;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.PREPARE;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.update.UpdateHelper;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Logger;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.client.OozieClient;
 
-import java.util.*;
+import javax.xml.bind.JAXBElement;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * Oozie workflow builder for falcon entities.
  */
 public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
+    private static final Logger LOG = Logger.getLogger(OozieProcessWorkflowBuilder.class);
+
+    public OozieProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
 
     @Override
-    public Map<String, Properties> newWorkflowSchedule(Process process, List<String> clusters) throws FalconException {
+    public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
         Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
 
         for (String clusterName : clusters) {
-            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
-            Properties properties = newWorkflowSchedule(process, processCluster.getValidity().getStart(), clusterName,
-                    CurrentUser.getUser());
-            if (properties != null) {
-                propertiesMap.put(clusterName, properties);
+            org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity, clusterName);
+            if (processCluster.getValidity().getStart().compareTo(processCluster.getValidity().getEnd()) >= 0) {
+                LOG.info("process validity start <= end for cluster " + clusterName + ". Skipping schedule");
+                break;
+            }
+
+            Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
+            Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
+            map(cluster, bundlePath);
+            Properties properties = createAppProperties(clusterName, bundlePath, CurrentUser.getUser());
+
+            //Add libpath
+            String libPath = entity.getWorkflow().getLib();
+            if (!StringUtils.isEmpty(libPath)) {
+                String path = libPath.replace("${nameNode}", "");
+                properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
+            }
+
+            if (entity.getInputs() != null) {
+                for (Input in : entity.getInputs().getInputs()) {
+                    if (in.isOptional()) {
+                        addOptionalInputProperties(properties, in, clusterName);
+                    }
+                }
             }
+            propertiesMap.put(clusterName, properties);
         }
         return propertiesMap;
     }
@@ -100,51 +165,734 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     @Override
-    public Properties newWorkflowSchedule(Process process, Date startDate, String clusterName, String user)
-        throws FalconException {
-        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
-        if (!startDate.before(processCluster.getValidity().getEnd())) {// start time >= end time
-            return null;
+    public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
+        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+        return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
+                process.getFrequency(), process.getTimezone(), now);
+    }
+
+    @Override
+    public String[] getWorkflowNames() {
+        return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString()};
+    }
+
+    private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
+    private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+    @Override
+    public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+
+            //Copy user workflow and lib to staging dir
+            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
+                new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
+            if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
+                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
+                    new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
+            }
+
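+            // persist the checksums so that entity update can detect changes to the user workflow and libs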
+            writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
+        }
+
+        List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
+        apps.add(createDefaultCoordinator(cluster, bundlePath));
+
+        return apps;
+    }
+
+    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
+        try {
+            FSDataOutputStream stream = fs.create(path);
+            try {
+                for (Map.Entry<String, String> entry : checksums.entrySet()) {
+                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+                }
+            } finally {
+                stream.close();
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
         }
+    }
+
+    private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            Path wfPath = new Path(entity.getWorkflow().getPath());
+            if (fs.isFile(wfPath)) {
+                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
+            } else {
+                return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to get workflow path", e);
+        }
+    }
+
+    private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
+        try {
+            if (entity.getWorkflow().getLib() == null) {
+                return null;
+            }
+            Path libPath = new Path(entity.getWorkflow().getLib());
 
-        Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
-        Path bundlePath = EntityUtil.getNewStagingPath(cluster, process);
-        Process processClone = (Process) process.copy();
-        EntityUtil.setStartDate(processClone, clusterName, startDate);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+            if (fs.isFile(libPath)) {
+                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
+            } else {
+                return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to get user lib path", e);
+        }
+    }
 
-        OozieProcessMapper mapper = new OozieProcessMapper(processClone);
-        if (!mapper.map(cluster, bundlePath)) {
+    /**
+     * Creates default oozie coordinator.
+     *
+     * @param cluster    - Cluster for which the coordinator app needs to be created
+     * @param bundlePath - bundle path
+     * @return COORDINATORAPP
+     * @throws FalconException on Error
+     */
+    public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
+        if (entity == null) {
             return null;
         }
 
-        Properties properties = createAppProperties(clusterName, bundlePath, user);
+        COORDINATORAPP coord = new COORDINATORAPP();
+        String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
+        Path coordPath = getCoordPath(bundlePath, coordName);
+
+        // coord attributes
+        initializeCoordAttributes(cluster, entity, coord, coordName);
+
+        CONTROLS controls = initializeControls(entity); // controls
+        coord.setControls(controls);
+
+        // Configuration
+        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
+
+        initializeInputPaths(cluster, entity, coord, props); // inputs
+        initializeOutputPaths(cluster, entity, coord, props);  // outputs
+
+        Workflow processWorkflow = entity.getWorkflow();
+        propagateUserWorkflowProperties(processWorkflow, props, entity.getName());
+
+        // create parent wf
+        createWorkflow(cluster, entity, processWorkflow, coordName, coordPath);
+
+        WORKFLOW wf = new WORKFLOW();
+        wf.setAppPath(getStoragePath(coordPath.toString()));
+        wf.setConfiguration(getCoordConfig(props));
+
+        // set coord action to parent wf
+        org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
+        action.setWorkflow(wf);
+        coord.setAction(action);
+
+        return coord;
+    }
+
+    private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
+        coord.setName(coordName);
+        org.apache.falcon.entity.v0.process.Cluster processCluster =
+            ProcessHelper.getCluster(process, cluster.getName());
+        coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
+        coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
+        coord.setTimezone(process.getTimezone().getID());
+        coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
+    }
+
+    private CONTROLS initializeControls(Process process)
+        throws FalconException {
+        CONTROLS controls = new CONTROLS();
+        controls.setConcurrency(String.valueOf(process.getParallel()));
+        controls.setExecution(process.getOrder().name());
+
+        Frequency timeout = process.getTimeout();
+        long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
+        long timeoutInMillis;
+        if (timeout != null) {
+            timeoutInMillis = ExpressionHelper.get().
+                evaluate(process.getTimeout().toString(), Long.class);
+        } else {
+            timeoutInMillis = frequencyInMillis * 6;
+            if (timeoutInMillis < THIRTY_MINUTES) {
+                timeoutInMillis = THIRTY_MINUTES;
+            }
+        }
+        controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+
+        if (timeoutInMillis / frequencyInMillis * 2 > 0) {
+            controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+        }
+
+        return controls;
+    }
 
-        //Add libpath
-        String libPath = process.getWorkflow().getLib();
-        if (!StringUtils.isEmpty(libPath)) {
-            String path = libPath.replace("${nameNode}", "");
-            properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
+    private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+        Map<String, String> props) throws FalconException {
+        if (process.getInputs() == null) {
+            props.put("falconInputFeeds", "NONE");
+            props.put("falconInPaths", "IGNORE");
+            return;
         }
 
-        if (process.getInputs() != null) {
-            for (Input in : process.getInputs().getInputs()) {
-                if (in.isOptional()) {
-                    addOptionalInputProperties(properties, in, clusterName);
+        List<String> inputFeeds = new ArrayList<String>();
+        List<String> inputPaths = new ArrayList<String>();
+        List<String> inputFeedStorageTypes = new ArrayList<String>();
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            if (!input.isOptional()) {
+                if (coord.getDatasets() == null) {
+                    coord.setDatasets(new DATASETS());
+                }
+                if (coord.getInputEvents() == null) {
+                    coord.setInputEvents(new INPUTEVENTS());
                 }
+
+                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
+                coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+                DATAIN datain = createDataIn(input);
+                coord.getInputEvents().getDataIn().add(datain);
             }
+
+            String inputExpr = null;
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
+                props.put(input.getName(), inputExpr);
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                inputExpr = "${coord:dataIn('" + input.getName() + "')}";
+                propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
+            }
+
+            inputFeeds.add(feed.getName());
+            inputPaths.add(inputExpr);
+            inputFeedStorageTypes.add(storage.getType().name());
         }
-        return properties;
+
+        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
     }
 
-    @Override
-    public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
-        org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
-        return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
-                process.getFrequency(), process.getTimezone(), now);
+    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+        List<String> inputFeedStorageTypes, Map<String, String> props) {
+        // populate late data handler - should-record action
+        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
+        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
+
+        // storage type for each corresponding feed sent as a param to LateDataHandler
+        // needed to compute usage based on storage type in LateDataHandler
+        props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
+    }
+
+    private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+        Map<String, String> props) throws FalconException {
+        if (process.getOutputs() == null) {
+            props.put(ARG.feedNames.getPropName(), "NONE");
+            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+            return;
+        }
+
+        if (coord.getDatasets() == null) {
+            coord.setDatasets(new DATASETS());
+        }
+
+        if (coord.getOutputEvents() == null) {
+            coord.setOutputEvents(new OUTPUTEVENTS());
+        }
+
+        List<String> outputFeeds = new ArrayList<String>();
+        List<String> outputPaths = new ArrayList<String>();
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
+            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+            DATAOUT dataout = createDataOut(output);
+            coord.getOutputEvents().getDataOut().add(dataout);
+
+            String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
+            outputFeeds.add(feed.getName());
+            outputPaths.add(outputExpr);
+
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                props.put(output.getName(), outputExpr);
+
+                propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
+            }
+        }
+
+        // Output feed name and path for parent workflow
+        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
+        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+    }
+
+    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+        String datasetName, LocationType locationType) throws FalconException {
+
+        SYNCDATASET syncdataset = new SYNCDATASET();
+        syncdataset.setName(datasetName);
+        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+        String uriTemplate = storage.getUriTemplate(locationType);
+        if (storage.getType() == Storage.TYPE.TABLE) {
+            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+        }
+        syncdataset.setUriTemplate(uriTemplate);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+        syncdataset.setTimezone(feed.getTimezone().getID());
+
+        if (feed.getAvailabilityFlag() == null) {
+            syncdataset.setDoneFlag("");
+        } else {
+            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
+        }
+
+        return syncdataset;
+    }
+
+    private DATAOUT createDataOut(Output output) {
+        DATAOUT dataout = new DATAOUT();
+        dataout.setName(output.getName());
+        dataout.setDataset(output.getName());
+        dataout.setInstance(getELExpression(output.getInstance()));
+        return dataout;
+    }
+
+    private DATAIN createDataIn(Input input) {
+        DATAIN datain = new DATAIN();
+        datain.setName(input.getName());
+        datain.setDataset(input.getName());
+        datain.setStartInstance(getELExpression(input.getStart()));
+        datain.setEndInstance(getELExpression(input.getEnd()));
+        return datain;
+    }
+
+    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
+        Storage storage, Map<String, String> props)
+        throws FalconException {
+
+        // stats and meta paths
+        createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
+        createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
+        createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
+    }
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
+        COORDINATORAPP coord, Map<String, String> props, Storage storage)
+        throws FalconException {
+
+        String name = output.getName();
+        String type = locType.name().toLowerCase();
+
+        SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
+        coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
+
+        DATAOUT dataout = new DATAOUT();
+        dataout.setName(name + type);
+        dataout.setDataset(name + type);
+        dataout.setInstance(getELExpression(output.getInstance()));
+
+        OUTPUTEVENTS outputEvents = coord.getOutputEvents();
+        if (outputEvents == null) {
+            outputEvents = new OUTPUTEVENTS();
+            coord.setOutputEvents(outputEvents);
+        }
+        outputEvents.getDataOut().add(dataout);
+
+        String outputExpr = "${coord:dataOut('" + name + type + "')}";
+        props.put(name + "." + type, outputExpr);
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
+        Map<String, String> props, String prefix) {
+        props.put(prefix + "_storage_type", tableStorage.getType().name());
+        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+        props.put(prefix + "_database", tableStorage.getDatabase());
+        props.put(prefix + "_table", tableStorage.getTable());
+    }
+
+    private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
+        Map<String, String> props) {
+        String prefix = "falcon_" + input.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_partition_filter_pig",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
+        props.put(prefix + "_partition_filter_hive",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
+        props.put(prefix + "_partition_filter_java",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+    }
+
+    private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
+        Map<String, String> props) {
+        String prefix = "falcon_" + output.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_dataout_partitions",
+            "${coord:dataOutPartitions('" + output.getName() + "')}");
+        props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
+            + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
+    }
+
+    private String join(Iterator<String> itr, char sep) {
+        String joinedStr = StringUtils.join(itr, sep);
+        if (joinedStr.isEmpty()) {
+            joinedStr = "null";
+        }
+        return joinedStr;
+    }
+
+    private String getELExpression(String expr) {
+        if (expr != null) {
+            expr = "${" + expr + "}";
+        }
+        return expr;
     }
 
     @Override
-    public String[] getWorkflowNames(Process process) {
-        return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString()};
+    protected Map<String, String> getEntityProperties() {
+        Map<String, String> props = new HashMap<String, String>();
+        if (entity.getProperties() != null) {
+            for (Property prop : entity.getProperties().getProperties()) {
+                props.put(prop.getName(), prop.getValue());
+            }
+        }
+        return props;
+    }
+
+    private void propagateUserWorkflowProperties(Workflow processWorkflow,
+        Map<String, String> props, String processName) {
+        props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
+            processWorkflow.getName(), processName));
+        props.put("userWorkflowVersion", processWorkflow.getVersion());
+        props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+    }
+
+    protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
+        String wfName, Path parentWfPath) throws FalconException {
+        WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
+        wfApp.setName(wfName);
+        try {
+            addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
+        } catch (IOException e) {
+            throw new FalconException("Failed to add library extensions for the workflow", e);
+        }
+
+        String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
+        EngineType engineType = processWorkflow.getEngine();
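+        // wire the user's workflow, pig, or hive action into the parent workflow template based on the engine type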
+        for (Object object : wfApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof ACTION)) {
+                continue;
+            }
+
+            ACTION action = (ACTION) object;
+            String actionName = action.getName();
+            if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
+                action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
+            } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
+                decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
+            } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
+                decorateHiveAction(cluster, process, action, parentWfPath);
+            } else if (FALCON_ACTIONS.contains(actionName)) {
+                decorateWithOozieRetries(action);
+            }
+        }
+
+        //Create parent workflow
+        marshal(cluster, wfApp, parentWfPath);
+    }
+
+    private void decoratePIGAction(Cluster cluster, Process process,
+        PIG pigAction, Path parentWfPath) throws FalconException {
+        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+        pigAction.setScript("${nameNode}" + userWfPath.toString());
+
+        addPrepareDeleteOutputPath(process, pigAction);
+
+        final List<String> paramList = pigAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
+
+        propagateProcessProperties(pigAction, process);
+
+        Storage.TYPE storageType = getStorageType(cluster, process);
+        if (Storage.TYPE.TABLE == storageType) {
+            // adds hive-site.xml in pig classpath
+            setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
+            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+        }
+
+        addArchiveForCustomJars(cluster, pigAction.getArchive(),
+            getUserLibPath(cluster, parentWfPath.getParent()));
+    }
+
+    private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
+        Path parentWfPath) throws FalconException {
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(wfAction);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+        hiveAction.setScript("${nameNode}" + userWfPath.toString());
+
+        addPrepareDeleteOutputPath(process, hiveAction);
+
+        final List<String> paramList = hiveAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
+
+        propagateProcessProperties(hiveAction, process);
+
+        setupHiveConfiguration(cluster, parentWfPath, "falcon-");
+
+        addArchiveForCustomJars(cluster, hiveAction.getArchive(),
+            getUserLibPath(cluster, parentWfPath.getParent()));
+
+        OozieUtils.marshalHiveAction(wfAction, actionJaxbElement);
+    }
+
+    private void addPrepareDeleteOutputPath(Process process,
+        PIG pigAction) throws FalconException {
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        final PREPARE prepare = new PREPARE();
+        final List<DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            final DELETE delete = new DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            pigAction.setPrepare(prepare);
+        }
+    }
+
+    private void addPrepareDeleteOutputPath(Process process,
+        org.apache.falcon.oozie.hive.ACTION hiveAction)
+        throws FalconException {
+
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            hiveAction.setPrepare(prepare);
+        }
+    }
+
+    private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
+        final List<String> deleteList = new ArrayList<String>();
+        if (process.getOutputs() == null) {
+            return deleteList;
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+
+            if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+                continue; // prepare delete only applies to FileSystem storage
+            }
+
+            deleteList.add("${wf:conf('" + output.getName() + "')}");
+        }
+
+        return deleteList;
+    }
+
+    private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
+        String engineType) throws FalconException {
+        if (process.getInputs() == null) {
+            return;
+        }
+
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            final String inputName = input.getName();
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
+                Map<String, String> props = new HashMap<String, String>();
+                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
+                for (String key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+
+                paramList.add(paramName + "_filter=${wf:conf('"
+                    + paramName + "_partition_filter_" + engineType + "')}");
+            }
+        }
+    }
+
+    private void addOutputFeedsAsParams(List<String> paramList, Process process,
+        Cluster cluster) throws FalconException {
+        if (process.getOutputs() == null) {
+            return;
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                final String outputName = output.getName();  // no prefix for backwards compatibility
+                paramList.add(outputName + "=${" + outputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                Map<String, String> props = new HashMap<String, String>();
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
+                for (String key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+            }
+        }
+    }
+
+    private void propagateProcessProperties(PIG pigAction, Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+        if (processProperties == null) {
+            return;
+        }
+
+        // Propagate user defined properties to job configuration
+        final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
+            pigAction.getConfiguration().getProperty();
+
+        // Propagate user defined properties to pig script as macros
+        // passed as parameters -p name=value that can be accessed as $name
+        final List<String> paramList = pigAction.getParam();
+
+        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
+                new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+            configProperty.setName(property.getName());
+            configProperty.setValue(property.getValue());
+            configuration.add(configProperty);
+
+            paramList.add(property.getName() + "=" + property.getValue());
+        }
+    }
+
+    private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+        if (processProperties == null) {
+            return;
+        }
+
+        // Propagate user defined properties to job configuration
+        final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
+            hiveAction.getConfiguration().getProperty();
+
+        // Propagate user defined properties to the hive script
+        // passed as parameters name=value that can be accessed as ${name}
+        final List<String> paramList = hiveAction.getParam();
+
+        for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
+                new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+            configProperty.setName(property.getName());
+            configProperty.setValue(property.getValue());
+            configuration.add(configProperty);
+
+            paramList.add(property.getName() + "=" + property.getValue());
+        }
+    }
+
+    private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
+        if (process.getInputs() == null) {
+            return storageType;
+        }
+
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            storageType = FeedHelper.getStorageType(feed, cluster);
+            if (Storage.TYPE.TABLE == storageType) {
+                break;
+            }
+        }
+
+        return storageType;
+    }
+
+    // creates hive-site.xml configuration in conf dir.
+    private void setupHiveConfiguration(Cluster cluster, Path wfPath,
+        String prefix) throws FalconException {
+        String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+            Path confPath = new Path(wfPath, "conf");
+            createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
+        Path libPath) throws FalconException {
+        if (libPath == null) {
+            return;
+        }
+
+        try {
+            final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+            if (fs.isFile(libPath)) {  // File, not a Dir
+                archiveList.add(libPath.toString());
+                return;
+            }
+
+            // lib path is a directory, add each file under the lib dir to archive
+            final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    try {
+                        return fs.isFile(path) && path.getName().endsWith(".jar");
+                    } catch (IOException ignore) {
+                        return false;
+                    }
+                }
+            });
+
+            for (FileStatus fileStatus : fileStatuses) {
+                archiveList.add(fileStatus.getPath().toString());
+            }
+        } catch (IOException e) {
+            throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
deleted file mode 100644
index fbda0ea..0000000
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.converter;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.conf.Configuration;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-/**
- * Test class for late data processing.
- */
-public class OozieProcessMapperLateProcessTest {
-
-    private static final String CLUSTER_XML = "/config/late/late-cluster.xml";
-    private static final String FEED1_XML = "/config/late/late-feed1.xml";
-    private static final String FEED2_XML = "/config/late/late-feed2.xml";
-    private static final String FEED3_XML = "/config/late/late-feed3.xml";
-    private static final String PROCESS1_XML = "/config/late/late-process1.xml";
-    private static final String PROCESS2_XML = "/config/late/late-process2.xml";
-    private static final ConfigurationStore STORE = ConfigurationStore.get();
-
-    private static EmbeddedCluster dfsCluster;
-
-    @BeforeClass
-    public void setUpDFS() throws Exception {
-
-        cleanupStore();
-
-        dfsCluster = EmbeddedCluster.newCluster("testCluster");
-        Configuration conf = dfsCluster.getConf();
-        String hdfsUrl = conf.get("fs.default.name");
-
-        Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller()
-                .unmarshal(this.getClass().getResource(CLUSTER_XML));
-        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-
-        STORE.publish(EntityType.CLUSTER, cluster);
-
-        Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
-                this.getClass().getResource(FEED1_XML));
-        Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
-                this.getClass().getResource(FEED2_XML));
-        Feed feed3 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
-                this.getClass().getResource(FEED3_XML));
-
-        STORE.publish(EntityType.FEED, feed1);
-        STORE.publish(EntityType.FEED, feed2);
-        STORE.publish(EntityType.FEED, feed3);
-
-        Process process1 = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(this.getClass().getResource(PROCESS1_XML));
-        STORE.publish(EntityType.PROCESS, process1);
-        Process process2 = (Process) EntityType.PROCESS.getUnmarshaller()
-                .unmarshal(this.getClass().getResource(PROCESS2_XML));
-        STORE.publish(EntityType.PROCESS, process2);
-    }
-
-    private void cleanupStore() throws FalconException {
-        STORE.remove(EntityType.PROCESS, "late-process1");
-        STORE.remove(EntityType.PROCESS, "late-process2");
-        STORE.remove(EntityType.FEED, "late-feed1");
-        STORE.remove(EntityType.FEED, "late-feed2");
-        STORE.remove(EntityType.FEED, "late-feed3");
-        STORE.remove(EntityType.CLUSTER, "late-cluster");
-    }
-
-    @AfterClass
-    public void tearDown() throws Exception {
-        cleanupStore();
-        dfsCluster.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
deleted file mode 100644
index 22bf9fe..0000000
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Validity;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.messaging.EntityInstanceMessage;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * Test for the Falcon entities mapping into Oozie artifacts.
- */
-public class OozieProcessMapperTest extends AbstractTestBase {
-
-    private String hdfsUrl;
-    private FileSystem fs;
-
-    @BeforeClass
-    public void setUpDFS() throws Exception {
-        CurrentUser.authenticate("falcon");
-
-        EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
-        Configuration conf = cluster.getConf();
-        hdfsUrl = conf.get("fs.default.name");
-    }
-
-    @BeforeMethod
-    public void setUp() throws Exception {
-        super.setup();
-
-        ConfigurationStore store = ConfigurationStore.get();
-        Cluster cluster = store.get(EntityType.CLUSTER, "corp");
-        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-        ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
-        fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
-        fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
-
-        Process process = store.get(EntityType.PROCESS, "clicksummary");
-        Path wfpath = new Path(process.getWorkflow().getPath());
-        assert new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()).mkdirs(wfpath);
-    }
-
-    public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
-        assertEquals("FALCON_PROCESS_DEFAULT_" + process.getName(), coord.getName());
-        Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
-        assertEquals(SchemaHelper.formatDateUTC(processValidity.getStart()), coord.getStart());
-        assertEquals(SchemaHelper.formatDateUTC(processValidity.getEnd()), coord.getEnd());
-        assertEquals("${coord:" + process.getFrequency().toString() + "}", coord.getFrequency());
-        assertEquals(process.getTimezone().getID(), coord.getTimezone());
-
-        assertEquals(process.getParallel() + "", coord.getControls().getConcurrency());
-        assertEquals(process.getOrder().name(), coord.getControls().getExecution());
-
-        assertEquals(process.getInputs().getInputs().get(0).getName(),
-                coord.getInputEvents().getDataIn().get(0).getName());
-        assertEquals(process.getInputs().getInputs().get(0).getName(),
-                coord.getInputEvents().getDataIn().get(0).getDataset());
-        assertEquals("${" + process.getInputs().getInputs().get(0).getStart() + "}",
-                coord.getInputEvents().getDataIn().get(0).getStartInstance());
-        assertEquals("${" + process.getInputs().getInputs().get(0).getEnd() + "}",
-                coord.getInputEvents().getDataIn().get(0).getEndInstance());
-
-        assertEquals(process.getInputs().getInputs().get(1).getName(),
-                coord.getInputEvents().getDataIn().get(1).getName());
-        assertEquals(process.getInputs().getInputs().get(1).getName(),
-                coord.getInputEvents().getDataIn().get(1).getDataset());
-        assertEquals("${" + process.getInputs().getInputs().get(1).getStart() + "}",
-                coord.getInputEvents().getDataIn().get(1).getStartInstance());
-        assertEquals("${" + process.getInputs().getInputs().get(1).getEnd() + "}",
-                coord.getInputEvents().getDataIn().get(1).getEndInstance());
-
-        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "stats",
-                coord.getOutputEvents().getDataOut().get(1).getName());
-        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
-                coord.getOutputEvents().getDataOut().get(2).getName());
-        assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
-                coord.getOutputEvents().getDataOut().get(3).getName());
-
-        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
-                coord.getOutputEvents().getDataOut().get(0).getName());
-        assertEquals("${" + process.getOutputs().getOutputs().get(0).getInstance() + "}",
-                coord.getOutputEvents().getDataOut().get(0).getInstance());
-        assertEquals(process.getOutputs().getOutputs().get(0).getName(),
-                coord.getOutputEvents().getDataOut().get(0).getDataset());
-
-        assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
-
-        ConfigurationStore store = ConfigurationStore.get();
-        Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
-        SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
-        final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
-        assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
-        assertEquals(feed.getTimezone().getID(), ds.getTimezone());
-        assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
-        assertEquals("", ds.getDoneFlag());
-        assertEquals(ds.getUriTemplate(),
-                FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
-
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-        assertEquals(props.get("mapred.job.priority"), "LOW");
-
-        assertLibExtensions(coord);
-    }
-
-    @Test
-    public void testBundle() throws Exception {
-        String path = StartupProperties.get().getProperty("system.lib.location");
-        if (!new File(path).exists()) {
-            Assert.assertTrue(new File(path).mkdirs());
-        }
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-
-        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
-        testParentWorkflow(process, parentWorkflow);
-    }
-
-    @Test
-    public void testBundle1() throws Exception {
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-        process.setFrequency(Frequency.fromString("minutes(1)"));
-        process.setTimeout(Frequency.fromString("minutes(15)"));
-
-        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "30", "15");
-        testParentWorkflow(process, parentWorkflow);
-    }
-
-    @Test
-    public void testPigProcessMapper() throws Exception {
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
-        Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
-
-        prepare(process);
-        WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
-        testParentWorkflow(process, parentWorkflow);
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
-        Assert.assertEquals("user-pig-job", pigActionNode.getName());
-
-        final PIG pigAction = pigActionNode.getPig();
-        Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
-        Assert.assertNotNull(pigAction.getPrepare());
-        Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
-        Assert.assertFalse(pigAction.getParam().isEmpty());
-        Assert.assertEquals(5, pigAction.getParam().size());
-        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
-        Assert.assertTrue(pigAction.getFile().size() > 0);
-
-        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
-        Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
-        Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
-    }
-
-    @Test
-    public void testHiveProcessMapper() throws Exception {
-        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
-        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
-
-        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
-        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
-
-        resource = this.getClass().getResource("/config/process/hive-process.xml");
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.PROCESS, process);
-
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        prepare(process);
-        OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        // verify table props
-        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
-        for (Map.Entry<String, String> entry : props.entrySet()) {
-            if (expected.containsKey(entry.getKey())) {
-                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
-            }
-        }
-
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
-        testParentWorkflow(process, parentWorkflow);
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
-        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
-        Assert.assertEquals("user-hive-job", hiveNode.getName());
-
-        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = mapper.unMarshalHiveAction(hiveNode);
-        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-
-        Assert.assertEquals(hiveAction.getScript(),
-                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
-        Assert.assertNull(hiveAction.getPrepare());
-        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
-        Assert.assertFalse(hiveAction.getParam().isEmpty());
-        Assert.assertEquals(11, hiveAction.getParam().size());
-    }
-
-    private void prepare(Process process) throws IOException {
-        Path wf = new Path(process.getWorkflow().getPath());
-        fs.mkdirs(wf.getParent());
-        fs.create(wf).close();
-    }
-
-    @Test
-    public void testProcessMapperForTableStorage() throws Exception {
-        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
-        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
-
-        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
-        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
-
-        resource = this.getClass().getResource("/config/process/pig-process-table.xml");
-        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.PROCESS, process);
-
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        // verify table props
-        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
-        for (Map.Entry<String, String> entry : props.entrySet()) {
-            if (expected.containsKey(entry.getKey())) {
-                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
-            }
-        }
-
-        // verify the late data params
-        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
-        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
-        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
-
-        // verify the post processing params
-        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
-        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
-    }
-
-    private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
-                                                      Cluster cluster) throws FalconException {
-        Map<String, String> expected = new HashMap<String, String>();
-        for (Input input : process.getInputs().getInputs()) {
-            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
-            propagateStorageProperties(input.getName(), storage, expected);
-        }
-
-        for (Output output : process.getOutputs().getOutputs()) {
-            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
-            propagateStorageProperties(output.getName(), storage, expected);
-        }
-
-        return expected;
-    }
-
-    private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
-                                            Map<String, String> props) {
-        String prefix = "falcon_" + feedName;
-        props.put(prefix + "_storage_type", tableStorage.getType().name());
-        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
-        props.put(prefix + "_database", tableStorage.getDatabase());
-        props.put(prefix + "_table", tableStorage.getTable());
-
-        if (prefix.equals("falcon_input")) {
-            props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
-            props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
-            props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
-        } else if (prefix.equals("falcon_output")) {
-            props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
-        }
-    }
-
-    @Test
-    public void testProcessWorkflowMapper() throws Exception {
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-        Workflow processWorkflow = process.getWorkflow();
-        Assert.assertEquals("test", processWorkflow.getName());
-        Assert.assertEquals("1.0.0", processWorkflow.getVersion());
-    }
-
-    @SuppressWarnings("unchecked")
-    private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
-                fs.open(new Path(wfPath, "workflow.xml")))).getValue();
-        List<Object> actions = wf.getDecisionOrForkOrJoin();
-        for (Object obj : actions) {
-            if (!(obj instanceof ACTION)) {
-                continue;
-            }
-            ACTION action = (ACTION) obj;
-            List<String> files = null;
-            if (action.getJava() != null) {
-                files = action.getJava().getFile();
-            } else if (action.getPig() != null) {
-                files = action.getPig().getFile();
-            } else if (action.getMapReduce() != null) {
-                files = action.getMapReduce().getFile();
-            }
-            if (files != null) {
-                Assert.assertTrue(files.get(files.size() - 1)
-                        .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
-            }
-        }
-    }
-
-    private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
-        throws Exception {
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        OozieProcessMapper mapper = new OozieProcessMapper(process);
-        Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        testDefCoordMap(process, coord);
-        assertEquals(coord.getControls().getThrottle(), throttle);
-        assertEquals(coord.getControls().getTimeout(), timeout);
-
-        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
-        return getParentWorkflow(new Path(wfPath));
-    }
-
-    public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
-        Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
-
-        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-        Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
-        Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
-        Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
-        Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
-        Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
-        Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
-        Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
-    }
-
-    private COORDINATORAPP getCoordinator(Path path) throws Exception {
-        String bundleStr = readFile(path);
-
-        Unmarshaller unmarshaller = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
-        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
-        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
-        unmarshaller.setSchema(schema);
-        JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), COORDINATORAPP.class);
-        return jaxbBundle.getValue();
-    }
-
-    @SuppressWarnings("unchecked")
-    private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
-        String workflow = readFile(new Path(path, "workflow.xml"));
-
-        JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
-        Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
-        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
-    }
-
-    private BUNDLEAPP getBundle(Path path) throws Exception {
-        String bundleStr = readFile(new Path(path, "bundle.xml"));
-
-        Unmarshaller unmarshaller = JAXBContext.newInstance(BUNDLEAPP.class).createUnmarshaller();
-        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
-        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
-        unmarshaller.setSchema(schema);
-        JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
-        return jaxbBundle.getValue();
-    }
-
-    private String readFile(Path path) throws Exception {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
-        String line;
-        StringBuilder contents = new StringBuilder();
-        while ((line = reader.readLine()) != null) {
-            contents.append(line);
-        }
-        return contents.toString();
-    }
-
-    @Override
-    @AfterMethod
-    public void cleanup() throws Exception {
-        super.cleanup();
-        ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
-        ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
-        ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
-        ConfigurationStore.get().remove(EntityType.PROCESS, "dumb-process");
-    }
-
-    @Test
-    public void testProcessWithNoInputsAndOutputs() throws Exception {
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
-        ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-
-        URL resource = this.getClass().getResource("/config/process/dumb-process.xml");
-        Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
-        ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
-
-        OozieProcessMapper mapper = new OozieProcessMapper(processEntity);
-        Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
-        mapper.map(cluster, bundlePath);
-        assertTrue(fs.exists(bundlePath));
-
-        BUNDLEAPP bundle = getBundle(bundlePath);
-        assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
-        assertEquals(1, bundle.getCoordinator().size());
-        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
-                bundle.getCoordinator().get(0).getName());
-        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
-        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        HashMap<String, String> props = new HashMap<String, String>();
-        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            props.put(prop.getName(), prop.getValue());
-        }
-
-        String[] expected = {
-            EntityInstanceMessage.ARG.feedNames.getPropName(),
-            EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
-            "falconInputFeeds",
-            "falconInPaths",
-            "userWorkflowName",
-            "userWorkflowVersion",
-            "userWorkflowEngine",
-        };
-
-        for (String property : expected) {
-            Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
-        }
-    }
-}