Posted to commits@falcon.apache.org by sh...@apache.org on 2014/03/18 12:41:10 UTC

[5/5] git commit: FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS

FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e2545b08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e2545b08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e2545b08

Branch: refs/heads/master
Commit: e2545b0874d206f3be88d4b3ac7003eae1161c44
Parents: 5e43521
Author: Shwetha GS <sh...@gmail.com>
Authored: Tue Mar 18 17:10:54 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Tue Mar 18 17:10:54 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/falcon/util/ReflectionUtils.java |  33 +-
 .../apache/falcon/workflow/WorkflowBuilder.java |  20 +-
 .../apache/falcon/util/ReflectionUtilsTest.java |  49 ++
 .../falcon/converter/OozieFeedMapper.java       | 596 -------------
 .../workflow/OozieFeedWorkflowBuilder.java      | 594 ++++++++++++-
 .../falcon/converter/OozieFeedMapperTest.java   | 505 -----------
 .../converter/OozieFeedWorkflowBuilderTest.java | 505 +++++++++++
 .../converter/AbstractOozieEntityMapper.java    | 428 ----------
 .../java/org/apache/falcon/util/OozieUtils.java |  54 ++
 .../falcon/workflow/OozieWorkflowBuilder.java   | 370 ++++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  15 +-
 .../falcon/converter/OozieProcessMapper.java    | 833 -------------------
 .../workflow/OozieProcessWorkflowBuilder.java   | 822 +++++++++++++++++-
 .../OozieProcessMapperLateProcessTest.java      |  96 ---
 .../converter/OozieProcessMapperTest.java       | 557 -------------
 .../OozieProcessWorkflowBuilderTest.java        | 559 +++++++++++++
 .../falcon/retention/FeedEvictorTest.java       |   2 +-
 18 files changed, 2936 insertions(+), 3104 deletions(-)
----------------------------------------------------------------------
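
At a high level, the change deletes the standalone Oozie*Mapper classes and folds their
logic into the corresponding Oozie*WorkflowBuilder, which is now constructed around the
entity it schedules. A minimal before/after sketch (class and method names are taken from
the diff below; the cluster name is illustrative and bodies, error handling, and the
cluster/bundle-path setup are elided):

    // Before (deleted): the builder created a throwaway mapper and delegated to it
    AbstractOozieEntityMapper<Feed> mapper = new OozieFeedMapper(feed);
    mapper.map(cluster, bundlePath);

    // After: the entity is fixed at construction time and the builder maps directly,
    // via the new OozieWorkflowBuilder base class
    OozieFeedWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
    Map<String, Properties> schedules = builder.newWorkflowSchedule("corp-cluster");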


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5748d27..f3ddf96 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@ Trunk (Unreleased)
     (Venkatesh Seetharam)
    
   IMPROVEMENTS
+    FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. (Shwetha GS)
+
     FALCON-355 Remove SLAMonitoringService. (Shwetha GS)
 
     FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
index 4a00fa9..80022e0 100644
--- a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
+++ b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.falcon.util;
 
 import org.apache.falcon.FalconException;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 
 /**
@@ -30,12 +31,11 @@ public final class ReflectionUtils {
     private ReflectionUtils() {}
 
     public static <T> T getInstance(String classKey) throws FalconException {
-        String clazzName = StartupProperties.get().getProperty(classKey);
-        try {
-            return ReflectionUtils.<T>getInstanceByClassName(clazzName);
-        } catch (FalconException e) {
-            throw new FalconException("Unable to get instance for key: " + classKey, e);
-        }
+        return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey));
+    }
+
+    public static <T> T getInstance(String classKey, Class<?> argCls, Object arg) throws FalconException {
+        return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey), argCls, arg);
     }
 
     @SuppressWarnings("unchecked")
@@ -52,4 +52,25 @@ public final class ReflectionUtils {
             throw new FalconException("Unable to get instance for " + clazzName, e);
         }
     }
+
+    /**
+     * Invokes constructor with one argument.
+     * @param clazzName - classname
+     * @param argCls - Class of the argument
+     * @param arg - constructor argument
+     * @param <T> - instance type
+     * @return Class instance
+     * @throws FalconException
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getInstanceByClassName(String clazzName, Class<?> argCls, Object arg) throws
+        FalconException {
+        try {
+            Class<T> clazz = (Class<T>) ReflectionUtils.class.getClassLoader().loadClass(clazzName);
+            Constructor<T> constructor = clazz.getConstructor(argCls);
+            return constructor.newInstance(arg);
+        } catch (Exception e) {
+            throw new FalconException("Unable to get instance for " + clazzName, e);
+        }
+    }
 }
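
For reference, a minimal usage sketch of the new one-argument-constructor path (it mirrors
the new ReflectionUtilsTest below; the target class and argument are illustrative):

    // Loads the named class, looks up its public (Throwable) constructor via
    // Class.getConstructor, and invokes it with the supplied argument
    FalconException fe = ReflectionUtils.getInstanceByClassName(
        "org.apache.falcon.FalconException", Throwable.class, new Throwable("root cause"));

Since Class.getConstructor resolves against the declared parameter type, argCls must be the
constructor's declared type (Throwable here), not the runtime type of arg.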

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
index 26243e7..1f9a8c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
@@ -22,8 +22,6 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.util.ReflectionUtils;
 
-import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -32,16 +30,22 @@ import java.util.Properties;
  * @param <T>
  */
 public abstract class WorkflowBuilder<T extends Entity> {
+    protected T entity;
+
+    protected WorkflowBuilder(T entity) {
+        this.entity = entity;
+    }
+
+    public T getEntity() {
+        return  entity;
+    }
 
     public static WorkflowBuilder<Entity> getBuilder(String engine, Entity entity) throws FalconException {
         String classKey = engine + "." + entity.getEntityType().name().toLowerCase() + ".workflow.builder";
-        return ReflectionUtils.getInstance(classKey);
+        return ReflectionUtils.getInstance(classKey, entity.getEntityType().getEntityClass(), entity);
     }
 
-    public abstract Map<String, Properties> newWorkflowSchedule(T entity, List<String> clusters) throws FalconException;
-
-    public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user)
-        throws FalconException;
+    public abstract Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException;
 
-    public abstract String[] getWorkflowNames(T entity);
+    public abstract String[] getWorkflowNames();
 }
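
End to end, getBuilder now resolves a class name from startup properties and instantiates it
through the one-argument-constructor path added to ReflectionUtils above. A sketch for a feed
on the Oozie engine (the property entry and cluster names are assumed examples; the key format
and method signatures come from the code above):

    // classKey = "oozie" + "." + "feed" + ".workflow.builder", expected to resolve to
    // an entry along the lines of:
    //   oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
    WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder("oozie", feed);
    Map<String, Properties> schedules = builder.newWorkflowSchedule("clusterA", "clusterB");
    String[] workflowNames = builder.getWorkflowNames();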

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
new file mode 100644
index 0000000..bc0bce0
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests ReflectionUtils.
+ */
+@Test
+public class ReflectionUtilsTest {
+    public void testGetInstance() throws FalconException {
+        //with 1 arg constructor, arg null
+        Object e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class, null);
+        Assert.assertTrue(e instanceof  FalconException);
+
+        //with 1 arg constructor, arg not null
+        e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class,
+            new Throwable());
+        Assert.assertTrue(e instanceof  FalconException);
+
+        //no constructor, using get() method
+        e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.util.StartupProperties");
+        Assert.assertTrue(e instanceof  StartupProperties);
+
+        //with empty constructor
+        e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.entity.parser.ClusterEntityParser");
+        Assert.assertTrue(e instanceof ClusterEntityParser);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
deleted file mode 100644
index 2b3315f..0000000
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
-import org.apache.falcon.oozie.coordinator.ACTION;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.*;
-
-/**
- * Mapper which maps feed definition to oozie workflow definitions for
- * replication & retention.
- */
-public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
-
-    private static final Logger LOG = Logger.getLogger(OozieFeedMapper.class);
-
-    private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
-    private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
-
-    public OozieFeedMapper(Feed feed) {
-        super(feed);
-    }
-
-    @Override
-    protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
-        List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
-        COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
-        if (retentionCoord != null) {
-            coords.add(retentionCoord);
-        }
-        List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
-        coords.addAll(replicationCoords);
-        return coords;
-    }
-
-    private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-
-        Feed feed = getEntity();
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-
-        if (feedCluster.getValidity().getEnd().before(new Date())) {
-            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
-                    + " is not in the future");
-            return null;
-        }
-
-        return retentionMapper.getRetentionCoordinator(cluster, bundlePath, feed, feedCluster);
-    }
-
-    private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
-        throws FalconException {
-
-        Feed feed = getEntity();
-        List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
-
-        if (FeedHelper.getCluster(feed, targetCluster.getName()).getType() == ClusterType.TARGET) {
-            String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, feed).toString();
-            Path basePath = getCoordPath(bundlePath, coordName);
-            replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
-
-            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-                if (feedCluster.getType() == ClusterType.SOURCE) {
-                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(feed,
-                            (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
-                            targetCluster, bundlePath);
-
-                    if (coord != null) {
-                        replicationCoords.add(coord);
-                    }
-                }
-            }
-        }
-
-        return replicationCoords;
-    }
-
-    @Override
-    protected Map<String, String> getEntityProperties() {
-        Feed feed = getEntity();
-        Map<String, String> props = new HashMap<String, String>();
-        if (feed.getProperties() != null) {
-            for (Property prop : feed.getProperties().getProperties()) {
-                props.put(prop.getName(), prop.getValue());
-            }
-        }
-        return props;
-    }
-
-    private final class RetentionOozieWorkflowMapper {
-
-        private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
-
-        private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
-                                                       org.apache.falcon.entity.v0.feed.Cluster feedCluster)
-            throws FalconException {
-
-            COORDINATORAPP retentionApp = new COORDINATORAPP();
-            String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
-            retentionApp.setName(coordName);
-            retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
-            retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
-            retentionApp.setTimezone(feed.getTimezone().getID());
-            TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
-            if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
-                retentionApp.setFrequency("${coord:hours(6)}");
-            } else {
-                retentionApp.setFrequency("${coord:days(1)}");
-            }
-
-            Path wfPath = getCoordPath(bundlePath, coordName);
-            retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
-            return retentionApp;
-        }
-
-        private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
-            throws FalconException {
-            Feed feed = getEntity();
-            ACTION retentionAction = new ACTION();
-            WORKFLOW retentionWorkflow = new WORKFLOW();
-            createRetentionWorkflow(cluster, wfPath, wfName);
-            retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
-
-            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
-            props.put("timeZone", feed.getTimezone().getID());
-            props.put("frequency", feed.getFrequency().getTimeUnit().name());
-
-            final Storage storage = FeedHelper.createStorage(cluster, feed);
-            props.put("falconFeedStorageType", storage.getType().name());
-
-            String feedDataPath = storage.getUriTemplate();
-            props.put("feedDataPath",
-                    feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
-                    FeedHelper.getCluster(feed, cluster.getName());
-            props.put("limit", feedCluster.getRetention().getLimit().toString());
-
-            props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
-            props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-
-            props.put("falconInputFeeds", feed.getName());
-            props.put("falconInPaths", "IGNORE");
-
-            propagateUserWorkflowProperties(props, "eviction");
-
-            retentionWorkflow.setConfiguration(getCoordConfig(props));
-            retentionAction.setWorkflow(retentionWorkflow);
-
-            return retentionAction;
-        }
-
-        private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
-            try {
-                WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
-                retWfApp.setName(wfName);
-                addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
-                addOozieRetries(retWfApp);
-                marshal(cluster, retWfApp, wfPath);
-            } catch(IOException e) {
-                throw new FalconException("Unable to create retention workflow", e);
-            }
-        }
-    }
-
-    private class ReplicationOozieWorkflowMapper {
-        private static final String MR_MAX_MAPS = "maxMaps";
-
-        private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
-        private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
-        private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
-
-        private static final String TIMEOUT = "timeout";
-        private static final String PARALLEL = "parallel";
-
-        private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
-            throws FalconException {
-            try {
-                WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
-                repWFapp.setName(wfName);
-                addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
-                addOozieRetries(repWFapp);
-                marshal(cluster, repWFapp, wfPath);
-            } catch(IOException e) {
-                throw new FalconException("Unable to create replication workflow", e);
-            }
-
-        }
-
-        private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
-                                                 Path bundlePath) throws FalconException {
-            long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
-            Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
-            Date sourceEndDate = getEndDate(feed, srcCluster);
-
-            Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
-            Date targetEndDate = getEndDate(feed, trgCluster);
-
-            if (noOverlapExists(sourceStartDate, sourceEndDate,
-                    targetStartDate, targetEndDate)) {
-                LOG.warn("Not creating replication coordinator, as the source cluster:"
-                        + srcCluster.getName()
-                        + " and target cluster: "
-                        + trgCluster.getName()
-                        + " do not have overlapping dates");
-                return null;
-            }
-
-            COORDINATORAPP replicationCoord;
-            try {
-                replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
-            } catch (FalconException e) {
-                throw new FalconException("Cannot unmarshall replication coordinator template", e);
-            }
-
-            String coordName = EntityUtil.getWorkflowName(
-                    Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
-            String start = sourceStartDate.after(targetStartDate)
-                    ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
-            String end = sourceEndDate.before(targetEndDate)
-                    ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
-
-            initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
-            setCoordControls(feed, replicationCoord);
-
-            final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
-            initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
-
-            final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
-            initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
-
-            Path wfPath = getCoordPath(bundlePath, coordName);
-            ACTION replicationWorkflowAction = getReplicationWorkflowAction(
-                    srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
-            replicationCoord.setAction(replicationWorkflowAction);
-
-            return replicationCoord;
-        }
-
-        private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
-            Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
-            return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
-        }
-
-        private Date getEndDate(Feed feed, Cluster cluster) {
-            return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
-        }
-
-        private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
-                                        Date targetStartDate, Date targetEndDate) {
-            return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
-        }
-
-        private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
-                                               Feed feed, String start, String end, long delayInMillis) {
-            replicationCoord.setName(coordName);
-            replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
-            if (delayInMillis > 0) {
-                long delayInMins = -1 * delayInMillis / (1000 * 60);
-                String elExp = "${now(0," + delayInMins + ")}";
-
-                replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
-                replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
-            }
-
-            replicationCoord.setStart(start);
-            replicationCoord.setEnd(end);
-            replicationCoord.setTimezone(feed.getTimezone().getID());
-        }
-
-        private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
-            Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
-            long delayInMillis=0;
-            if (replicationDelay != null) {
-                delayInMillis = ExpressionHelper.get().evaluate(
-                        replicationDelay.toString(), Long.class);
-            }
-
-            return delayInMillis;
-        }
-
-        private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
-            long frequencyInMillis = ExpressionHelper.get().evaluate(
-                    feed.getFrequency().toString(), Long.class);
-            long timeoutInMillis = frequencyInMillis * 6;
-            if (timeoutInMillis < THIRTY_MINUTES) {
-                timeoutInMillis = THIRTY_MINUTES;
-            }
-
-            Map<String, String> props = getEntityProperties();
-            String timeout = props.get(TIMEOUT);
-            if (timeout!=null) {
-                try{
-                    timeoutInMillis= ExpressionHelper.get().evaluate(timeout, Long.class);
-                } catch (Exception ignore) {
-                    LOG.error("Unable to evaluate timeout:", ignore);
-                }
-            }
-            replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-            replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-
-            String parallelProp = props.get(PARALLEL);
-            int parallel = 1;
-            if (parallelProp != null) {
-                try {
-                    parallel = Integer.parseInt(parallelProp);
-                } catch (NumberFormatException ignore) {
-                    LOG.error("Unable to parse parallel:", ignore);
-                }
-            }
-            replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
-        }
-
-        private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
-                                            Storage sourceStorage) throws FalconException {
-            SYNCDATASET inputDataset = (SYNCDATASET)
-                    replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
-            String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
-            if (sourceStorage.getType() == Storage.TYPE.TABLE) {
-                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-            }
-            inputDataset.setUriTemplate(uriTemplate);
-
-            setDatasetValues(inputDataset, feed, srcCluster);
-
-            if (feed.getAvailabilityFlag() == null) {
-                inputDataset.setDoneFlag("");
-            } else {
-                inputDataset.setDoneFlag(feed.getAvailabilityFlag());
-            }
-        }
-
-        private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
-                                             Storage targetStorage) throws FalconException {
-            SYNCDATASET outputDataset = (SYNCDATASET)
-                    replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
-
-            String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
-            if (targetStorage.getType() == Storage.TYPE.TABLE) {
-                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
-            }
-            outputDataset.setUriTemplate(uriTemplate);
-
-            setDatasetValues(outputDataset, feed, targetCluster);
-        }
-
-        private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
-            dataset.setInitialInstance(SchemaHelper.formatDateUTC(
-                    FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
-            dataset.setTimezone(feed.getTimezone().getID());
-            dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-        }
-
-        private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
-                                                    String wfName, Storage sourceStorage,
-                                                    Storage targetStorage) throws FalconException {
-            ACTION replicationAction = new ACTION();
-            WORKFLOW replicationWF = new WORKFLOW();
-            try {
-                replicationWF.setAppPath(getStoragePath(wfPath.toString()));
-                Feed feed = getEntity();
-
-                Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
-                props.put("srcClusterName", srcCluster.getName());
-                props.put("srcClusterColo", srcCluster.getColo());
-                if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
-                    props.put(MR_MAX_MAPS, getDefaultMaxMaps());
-                }
-
-                // the storage type is uniform across source and target feeds for replication
-                props.put("falconFeedStorageType", sourceStorage.getType().name());
-
-                String instancePaths = null;
-                if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
-                    String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
-                    instancePaths = pathsWithPartitions;
-
-                    propagateFileSystemCopyProperties(pathsWithPartitions, props);
-                } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
-                    instancePaths = "${coord:dataIn('input')}";
-
-                    final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
-                    propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
-                    final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
-                    propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
-                    propagateTableCopyProperties(srcCluster, sourceTableStorage,
-                            trgCluster, targetTableStorage, props);
-                    setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
-                }
-
-                propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
-                propagateUserWorkflowProperties(props, "replication");
-
-                replicationWF.setConfiguration(getCoordConfig(props));
-                replicationAction.setWorkflow(replicationWF);
-
-            } catch (Exception e) {
-                throw new FalconException("Unable to create replication workflow", e);
-            }
-
-            return replicationAction;
-        }
-
-        private String getDefaultMaxMaps() {
-            return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
-        }
-
-        private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
-                                              Feed feed) throws FalconException {
-            String srcPart = FeedHelper.normalizePartitionExpression(
-                    FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
-            srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
-
-            String targetPart = FeedHelper.normalizePartitionExpression(
-                    FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
-            targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
-
-            StringBuilder pathsWithPartitions = new StringBuilder();
-            pathsWithPartitions.append("${coord:dataIn('input')}/")
-                    .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
-            String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
-            parts = StringUtils.stripEnd(parts, "/");
-            return parts;
-        }
-
-        private void propagateFileSystemCopyProperties(String pathsWithPartitions,
-                                                       Map<String, String> props) throws FalconException {
-            props.put("sourceRelativePaths", pathsWithPartitions);
-
-            props.put("distcpSourcePaths", "${coord:dataIn('input')}");
-            props.put("distcpTargetPaths", "${coord:dataOut('output')}");
-        }
-
-        private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
-                                                     Map<String, String> props, String prefix) {
-            props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
-            props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
-            props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
-
-            props.put(prefix + "Database", tableStorage.getDatabase());
-            props.put(prefix + "Table", tableStorage.getTable());
-            props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
-        }
-
-        private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
-                                            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
-            throws IOException, FalconException {
-            Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
-
-            // copy import export scripts to stagingDir
-            Path scriptPath = new Path(wfPath, "scripts");
-            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
-            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
-
-            // create hive conf to stagingDir
-            Path confPath = new Path(wfPath + "/conf");
-            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
-            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
-        }
-
-        private void copyHiveScript(FileSystem fs, Path scriptPath,
-                                    String localScriptPath, String scriptName) throws IOException {
-            OutputStream out = null;
-            InputStream in = null;
-            try {
-                out = fs.create(new Path(scriptPath, scriptName));
-                in = OozieFeedMapper.class.getResourceAsStream(localScriptPath + scriptName);
-                IOUtils.copy(in, out);
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-        }
-
-        private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
-                                                  Cluster trgCluster, CatalogStorage targetStorage,
-                                                  Map<String, String> props) {
-            // create staging dirs for export at source & set it as distcpSourcePaths
-            String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
-            String sourceStagingDir =
-                    FeedHelper.getStagingDir(srcCluster, getEntity(), sourceStorage, Tag.REPLICATION)
-                    + "/" + sourceDatedPartitionKey
-                    + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
-            props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
-            // create staging dirs for import at target & set it as distcpTargetPaths
-            String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
-            String targetStagingDir =
-                    FeedHelper.getStagingDir(trgCluster, getEntity(), targetStorage, Tag.REPLICATION)
-                    + "/" + targetDatedPartitionKey
-                    + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
-            props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
-            props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
-        }
-
-        private void propagateLateDataProperties(Feed feed, String instancePaths,
-                                                 String falconFeedStorageType, Map<String, String> props) {
-            // todo these pairs are the same but used in different context
-            // late data handler - should-record action
-            props.put("falconInputFeeds", feed.getName());
-            props.put("falconInPaths", instancePaths);
-
-            // storage type for each corresponding feed - in this case only one feed is involved
-            // needed to compute usage based on storage type in LateDataHandler
-            props.put("falconInputFeedStorageTypes", falconFeedStorageType);
-
-            // falcon post processing
-            props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
-        }
-    }
-
-    private void addOozieRetries(WORKFLOWAPP workflow) {
-        for (Object object : workflow.getDecisionOrForkOrJoin()) {
-            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
-                continue;
-            }
-            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
-            String actionName = action.getName();
-            if (FALCON_ACTIONS.contains(actionName)) {
-                decorateWithOozieRetries(action);
-            }
-        }
-    }
-
-    private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
-        props.put("userWorkflowName", policy + "-policy");
-        props.put("userWorkflowEngine", "falcon");
-
-        String version;
-        try {
-            version = BuildProperties.get().getProperty("build.version");
-        } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
-            version = "0.5";
-        }
-        props.put("userWorkflowVersion", version);
-    }
-}
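
As a worked example of the setCoordControls arithmetic above (the logic moves unchanged into
OozieFeedWorkflowBuilder below), take an hourly feed with no "timeout" or "parallel"
properties set:

    // frequencyInMillis = 3,600,000 (1 hour)
    // timeoutInMillis   = 6 * 3,600,000 = 21,600,000  (above the 30-minute floor)
    // timeout           = 21,600,000 / 60,000        = 360 minutes
    // throttle          = 21,600,000 / 3,600,000 * 2 = 12
    // concurrency       = 1 (default when "parallel" is unset)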

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 5e3a30e..2008c2d 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -18,60 +18,83 @@
 
 package org.apache.falcon.workflow;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
-import org.apache.falcon.converter.AbstractOozieEntityMapper;
-import org.apache.falcon.converter.OozieFeedMapper;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 
-import java.util.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * Workflow definition builder for feed replication & retention.
  */
 public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
+    private static final Logger LOG = Logger.getLogger(OozieFeedWorkflowBuilder.class);
+
+    public OozieFeedWorkflowBuilder(Feed entity) {
+        super(entity);
+    }
 
     @Override
-    public Map<String, Properties> newWorkflowSchedule(Feed feed, List<String> clusters) throws FalconException {
+    public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
         Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
 
         for (String clusterName : clusters) {
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
-            Properties properties = newWorkflowSchedule(feed, feedCluster.getValidity().getStart(), clusterName,
-                    CurrentUser.getUser());
-            if (properties == null) {
-                continue;
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, clusterName);
+            if (!feedCluster.getValidity().getStart().before(feedCluster.getValidity().getEnd())) {
+                LOG.info("feed validity start <= end for cluster " + clusterName + ". Skipping schedule");
+                break;
             }
-            propertiesMap.put(clusterName, properties);
-        }
-        return propertiesMap;
-    }
 
-    @Override
-    public Properties newWorkflowSchedule(Feed feed, Date startDate, String clusterName, String user)
-        throws FalconException {
-
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
-        if (!startDate.before(feedCluster.getValidity().getEnd())) {
-            return null;
-        }
+            Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
+            Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
 
-        Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
-        Path bundlePath = EntityUtil.getNewStagingPath(cluster, feed);
-        Feed feedClone = (Feed) feed.copy();
-        EntityUtil.setStartDate(feedClone, clusterName, startDate);
-
-        AbstractOozieEntityMapper<Feed> mapper = new OozieFeedMapper(feedClone);
-        if (!mapper.map(cluster, bundlePath)) {
-            return null;
+            if (!map(cluster, bundlePath)) {
+                break;
+            }
+            propertiesMap.put(clusterName, createAppProperties(clusterName, bundlePath, CurrentUser.getUser()));
         }
-        return createAppProperties(clusterName, bundlePath, user);
+        return propertiesMap;
     }
 
     @Override
@@ -82,9 +105,518 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
     }
 
     @Override
-    public String[] getWorkflowNames(Feed entity) {
+    public String[] getWorkflowNames() {
         return new String[]{
                 EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString(),
                 EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString(), };
     }
+
+    private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
+    private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
+
+    @Override
+    public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+        List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
+        COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
+        if (retentionCoord != null) {
+            coords.add(retentionCoord);
+        }
+        List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
+        coords.addAll(replicationCoords);
+        return coords;
+    }
+
+    private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+
+        if (feedCluster.getValidity().getEnd().before(new Date())) {
+            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
+                + " is not in the future");
+            return null;
+        }
+
+        return retentionMapper.getRetentionCoordinator(cluster, bundlePath, entity, feedCluster);
+    }
+
+    private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
+        throws FalconException {
+        List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
+
+        if (FeedHelper.getCluster(entity, targetCluster.getName()).getType() == ClusterType.TARGET) {
+            String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
+            Path basePath = getCoordPath(bundlePath, coordName);
+            replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
+
+            for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
+                if (feedCluster.getType() == ClusterType.SOURCE) {
+                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(entity,
+                        (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
+                        targetCluster, bundlePath);
+
+                    if (coord != null) {
+                        replicationCoords.add(coord);
+                    }
+                }
+            }
+        }
+
+        return replicationCoords;
+    }
+
+    @Override
+    protected Map<String, String> getEntityProperties() {
+        Map<String, String> props = new HashMap<String, String>();
+        if (entity.getProperties() != null) {
+            for (Property prop : entity.getProperties().getProperties()) {
+                props.put(prop.getName(), prop.getValue());
+            }
+        }
+        return props;
+    }
+
+    private final class RetentionOozieWorkflowMapper {
+
+        private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
+
+        private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster) throws FalconException {
+            COORDINATORAPP retentionApp = new COORDINATORAPP();
+            String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
+            retentionApp.setName(coordName);
+            retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+            retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
+            retentionApp.setTimezone(feed.getTimezone().getID());
+            TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
+            if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
+                retentionApp.setFrequency("${coord:hours(6)}");
+            } else {
+                retentionApp.setFrequency("${coord:days(1)}");
+            }
+
+            Path wfPath = getCoordPath(bundlePath, coordName);
+            retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
+            return retentionApp;
+        }
+
+        private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
+            throws FalconException {
+            ACTION retentionAction = new ACTION();
+            WORKFLOW retentionWorkflow = new WORKFLOW();
+            createRetentionWorkflow(cluster, wfPath, wfName);
+            retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
+
+            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+            props.put("timeZone", entity.getTimezone().getID());
+            props.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+            final Storage storage = FeedHelper.createStorage(cluster, entity);
+            props.put("falconFeedStorageType", storage.getType().name());
+
+            String feedDataPath = storage.getUriTemplate();
+            props.put("feedDataPath",
+                feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+            props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+            props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+            props.put(ARG.feedNames.getPropName(), entity.getName());
+            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+
+            props.put("falconInputFeeds", entity.getName());
+            props.put("falconInPaths", "IGNORE");
+
+            propagateUserWorkflowProperties(props, "eviction");
+
+            retentionWorkflow.setConfiguration(getCoordConfig(props));
+            retentionAction.setWorkflow(retentionWorkflow);
+            return retentionAction;
+        }
+
+        private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
+            try {
+                WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
+                retWfApp.setName(wfName);
+                addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
+                addOozieRetries(retWfApp);
+                marshal(cluster, retWfApp, wfPath);
+            } catch(IOException e) {
+                throw new FalconException("Unable to create retention workflow", e);
+            }
+        }
+    }
+
+    private class ReplicationOozieWorkflowMapper {
+        private static final String MR_MAX_MAPS = "maxMaps";
+
+        private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+        private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
+        private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
+
+        private static final String TIMEOUT = "timeout";
+        private static final String PARALLEL = "parallel";
+
+        private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
+            throws FalconException {
+            try {
+                WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
+                repWFapp.setName(wfName);
+                addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
+                addOozieRetries(repWFapp);
+                marshal(cluster, repWFapp, wfPath);
+            } catch(IOException e) {
+                throw new FalconException("Unable to create replication workflow", e);
+            }
+
+        }
+
+        private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
+            Path bundlePath) throws FalconException {
+            long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
+            Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
+            Date sourceEndDate = getEndDate(feed, srcCluster);
+
+            Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
+            Date targetEndDate = getEndDate(feed, trgCluster);
+
+            if (noOverlapExists(sourceStartDate, sourceEndDate,
+                targetStartDate, targetEndDate)) {
+                LOG.warn("Not creating replication coordinator, as the source cluster:" + srcCluster.getName()
+                    + "and target cluster: " + trgCluster.getName() + " do not have overlapping dates");
+                return null;
+            }
+
+            COORDINATORAPP replicationCoord;
+            try {
+                replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+            } catch (FalconException e) {
+                throw new FalconException("Cannot unmarshall replication coordinator template", e);
+            }
+
+            String coordName = EntityUtil.getWorkflowName(
+                Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
+            String start = sourceStartDate.after(targetStartDate)
+                ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
+            String end = sourceEndDate.before(targetEndDate)
+                ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
+
+            initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
+            setCoordControls(feed, replicationCoord);
+
+            final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
+            initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
+
+            final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
+            initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
+
+            Path wfPath = getCoordPath(bundlePath, coordName);
+            ACTION replicationWorkflowAction = getReplicationWorkflowAction(
+                srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
+            replicationCoord.setAction(replicationWorkflowAction);
+
+            return replicationCoord;
+        }
+
+        private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
+            Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
+            return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+        }
+
+        private Date getEndDate(Feed feed, Cluster cluster) {
+            return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
+        }
+
+        private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
+            Date targetStartDate, Date targetEndDate) {
+            return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
+        }
+
+        private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
+            Feed feed, String start, String end, long delayInMillis) {
+            replicationCoord.setName(coordName);
+            replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
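+            // The EL offset below is negative, so ${now(0,<offset>)} resolves the input/output
+            // instance to 'delay' minutes before the coordinator's nominal time.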
+            if (delayInMillis > 0) {
+                long delayInMins = -1 * delayInMillis / (1000 * 60);
+                String elExp = "${now(0," + delayInMins + ")}";
+
+                replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+                replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+            }
+
+            replicationCoord.setStart(start);
+            replicationCoord.setEnd(end);
+            replicationCoord.setTimezone(feed.getTimezone().getID());
+        }
+
+        private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
+            Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
+            long delayInMillis = 0;
+            if (replicationDelay != null) {
+                delayInMillis = ExpressionHelper.get().evaluate(
+                    replicationDelay.toString(), Long.class);
+            }
+
+            return delayInMillis;
+        }
+
+        private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
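+            // Default timeout is six times the feed frequency, with a thirty-minute floor;
+            // a 'timeout' entity property, if set, overrides it.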
+            long frequencyInMillis = ExpressionHelper.get().evaluate(
+                feed.getFrequency().toString(), Long.class);
+            long timeoutInMillis = frequencyInMillis * 6;
+            if (timeoutInMillis < THIRTY_MINUTES) {
+                timeoutInMillis = THIRTY_MINUTES;
+            }
+
+            Map<String, String> props = getEntityProperties();
+            String timeout = props.get(TIMEOUT);
+            if (timeout != null) {
+                try {
+                    timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
+                } catch (Exception e) {
+                    LOG.error("Unable to evaluate timeout:", e);
+                }
+            }
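+            // Oozie controls are in minutes; throttle permits up to two timeout windows'
+            // worth of materialized-but-waiting instances.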
+            replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+            replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+
+            String parallelProp = props.get(PARALLEL);
+            int parallel = 1;
+            if (parallelProp != null) {
+                try {
+                    parallel = Integer.parseInt(parallelProp);
+                } catch (NumberFormatException e) {
+                    LOG.error("Unable to parse parallel:", e);
+                }
+            }
+            replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
+        }
+
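+        // Points the coordinator's input dataset at the source storage and sets the
+        // availability flag (done-flag) that replication waits on.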
+        private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
+            Storage sourceStorage) throws FalconException {
+            SYNCDATASET inputDataset = (SYNCDATASET)
+                replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+            String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
+            if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie expects the hcat scheme for catalog URIs
+            }
+            inputDataset.setUriTemplate(uriTemplate);
+
+            setDatasetValues(inputDataset, feed, srcCluster);
+
+            if (feed.getAvailabilityFlag() == null) {
+                inputDataset.setDoneFlag("");
+            } else {
+                inputDataset.setDoneFlag(feed.getAvailabilityFlag());
+            }
+        }
+
+        private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
+            Storage targetStorage) throws FalconException {
+            SYNCDATASET outputDataset = (SYNCDATASET)
+                replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
+
+            String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
+            if (targetStorage.getType() == Storage.TYPE.TABLE) {
+                uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie expects the hcat scheme for catalog URIs
+            }
+            outputDataset.setUriTemplate(uriTemplate);
+
+            setDatasetValues(outputDataset, feed, targetCluster);
+        }
+
+        private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
+            dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+                FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
+            dataset.setTimezone(feed.getTimezone().getID());
+            dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+        }
+
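+        // Builds the coordinator action around the replication workflow, propagating copy
+        // properties for either filesystem (distcp) or table (export/import) replication.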
+        private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
+            String wfName, Storage sourceStorage,
+            Storage targetStorage) throws FalconException {
+            ACTION replicationAction = new ACTION();
+            WORKFLOW replicationWF = new WORKFLOW();
+            try {
+                replicationWF.setAppPath(getStoragePath(wfPath.toString()));
+                Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+                props.put("srcClusterName", srcCluster.getName());
+                props.put("srcClusterColo", srcCluster.getColo());
+                if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
+                    props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+                }
+
+                // the storage type is uniform across source and target feeds for replication
+                props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+                String instancePaths = null;
+                if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+                    String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, entity);
+                    instancePaths = pathsWithPartitions;
+
+                    propagateFileSystemCopyProperties(pathsWithPartitions, props);
+                } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+                    instancePaths = "${coord:dataIn('input')}";
+
+                    final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
+                    propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
+                    final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
+                    propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
+                    propagateTableCopyProperties(srcCluster, sourceTableStorage,
+                        trgCluster, targetTableStorage, props);
+                    setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
+                }
+
+                propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
+                propagateUserWorkflowProperties(props, "replication");
+
+                replicationWF.setConfiguration(getCoordConfig(props));
+                replicationAction.setWorkflow(replicationWF);
+
+            } catch (Exception e) {
+                throw new FalconException("Unable to create replication workflow", e);
+            }
+
+            return replicationAction;
+        }
+
+        private String getDefaultMaxMaps() {
+            return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+        }
+
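+        // Resolves the source and target partition expressions and appends them to the input
+        // path, collapsing duplicate slashes and trailing separators.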
+        private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
+            Feed feed) throws FalconException {
+            String srcPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
+            srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
+
+            String targetPart = FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
+            targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
+
+            StringBuilder pathsWithPartitions = new StringBuilder();
+            pathsWithPartitions.append("${coord:dataIn('input')}/")
+                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+            String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+            parts = StringUtils.stripEnd(parts, "/");
+            return parts;
+        }
+
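+        // For filesystem replication, distcp reads the resolved input instance directly and
+        // writes to the resolved output instance on the target cluster.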
+        private void propagateFileSystemCopyProperties(String pathsWithPartitions,
+            Map<String, String> props) throws FalconException {
+            props.put("sourceRelativePaths", pathsWithPartitions);
+
+            props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+            props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+        }
+
+        private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+            Map<String, String> props, String prefix) {
+            props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+            props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+            props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
+
+            props.put(prefix + "Database", tableStorage.getDatabase());
+            props.put(prefix + "Table", tableStorage.getTable());
+            props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
+        }
+
+        private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
+            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
+            throws IOException, FalconException {
+            Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+
+            // copy import export scripts to stagingDir
+            Path scriptPath = new Path(wfPath, "scripts");
+            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
+            copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
+
+            // create hive conf to stagingDir
+            Path confPath = new Path(wfPath, "conf");
+            createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
+            createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
+        }
+
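+        // Streams a Hive script bundled on the classpath into the workflow's staging
+        // directory on the target filesystem.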
+        private void copyHiveScript(FileSystem fs, Path scriptPath,
+            String localScriptPath, String scriptName) throws IOException {
+            OutputStream out = null;
+            InputStream in = null;
+            try {
+                out = fs.create(new Path(scriptPath, scriptName));
+                in = OozieFeedWorkflowBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
+            }
+        }
+
+        private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+            Cluster trgCluster, CatalogStorage targetStorage,
+            Map<String, String> props) {
+            // create staging dirs for export at source & set it as distcpSourcePaths
+            String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
+            String sourceStagingDir =
+                FeedHelper.getStagingDir(srcCluster, entity, sourceStorage, Tag.REPLICATION)
+                    + "/" + sourceDatedPartitionKey
+                    + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
+            props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+            // create staging dirs for import at target & set it as distcpTargetPaths
+            String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
+            String targetStagingDir =
+                FeedHelper.getStagingDir(trgCluster, entity, targetStorage, Tag.REPLICATION)
+                    + "/" + targetDatedPartitionKey
+                    + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
+            props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+            props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
+        }
+
+        private void propagateLateDataProperties(Feed feed, String instancePaths,
+            String falconFeedStorageType, Map<String, String> props) {
+            // TODO: these pairs are the same but are used in different contexts
+            // late data handler - should-record action
+            props.put("falconInputFeeds", feed.getName());
+            props.put("falconInPaths", instancePaths);
+
+            // storage type for each corresponding feed - in this case only one feed is involved
+            // needed to compute usage based on storage type in LateDataHandler
+            props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+            // falcon post processing
+            props.put(ARG.feedNames.getPropName(), feed.getName());
+            props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+        }
+    }
+
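+    // Adds retry settings to Falcon's own (system) actions; user actions are left untouched.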
+    private void addOozieRetries(WORKFLOWAPP workflow) {
+        for (Object object : workflow.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if (FALCON_ACTIONS.contains(actionName)) {
+                decorateWithOozieRetries(action);
+            }
+        }
+    }
+
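+    // Stamps the coordinator configuration with the generated workflow's name, engine and
+    // build version; falls back to 0.5 when build properties are unavailable.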
+    private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
+        props.put("userWorkflowName", policy + "-policy");
+        props.put("userWorkflowEngine", "falcon");
+
+        String version;
+        try {
+            version = BuildProperties.get().getProperty("build.version");
+        } catch (Exception e) {  // unfortunate that this is only available in prism/webapp
+            version = "0.5";
+        }
+        props.put("userWorkflowVersion", version);
+    }
 }