Posted to commits@falcon.apache.org by aj...@apache.org on 2015/09/28 21:26:44 UTC

[1/2] falcon git commit: FALCON-965 Open up life cycle stage implementation within Falcon for extension. Contributed by Ajay Yadava.

Repository: falcon
Updated Branches:
  refs/heads/master c462f3e05 -> f7ad3f487


http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
new file mode 100644
index 0000000..0a87213
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.HiveUtil;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.lifecycle.retention.AgeBasedDelete;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Workflow Builder for AgeBasedDelete.
+ */
+public final class AgeBasedWorkflowBuilder {
+    private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml";
+    private static final String EVICTION_ACTION_NAME = "eviction";
+
+    private AgeBasedWorkflowBuilder(){
+
+    }
+
+    public static Properties build(Cluster cluster, Path basePath, Feed feed) throws FalconException {
+        Path buildPath = OozieBuilderUtils.getBuildPath(basePath, LifeCycle.EVICTION.getTag());
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
+        String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
+
+        //Add eviction action
+        ACTION eviction = OozieBuilderUtils.unmarshalAction(EVICTION_ACTION_TEMPLATE);
+        OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
+                OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(eviction);
+
+        //Add post-processing actions
+        ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
+        OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(success);
+
+        ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
+        OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(fail);
+
+        OozieBuilderUtils.decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
+        OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION, EntityType.FEED);
+
+        // Prepare and marshal properties to config-default.xml
+        Properties props = OozieBuilderUtils.getProperties(buildPath, wfName);
+        props.putAll(getWorkflowProperties(feed, cluster));
+        props.putAll(OozieBuilderUtils.createDefaultConfiguration(cluster, feed,
+                WorkflowExecutionContext.EntityOperations.DELETE));
+        props.putAll(FeedHelper.getUserWorkflowProperties(LifeCycle.EVICTION));
+        // override the queueName and priority
+        RetentionStage retentionStage = FeedHelper.getRetentionStage(feed, cluster.getName());
+        props.put(OozieBuilderUtils.MR_QUEUE_NAME, retentionStage.getQueue());
+        props.put(OozieBuilderUtils.MR_JOB_PRIORITY, retentionStage.getPriority());
+
+        if (EntityUtil.isTableStorageType(cluster, feed)) {
+            setupHiveCredentials(cluster, buildPath, workflow);
+            // TODO: copy-paste kludge - send source hcat credentials so the coordinator dependency check passes
+            props.putAll(HiveUtil.getHiveCredentials(cluster));
+        }
+
+        // Write out the config to config-default.xml
+        OozieBuilderUtils.marshalDefaultConfig(cluster, workflow, props, buildPath);
+
+        // write out the workflow.xml
+        OozieBuilderUtils.marshalWokflow(cluster, workflow, buildPath);
+        return props;
+    }
+
+    private static Properties getWorkflowProperties(Feed feed, Cluster cluster) throws FalconException {
+        final Storage storage = FeedHelper.createStorage(cluster, feed);
+        Properties props = new Properties();
+        props.setProperty("srcClusterName", "NA");
+        props.setProperty("availabilityFlag", "NA");
+        props.put("timeZone", feed.getTimezone().getID());
+        props.put("frequency", feed.getFrequency().getTimeUnit().name());
+        props.put("falconFeedStorageType", storage.getType().name());
+        props.put("limit", new AgeBasedDelete().getRetentionLimit(feed, cluster.getName()).toString());
+        props.put("falconInputFeeds", feed.getName());
+        props.put("falconInPaths", OozieBuilderUtils.IGNORE);
+
+        String feedDataPath = storage.getUriTemplate();
+        props.put("feedDataPath",
+                feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), feed.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OozieBuilderUtils.IGNORE);
+
+        return props;
+    }
+
+    private static void setupHiveCredentials(Cluster cluster, Path wfPath, WORKFLOWAPP workflowApp)
+        throws FalconException {
+        if (SecurityUtil.isSecurityEnabled()) {
+            // add hcatalog credentials for secure mode and add a reference to each action
+            OozieBuilderUtils.addHCatalogCredentials(workflowApp, cluster, OozieBuilderUtils.HIVE_CREDENTIAL_NAME);
+        }
+
+        // create hive-site.xml file so actions can use it in the classpath
+        OozieBuilderUtils.createHiveConfiguration(cluster, wfPath, ""); // no prefix since only one hive instance
+
+        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+
+            org.apache.falcon.oozie.workflow.ACTION action =
+                    (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if (EVICTION_ACTION_NAME.equals(actionName)) {
+                // add reference to hive-site conf to each action
+                action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+                if (SecurityUtil.isSecurityEnabled()) {
+                    // add a reference to credential in the action
+                    action.setCred(OozieBuilderUtils.HIVE_CREDENTIAL_NAME);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
new file mode 100644
index 0000000..be9175e
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ExternalId;
+import org.apache.falcon.entity.HiveUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CREDENTIAL;
+import org.apache.falcon.oozie.workflow.CREDENTIALS;
+import org.apache.falcon.oozie.workflow.END;
+import org.apache.falcon.oozie.workflow.KILL;
+import org.apache.falcon.oozie.workflow.START;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.falcon.workflow.util.OozieConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.namespace.QName;
+import javax.xml.transform.stream.StreamSource;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class to build Oozie artifacts.
+ */
+public final class OozieBuilderUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(OozieBuilderUtils.class);
+
+    private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
+
+    public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+    private static final String USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
+    public static final String MR_QUEUE_NAME = "queueName";
+    public static final String MR_JOB_PRIORITY = "jobPriority";
+    private static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
+    private static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
+    private static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+
+    private static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+    private static final JAXBContext ACTION_JAXB_CONTEXT;
+    private static final JAXBContext COORD_JAXB_CONTEXT;
+    private static final JAXBContext CONFIG_JAXB_CONTEXT;
+
+
+    public static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
+    public static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";
+    public static final String OK_ACTION_NAME = "end";
+    public static final String FAIL_ACTION_NAME = "fail";
+
+
+    public static final String ENTITY_PATH = "ENTITY_PATH";
+    public static final String ENTITY_NAME = "ENTITY_NAME";
+    public static final String IGNORE = "IGNORE";
+
+
+    static {
+        try {
+            WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+            ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class);
+            COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
+            CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.CONFIGURATION.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXB context", e);
+        }
+    }
+
+    private OozieBuilderUtils() {
+
+    }
+
+    public static ACTION addTransition(ACTION action, String ok, String fail) {
+        // XTODOS : why return when it is changing the same object?
+        action.getOk().setTo(ok);
+        action.getError().setTo(fail);
+        return action;
+    }
+
+
+    public static void decorateWorkflow(WORKFLOWAPP wf, String name, String startAction) {
+        wf.setName(name);
+        wf.setStart(new START());
+        wf.getStart().setTo(startAction);
+
+        wf.setEnd(new END());
+        wf.getEnd().setName(OK_ACTION_NAME);
+
+        KILL kill = new KILL();
+        kill.setName(FAIL_ACTION_NAME);
+        kill.setMessage("Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
+        wf.getDecisionOrForkOrJoin().add(kill);
+    }
+
+    public static ACTION getSuccessPostProcessAction() throws FalconException {
+        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+        return action;
+    }
+
+    public static ACTION getFailPostProcessAction() throws FalconException {
+        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+        action.setName(FAIL_POSTPROCESS_ACTION_NAME);
+        return action;
+    }
+
+    private static Path marshal(Cluster cluster, JAXBElement<?> jaxbElement,
+                           JAXBContext jaxbContext, Path outPath) throws FalconException {
+        try {
+            Marshaller marshaller = jaxbContext.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+
+            if (LOG.isDebugEnabled()) {
+                StringWriter writer = new StringWriter();
+                marshaller.marshal(jaxbElement, writer);
+                LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
+                LOG.debug(writer.getBuffer().toString());
+            }
+
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            OutputStream out = fs.create(outPath);
+            try {
+                marshaller.marshal(jaxbElement, out);
+            } finally {
+                out.close();
+            }
+
+            LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
+            return outPath;
+        } catch (Exception e) {
+            throw new FalconException("Unable to marshall app object", e);
+        }
+    }
+
+    public static Path marshalCoordinator(Cluster cluster, COORDINATORAPP coord, Path outPath) throws FalconException {
+        return marshal(cluster, new org.apache.falcon.oozie.coordinator.ObjectFactory().createCoordinatorApp(coord),
+                COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
+    }
+
+
+    public static Path marshalDefaultConfig(Cluster cluster, WORKFLOWAPP workflowapp,
+               Properties properties, Path outPath) throws FalconException {
+        QName workflowQName = new org.apache.falcon.oozie.workflow.ObjectFactory()
+                .createWorkflowApp(workflowapp).getName();
+        org.apache.falcon.oozie.workflow.CONFIGURATION config = getWorkflowConfig(properties);
+        JAXBElement<org.apache.falcon.oozie.workflow.CONFIGURATION> configJaxbElement =
+                new JAXBElement(new QName(workflowQName.getNamespaceURI(), "configuration", workflowQName.getPrefix()),
+                        org.apache.falcon.oozie.workflow.CONFIGURATION.class, config);
+
+        Path defaultConfigPath = new Path(outPath, "config-default.xml");
+        return marshal(cluster, configJaxbElement, CONFIG_JAXB_CONTEXT, defaultConfigPath);
+    }
+
+
+    public static Path marshalWokflow(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+        return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+                WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+    }
+
+    public static <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieBuilderUtils.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = context.createUnmarshaller();
+            JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException("Failed to unmarshal " + template, e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+
+    public static ACTION unmarshalAction(String template) throws FalconException {
+        return unmarshal(template, ACTION_JAXB_CONTEXT, ACTION.class);
+    }
+
+    // XTODOS Should we make them more specific to feeds??
+    public static void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag, EntityType type)
+        throws FalconException {
+        String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
+        try {
+            addExtensionJars(fs, new Path(libext), wf);
+            addExtensionJars(fs, new Path(libext, type.name()), wf);
+            if (tag != null) {
+                addExtensionJars(fs, new Path(libext, type.name() + "/" + tag.name().toLowerCase()),
+                        wf);
+            }
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    /**
+     * Builds the base set of properties identifying an entity's build artifacts.
+     *
+     * @param path build path of the entity's workflow artifacts
+     * @param name entity (workflow) name
+     * @return properties containing ENTITY_PATH and ENTITY_NAME, or null if path is null
+     */
+    public static Properties getProperties(Path path, String name) {
+        if (path == null) {
+            return null;
+        }
+        Properties prop = new Properties();
+        prop.setProperty(ENTITY_PATH, path.toString());
+        prop.setProperty(ENTITY_NAME, name);
+        return prop;
+    }
+
+
+    /**
+     * Adds every jar found under the given path as an extension jar to all java, pig and
+     * map-reduce actions of the workflow. For feeds the candidate directories are libext,
+     * libext/FEED, libext/FEED/RETENTION and libext/FEED/REPLICATION.
+     *
+     * @param fs   file system on which the libext path resides
+     * @param path directory containing the extension jars
+     * @param wf   workflow to decorate
+     * @throws IOException
+     */
+    public static void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+        FileStatus[] libs;
+        try {
+            libs = fs.listStatus(path);
+        } catch (FileNotFoundException ignore) {
+            //Ok if the libext is not configured
+            return;
+        }
+
+        for (FileStatus lib : libs) {
+            if (lib.isDirectory()) {
+                continue;
+            }
+
+            for (Object obj : wf.getDecisionOrForkOrJoin()) {
+                if (!(obj instanceof ACTION)) {
+                    continue;
+                }
+                ACTION action = (ACTION) obj;
+                List<String> files = null;
+                if (action.getJava() != null) {
+                    files = action.getJava().getFile();
+                } else if (action.getPig() != null) {
+                    files = action.getPig().getFile();
+                } else if (action.getMapReduce() != null) {
+                    files = action.getMapReduce().getFile();
+                }
+                if (files != null) {
+                    files.add(lib.getPath().toString());
+                }
+            }
+        }
+    }
+
+
+    public static void decorateWithOozieRetries(ACTION action) {
+        Properties props = RuntimeProperties.get();
+        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+    }
+
+    // creates the default configuration which is written to config-default.xml
+    public static Properties createDefaultConfiguration(Cluster cluster, Entity entity,
+                WorkflowExecutionContext.EntityOperations operation)  throws FalconException {
+        Properties props = new Properties();
+        props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+        props.put("falconDataOperation", operation.name());
+
+        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
+                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
+        props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
+
+        addLateDataProperties(props, entity);
+        addBrokerProperties(cluster, props);
+
+        props.put(MR_QUEUE_NAME, "default");
+        props.put(MR_JOB_PRIORITY, "NORMAL");
+
+        //properties provided in entity override the default generated properties
+        props.putAll(EntityUtil.getEntityProperties(entity));
+        props.putAll(createAppProperties(cluster));
+        return props;
+    }
+
+
+    // gets the cluster specific properties to be populated in config-default.xml
+    private static Properties createAppProperties(Cluster cluster) throws FalconException {
+        Properties properties = EntityUtil.getEntityProperties(cluster);
+        properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+        properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
+        properties.setProperty("colo.name", cluster.getColo());
+        final String endpoint = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
+        if (!OozieConstants.LOCAL_OOZIE.equals(endpoint)) {
+            properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
+        }
+        properties.setProperty("falcon.libpath",
+                ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath()  + "/lib");
+
+        return properties;
+    }
+
+    // creates hive-site.xml in the workflow's conf dir on the given cluster's file system.
+    public static void createHiveConfiguration(Cluster cluster, Path workflowPath,
+                                           String prefix) throws FalconException {
+        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+
+        try {
+            Configuration conf = ClusterHelper.getConfiguration(cluster);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+
+            // write hive conf to the workflow's conf dir
+            Path confPath = new Path(workflowPath + "/conf");
+
+            persistHiveConfiguration(fs, confPath, hiveConf, prefix);
+        } catch (IOException e) {
+            throw new FalconException("Unable to create hive-site.xml", e);
+        }
+    }
+
+    private static void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+                                          String prefix) throws IOException {
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+            hiveConf.writeXml(out);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * Adds an HCatalog credential to the workflow. This is only necessary when table storage
+     * is involved and security is enabled.
+     *
+     * @param workflowApp    workflow xml
+     * @param cluster        cluster entity
+     * @param credentialName name of the credential to add
+     */
+    public static void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName) {
+        CREDENTIALS credentials = workflowApp.getCredentials();
+        if (credentials == null) {
+            credentials = new CREDENTIALS();
+        }
+
+        credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
+
+        // add credential for workflow
+        workflowApp.setCredentials(credentials);
+    }
+
+
+    /**
+     * Creates an HCatalog credential. This is only necessary when table storage is involved
+     * and security is enabled.
+     *
+     * @param cluster        cluster entity
+     * @param credentialName credential name
+     * @return CREDENTIAL object
+     */
+    public static CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
+        final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+
+        CREDENTIAL credential = new CREDENTIAL();
+        credential.setName(credentialName);
+        credential.setType("hcat");
+
+        credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl));
+        credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL,
+                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)));
+
+        return credential;
+    }
+
+    public static CREDENTIAL.Property createProperty(String name, String value) {
+        CREDENTIAL.Property property = new CREDENTIAL.Property();
+        property.setName(name);
+        property.setValue(value);
+        return property;
+    }
+
+    private static Properties getHiveCredentials(Cluster cluster) {
+        String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+        if (metaStoreUrl == null) {
+            throw new IllegalStateException("Registry interface is not defined in cluster: " + cluster.getName());
+        }
+
+        Properties hiveCredentials = new Properties();
+        hiveCredentials.put(HiveUtil.METASTOREURIS, metaStoreUrl);
+        hiveCredentials.put(HiveUtil.METASTORE_UGI, "true");
+        hiveCredentials.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
+        hiveCredentials.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            String principal = ClusterHelper
+                    .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
+            hiveCredentials.put(SecurityUtil.METASTORE_PRINCIPAL, principal);
+            hiveCredentials.put(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL, principal);
+            hiveCredentials.put(SecurityUtil.METASTORE_USE_THRIFT_SASL, "true");
+        }
+
+        return hiveCredentials;
+    }
+
+    private static Configuration getHiveCredentialsAsConf(Cluster cluster) {
+        Properties hiveCredentials = getHiveCredentials(cluster);
+
+        Configuration hiveConf = new Configuration(false);
+        for (Map.Entry<Object, Object> entry : hiveCredentials.entrySet()) {
+            hiveConf.set((String)entry.getKey(), (String)entry.getValue());
+        }
+
+        return hiveConf;
+    }
+
+    public static Path getBuildPath(Path buildPath, Tag tag) {
+        return new Path(buildPath, tag.name());
+    }
+
+    protected static String getStoragePath(Path path) {
+        if (path != null) {
+            return getStoragePath(path.toString());
+        }
+        return null;
+    }
+
+    public static String getStoragePath(String path) {
+        if (StringUtils.isNotEmpty(path)) {
+            if (new Path(path).toUri().getScheme() == null && !path.startsWith("${nameNode}")) {
+                path = "${nameNode}" + path;
+            }
+        }
+        return path;
+    }
+
+    // default configuration for coordinator
+    public static Properties createCoordDefaultConfiguration(String coordName, Entity entity)
+        throws FalconException {
+
+        Properties props = new Properties();
+        props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
+        props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
+        props.put(OozieClient.EXTERNAL_ID,
+                new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+                        "${coord:nominalTime()}").getId());
+        props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
+        //props in entity override the set props.
+        props.putAll(EntityUtil.getEntityProperties(entity));
+        return props;
+    }
+
+    private static void addLateDataProperties(Properties props, Entity entity) throws FalconException {
+        if (EntityUtil.getLateProcess(entity) == null
+                || EntityUtil.getLateProcess(entity).getLateInputs() == null
+                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+            props.put("shouldRecord", "false");
+        } else {
+            props.put("shouldRecord", "true");
+        }
+    }
+
+    private static void addBrokerProperties(Cluster cluster, Properties props) {
+        props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
+                ClusterHelper.getMessageBrokerUrl(cluster));
+        props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
+                ClusterHelper.getMessageBrokerImplClass(cluster));
+
+        String falconBrokerUrl = StartupProperties.get().getProperty(
+                "broker.url", "tcp://localhost:61616?daemon=true");
+        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+
+        String falconBrokerImplClass = StartupProperties.get().getProperty(
+                "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+
+        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+                DEFAULT_BROKER_MSG_TTL.toString());
+        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+    }
+
+
+    private static org.apache.falcon.oozie.workflow.CONFIGURATION getWorkflowConfig(Properties props) {
+        org.apache.falcon.oozie.workflow.CONFIGURATION conf = new org.apache.falcon.oozie.workflow.CONFIGURATION();
+        for (Map.Entry<Object, Object> prop : props.entrySet()) {
+            org.apache.falcon.oozie.workflow.CONFIGURATION.Property confProp =
+                    new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+            confProp.setName((String) prop.getKey());
+            confProp.setValue((String) prop.getValue());
+            conf.getProperty().add(confProp);
+        }
+        return conf;
+    }
+
+    public static CONFIGURATION getCoordinatorConfig(Properties props) {
+        CONFIGURATION conf = new CONFIGURATION();
+        for (Map.Entry<Object, Object> prop : props.entrySet()) {
+            CONFIGURATION.Property confProp = new CONFIGURATION.Property();
+            confProp.setName((String) prop.getKey());
+            confProp.setValue((String) prop.getValue());
+            conf.getProperty().add(confProp);
+        }
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/resources/action/feed/eviction-action.xml
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml
new file mode 100644
index 0000000..4ab67d2
--- /dev/null
+++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml
@@ -0,0 +1,59 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="eviction" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <!-- HCatalog jars -->
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>hcatalog</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.retention.FeedEvictor</main-class>
+        <arg>-feedBasePath</arg>
+        <arg>${feedDataPath}</arg>
+        <arg>-falconFeedStorageType</arg>
+        <arg>${falconFeedStorageType}</arg>
+        <arg>-retentionType</arg>
+        <arg>instance</arg>
+        <arg>-retentionLimit</arg>
+        <arg>${limit}</arg>
+        <arg>-timeZone</arg>
+        <arg>${timeZone}</arg>
+        <arg>-frequency</arg>
+        <arg>${frequency}</arg>
+        <arg>-logFile</arg>
+        <arg>${logDir}/job-${nominalTime}/${wf:run()}/evicted-instancePaths.csv</arg>
+    </java>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/resources/binding/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/resources/binding/jaxb-binding.xjb b/lifecycle/src/main/resources/binding/jaxb-binding.xjb
new file mode 100644
index 0000000..1a43660
--- /dev/null
+++ b/lifecycle/src/main/resources/binding/jaxb-binding.xjb
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<jaxb:bindings xmlns:xs="http://www.w3.org/2001/XMLSchema"
+               xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" version="2.1">
+
+    <jaxb:bindings schemaLocation="../../../../target/oozie-schemas/oozie-workflow-0.3.xsd"
+                   node="//xs:complexType[@name='ACTION']/xs:sequence/xs:any[@namespace='uri:oozie:sla:0.1']">
+        <jaxb:property name="anySLA"/>
+    </jaxb:bindings>
+</jaxb:bindings>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java b/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java
new file mode 100644
index 0000000..cf90f04
--- /dev/null
+++ b/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LateArrival;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.entity.v0.feed.Properties;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for AgeBasedDelete Policy validations.
+ */
+public class AgeBasedDeleteTest {
+    private static Feed feed;
+    private static String clusterName = "testCluster";
+
+    @BeforeMethod
+    private void init() {
+        feed = new Feed();
+        Cluster cluster = new Cluster();
+        cluster.setName(clusterName);
+
+        Property property = new Property();
+        property.setName(AgeBasedDelete.LIMIT_PROPERTY_NAME);
+        property.setValue("hours(3)");
+
+        Properties properties = new Properties();
+        properties.getProperties().add(property);
+
+        RetentionStage retentionStage = new RetentionStage();
+        retentionStage.setProperties(properties);
+
+        Lifecycle lifecycle = new Lifecycle();
+        lifecycle.setRetentionStage(retentionStage);
+
+        cluster.setLifecycle(lifecycle);
+
+        Clusters clusters = new Clusters();
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+
+        //set sla
+        Sla sla = new Sla();
+        sla.setSlaLow(new Frequency("hours(3)"));
+        sla.setSlaHigh(new Frequency("hours(3)"));
+        feed.setSla(sla);
+
+        // set late data arrival
+        LateArrival lateArrival = new LateArrival();
+        lateArrival.setCutOff(new Frequency("hours(3)"));
+        feed.setLateArrival(lateArrival);
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+        expectedExceptionsMessageRegExp = ".*slaHigh of Feed:.*")
+    public void testSlaValidation() throws FalconException {
+        feed.getSla().setSlaHigh(new Frequency("hours(4)"));
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+    expectedExceptionsMessageRegExp = ".*Feed's retention limit:.*")
+    public void testLateDataValidation() throws FalconException {
+        feed.getLateArrival().setCutOff(new Frequency("hours(4)"));
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+
+    @Test(expectedExceptions = FalconException.class,
+        expectedExceptionsMessageRegExp = ".*Invalid value for property.*")
+    public void testValidateLimit() throws FalconException {
+        feed.getClusters().getClusters().get(0).getLifecycle().getRetentionStage().getProperties().getProperties()
+                .get(0).setValue("invalid");
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+
+    @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = ".*limit is required.*")
+    public void testStageValidity() throws Exception {
+        feed.getClusters().getClusters().get(0).getLifecycle().getRetentionStage().getProperties().getProperties()
+                .clear();
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 157edf9..40c0a3e 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -90,6 +90,13 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-feed-lifecycle</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index b819dee..9e55edf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -20,10 +20,14 @@ package org.apache.falcon.oozie.feed;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
 import org.apache.falcon.oozie.OozieBundleBuilder;
 import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -38,16 +42,32 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
         super(entity);
     }
 
-    @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
-        List<Properties> props = new ArrayList<Properties>();
-        List<Properties> evictionProps =
-            OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);
-        if (evictionProps != null) {
-            props.addAll(evictionProps);
+    @Override
+    protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+        // if the feed has a lifecycle defined, use it to build the coordinator and workflow; else fall back
+        List<Properties> props = new ArrayList<>();
+        Lifecycle lifecycle = this.entity.getLifecycle();
+        if (lifecycle != null) {
+            for (String name : FeedHelper.getPolicies(this.entity, cluster.getName())) {
+                LifecyclePolicy policy = LifecyclePolicyMap.get().get(name);
+                if (policy == null) {
+                    LOG.error("Couldn't find lifecycle policy for name:{}", name);
+                    throw new FalconException("Invalid policy name " + name);
+                }
+                Properties appProps = policy.build(cluster, buildPath, this.entity);
+                if (appProps != null) {
+                    props.add(appProps);
+                }
+            }
+        } else {
+            List<Properties> evictionProps =
+                    OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);
+            if (evictionProps != null) {
+                props.addAll(evictionProps);
+            }
         }
-
-        List<Properties> replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION).buildCoords(cluster,
-            buildPath);
+        List<Properties> replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION)
+                .buildCoords(cluster, buildPath);
         if (replicationProps != null) {
             props.addAll(replicationProps);
         }
@@ -55,7 +75,6 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
         if (!props.isEmpty()) {
             copySharedLibs(cluster, new Path(getLibPath(buildPath)));
         }
-
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 7d0174a..cfce1ae 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -49,6 +49,7 @@ import org.apache.falcon.oozie.workflow.JAVA;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
@@ -88,12 +89,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     private Feed feed;
     private Feed tableFeed;
     private Feed fsReplFeed;
+    private Feed lifecycleRetentionFeed;
+    private Feed retentionFeed;
 
     private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml";
     private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml";
     private static final String FEED = "/feed/feed.xml";
     private static final String TABLE_FEED = "/feed/table-replication-feed.xml";
     private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml";
+    private static final String FS_RETENTION_LIFECYCLE_FEED = "/feed/fs-retention-lifecycle-feed.xml";
+    private static final String FS_RETENTION_ORIG_FEED = "/feed/fs-retention-feed.xml";
 
     @BeforeClass
     public void setUpDFS() throws Exception {
@@ -105,6 +110,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
         String trgHdfsUrl = trgMiniDFS.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
 
+        LifecyclePolicyMap.get().init();
         cleanupStore();
 
         org.apache.falcon.entity.v0.cluster.Property property =
@@ -124,6 +130,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         feed = (Feed) storeEntity(EntityType.FEED, FEED);
         fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED);
         tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED);
+        lifecycleRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_LIFECYCLE_FEED);
+        retentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_ORIG_FEED);
     }
 
     private Entity storeEntity(EntityType type, String resource) throws Exception {
@@ -150,6 +158,32 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     @Test
+    public void testRetentionWithLifecycle() throws Exception {
+        OozieEntityBuilder builder = OozieEntityBuilder.get(lifecycleRetentionFeed);
+        Path bundlePath = new Path("/projects/falcon/");
+        builder.build(trgCluster, bundlePath);
+
+        BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
+        List<COORDINATOR> coords = bundle.getCoordinator();
+
+        COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
+        assertLibExtensions(coord, "retention");
+        HashMap<String, String> props = getCoordProperties(coord);
+        Assert.assertEquals(props.get("ENTITY_PATH"), bundlePath.toString() + "/RETENTION");
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(17)}");
+        Assert.assertEquals(coord.getEnd(), "2099-01-01T00:00Z");
+        Assert.assertEquals(coord.getTimezone(), "UTC");
+
+        HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+        Assert.assertEquals(wfProps.get("feedNames"), lifecycleRetentionFeed.getName());
+        Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name()));
+        Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
+        Assert.assertEquals(wfProps.get("queueName"), "retention");
+        Assert.assertEquals(wfProps.get("limit"), "hours(2)");
+        Assert.assertEquals(wfProps.get("jobPriority"), "LOW");
+    }
+
+    @Test
     public void testReplicationCoordsForFSStorage() throws Exception {
         OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
         Path bundlePath = new Path("/projects/falcon/");

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/resources/feed/fs-retention-feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/fs-retention-feed.xml b/oozie/src/test/resources/feed/fs-retention-feed.xml
new file mode 100644
index 0000000..7eb85fa
--- /dev/null
+++ b/oozie/src/test/resources/feed/fs-retention-feed.xml
@@ -0,0 +1,50 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="lifecycle original retention feed" name="retention-test" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" type="source" name="corp1">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location path="/data/lifecycle/" type="data"/>
+        <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+        <location path="/data/regression/fetlrc/billing/metadata" type="meta"/>
+    </locations>
+
+    <ACL permission="0x755" group="group" owner="fetl"/>
+    <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+    <properties>
+        <property name="maxMaps" value="33" />
+        <property name="mapBandwidth" value="2" />
+    </properties>
+
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
new file mode 100644
index 0000000..2cadfe0
--- /dev/null
+++ b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
@@ -0,0 +1,60 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="lifecycle retention feed" name="retention-lifecycle-test" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" type="source" name="corp2">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location path="/data/lifecycle/" type="data"/>
+        <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+        <location path="/data/regression/fetlrc/billing/metadata" type="meta"/>
+    </locations>
+
+    <ACL permission="0x755" group="group" owner="fetl"/>
+    <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+    <properties>
+        <property name="maxMaps" value="33" />
+        <property name="mapBandwidth" value="2" />
+    </properties>
+
+    <lifecycle>
+        <retention-stage>
+            <frequency>hours(17)</frequency>
+            <queue>retention</queue>
+            <priority>LOW</priority>
+            <properties>
+                <property name="retention.policy.agebaseddelete.limit" value="hours(2)"></property>
+            </properties>
+        </retention-stage>
+    </lifecycle>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8127b46..9cdfc87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -434,6 +434,7 @@
         <module>rerun</module>
         <module>prism</module>
         <module>unit</module>
+        <module>lifecycle</module>
         <module>webapp</module>
         <module>docs</module>
     </modules>
@@ -844,6 +845,12 @@
 
             <dependency>
                 <groupId>org.apache.falcon</groupId>
+                <artifactId>falcon-feed-lifecycle</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.falcon</groupId>
                 <artifactId>falcon-process</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 8f3bc35..9c6aef7 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -26,6 +26,7 @@
 ## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
 
 *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
 *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
 *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
 *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
@@ -49,6 +50,12 @@
 ##### Prism Services #####
 prism.application.services=org.apache.falcon.entity.store.ConfigurationStore
 
+
+# List of Lifecycle policies configured.
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+# List of builders for the policies.
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
                         org.apache.falcon.entity.ColoClusterRelation,\


[2/2] falcon git commit: FALCON-965 Open up life cycle stage implementation within Falcon for extension. Contributed by Ajay Yadava.

Posted by aj...@apache.org.
FALCON-965 Open up life cycle stage implementation within Falcon for extension. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f7ad3f48
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f7ad3f48
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f7ad3f48

Branch: refs/heads/master
Commit: f7ad3f487966e2c1a13a8f61c203fc11cc7c73c8
Parents: c462f3e
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Sep 29 00:24:36 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Sep 29 00:24:36 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 client/src/main/resources/feed-0.1.xsd          |  35 ++
 .../org/apache/falcon/entity/FeedHelper.java    |  68 ++-
 .../falcon/entity/parser/FeedEntityParser.java  |  26 +-
 .../lifecycle/AbstractPolicyBuilderFactory.java |  30 +
 .../falcon/lifecycle/FeedLifecycleStage.java    |  37 ++
 .../falcon/lifecycle/LifecyclePolicy.java       |  63 +++
 .../apache/falcon/lifecycle/PolicyBuilder.java  |  37 ++
 .../lifecycle/retention/AgeBasedDelete.java     | 114 ++++
 .../lifecycle/retention/RetentionPolicy.java    |  54 ++
 .../falcon/service/LifecyclePolicyMap.java      |  81 +++
 .../falcon/workflow/WorkflowEngineFactory.java  |   7 +
 common/src/main/resources/startup.properties    |   5 +
 .../apache/falcon/entity/AbstractTestBase.java  |   2 +
 .../apache/falcon/entity/FeedHelperTest.java    | 143 +++++
 .../entity/parser/FeedEntityParserTest.java     |  24 +-
 .../src/test/resources/config/feed/feed-0.3.xml |  83 +++
 .../src/test/resources/config/feed/feed-0.4.xml |  74 +++
 docs/src/site/twiki/EntitySpecification.twiki   |  40 ++
 lifecycle/pom.xml                               | 208 +++++++
 .../engine/oozie/OoziePolicyBuilderFactory.java |  59 ++
 .../retention/AgeBasedCoordinatorBuilder.java   | 112 ++++
 .../oozie/retention/AgeBasedDeleteBuilder.java  |  56 ++
 .../retention/AgeBasedWorkflowBuilder.java      | 153 +++++
 .../engine/oozie/utils/OozieBuilderUtils.java   | 556 +++++++++++++++++++
 .../resources/action/feed/eviction-action.xml   |  59 ++
 .../src/main/resources/binding/jaxb-binding.xjb |  26 +
 .../lifecycle/retention/AgeBasedDeleteTest.java | 108 ++++
 oozie/pom.xml                                   |   7 +
 .../falcon/oozie/feed/FeedBundleBuilder.java    |  39 +-
 .../feed/OozieFeedWorkflowBuilderTest.java      |  34 ++
 .../test/resources/feed/fs-retention-feed.xml   |  50 ++
 .../feed/fs-retention-lifecycle-feed.xml        |  60 ++
 pom.xml                                         |   7 +
 src/conf/startup.properties                     |   7 +
 35 files changed, 2448 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a4edebc..d18d5aa 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
     FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) 
 
   NEW FEATURES
+    FALCON-965 Open up life cycle stage implementation within Falcon for extension(Ajay Yadava)
+
     FALCON-1437 Change DR recipes notification with Falcon notification(Peeyush Bishnoi via Sowmya Ramesh)
 
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 2af28d2..77b8f4b 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -125,6 +125,7 @@
             <xs:element type="ACL" name="ACL"/>
             <xs:element type="schema" name="schema"/>
             <xs:element type="properties" name="properties" minOccurs="0"/>
+            <xs:element type="lifecycle" name="lifecycle" minOccurs="0" />
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:string" name="description"/>
@@ -160,6 +161,7 @@
                 <xs:element type="locations" name="locations" minOccurs="0"/>
                 <xs:element type="catalog-table" name="table"/>
             </xs:choice>
+            <xs:element type="lifecycle" name="lifecycle" minOccurs="0" />
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="cluster-type" name="type" use="optional"/>
@@ -357,6 +359,20 @@
             <xs:enumeration value="chmod"/>
         </xs:restriction>
     </xs:simpleType>
+    <xs:complexType name="lifecycle">
+        <xs:annotation>
+            <xs:documentation>
+                The lifecycle of a feed consists of various stages. For example, typical stages of a feed are import,
+                replication, archival, retention and export. Together, these stages are called the lifecycle of the feed.
+            </xs:documentation>
+        </xs:annotation>
+
+        <xs:all>
+            <xs:element type="retention-stage" name="retention-stage" minOccurs="0"></xs:element>
+        </xs:all>
+
+    </xs:complexType>
+
     <xs:simpleType name="cluster-type">
         <xs:annotation>
             <xs:documentation>
@@ -435,4 +451,23 @@
             <xs:minLength value="1"/>
         </xs:restriction>
     </xs:simpleType>
+
+    <xs:complexType name="retention-stage">
+        <xs:annotation>
+            <xs:documentation>
+                Retention stage is the new way to define retention for a feed using the feed lifecycle feature.
+                Retention has a configurable policy which performs the validation and the actual execution through
+                the workflow engine. This way of specifying retention gives you more control, such as using a
+                different queue name, priority and execution order than for other lifecycle stages, like replication.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:all>
+            <xs:element type="non-empty-string" name="policy" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="frequency-type" name="frequency" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="xs:string" name="queue" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="xs:string" name="priority" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="properties" name="properties" minOccurs="0" maxOccurs="1"></xs:element>
+        </xs:all>
+    </xs:complexType>
+
 </xs:schema>
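
For reference, an illustrative feed snippet (not one of this commit's test resources) showing how the new optional lifecycle element and its retention-stage can be declared. The element names mirror the XSD additions above; the feed name, queue and limit values are arbitrary examples, and AgeBasedDelete is the default policy when none is specified.

    <feed name="sample-feed" xmlns="uri:falcon:feed:0.1">
        <!-- other mandatory feed elements (frequency, clusters, locations, ACL, schema) omitted -->
        <lifecycle>
            <retention-stage>
                <!-- optional; defaults to the stage's default policy, AgeBasedDelete -->
                <policy>AgeBasedDelete</policy>
                <frequency>hours(6)</frequency>
                <queue>retention</queue>
                <priority>LOW</priority>
                <properties>
                    <!-- required by AgeBasedDelete: instances older than this limit are deleted -->
                    <property name="retention.policy.agebaseddelete.limit" value="hours(48)"/>
                </properties>
            </retention-stage>
        </lifecycle>
    </feed>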

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 572923b..79f1959 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.entity;
 
 import org.apache.commons.lang3.StringUtils;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
@@ -31,14 +30,17 @@ import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.FeedInstanceResult;
@@ -372,6 +374,49 @@ public final class FeedHelper {
         return feedProperties;
     }
 
+    public static Lifecycle getLifecycle(Feed feed, String clusterName) throws FalconException {
+        Cluster cluster = getCluster(feed, clusterName);
+        if (cluster != null) {
+            return cluster.getLifecycle() != null ? cluster.getLifecycle() : feed.getLifecycle();
+        }
+        throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
+    }
+
+    public static RetentionStage getRetentionStage(Feed feed, String clusterName) throws FalconException {
+        if (isLifecycleEnabled(feed, clusterName)) {
+            Lifecycle globalLifecycle = feed.getLifecycle();
+            Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle();
+
+            if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
+                return clusterLifecycle.getRetentionStage();
+            } else if (globalLifecycle != null) {
+                return globalLifecycle.getRetentionStage();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the lifecycle policies applicable for a feed on the given cluster.
+     * @param feed feed for which lifecycle policies are to be listed.
+     * @param clusterName cluster to be used as context.
+     * @return list of names of lifecycle policies for the given feed, empty list if there are none.
+     */
+    public static List<String> getPolicies(Feed feed, String clusterName) throws FalconException {
+        List<String> result = new ArrayList<>();
+        Cluster cluster = getCluster(feed, clusterName);
+        if (cluster != null) {
+            if (isLifecycleEnabled(feed, clusterName)) {
+                String policy = getRetentionStage(feed, clusterName).getPolicy();
+                policy = StringUtils.isBlank(policy)
+                        ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy;
+                result.add(policy);
+            }
+            return result;
+        }
+        throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
+    }
+
     /**
      *  Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
      * @param instancePath - actual data path
@@ -748,4 +793,25 @@ public final class FeedHelper {
         return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA, instanceTime);
     }
 
+    public static boolean isLifecycleEnabled(Feed feed, String clusterName) {
+        Cluster cluster = getCluster(feed, clusterName);
+        return cluster != null && (feed.getLifecycle() != null || cluster.getLifecycle() != null);
+    }
+
+    public static Frequency getRetentionFrequency(Feed feed, String clusterName) throws FalconException {
+        Frequency retentionFrequency;
+        RetentionStage retentionStage = getRetentionStage(feed, clusterName);
+        if (retentionStage != null && retentionStage.getFrequency() != null) {
+            retentionFrequency = retentionStage.getFrequency();
+        } else {
+            Frequency.TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
+            if (timeUnit == Frequency.TimeUnit.hours || timeUnit == Frequency.TimeUnit.minutes) {
+                retentionFrequency = new Frequency("hours(6)");
+            } else {
+                retentionFrequency = new Frequency("days(1)");
+            }
+        }
+        return retentionFrequency;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 4f5599e..c73cc78 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -32,14 +32,14 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Properties;
-import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.ACL;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Properties;
+import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
@@ -48,6 +48,7 @@ import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
 import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
@@ -55,9 +56,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.List;
 
 /**
  * Parser that parses feed entity definition.
@@ -80,6 +81,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             throw new ValidationException("Feed should have at least one cluster");
         }
 
+        validateLifecycle(feed);
         validateACL(feed);
         for (Cluster cluster : feed.getClusters().getClusters()) {
             validateEntityExists(EntityType.CLUSTER, cluster.getName());
@@ -100,7 +102,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
         validateFeedPartitionExpression(feed);
         validateFeedGroups(feed);
         validateFeedSLA(feed);
-        validateACL(feed);
         validateProperties(feed);
 
         // Seems like a good enough entity object for a new one
@@ -124,6 +125,21 @@ public class FeedEntityParser extends EntityParser<Feed> {
         ensureValidityFor(feed, processes);
     }
 
+    private void validateLifecycle(Feed feed) throws FalconException {
+        LifecyclePolicyMap map = LifecyclePolicyMap.get();
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) {
+                if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) {
+                    throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
+                            + cluster.getName());
+                }
+                for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
+                    map.get(policyName).validate(feed, cluster.getName());
+                }
+            }
+        }
+    }
+
     private Set<Process> findProcesses(Set<Entity> referenced) {
         Set<Process> processes = new HashSet<Process>();
         for (Entity entity : referenced) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java b/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
new file mode 100644
index 0000000..5bcc2f8
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Abstract factory class for feed lifecycle policy builders.
+ */
+public abstract class AbstractPolicyBuilderFactory {
+
+    public abstract PolicyBuilder getPolicyBuilder(String policyName) throws FalconException;
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
new file mode 100644
index 0000000..833ad04
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.lifecycle;
+
+/**
+ * Enum for valid lifecycle stages for the feed.
+ */
+public enum FeedLifecycleStage {
+
+    RETENTION("AgeBasedDelete");
+
+    private String defaultPolicyName;
+
+    private FeedLifecycleStage(String defaultPolicyName) {
+        this.defaultPolicyName = defaultPolicyName;
+    }
+
+    public String getDefaultPolicyName() {
+        return defaultPolicyName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
new file mode 100644
index 0000000..be4e68c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Interface for all policies in feed lifecycle.
+ */
+public interface LifecyclePolicy {
+
+    /**
+     * Returns the name of the policy. Name of policy must be unique as it is used as an identifier.
+     * @return name of the policy
+     */
+    String getName();
+
+    /**
+     * Returns the stage to which the policy belongs.
+     * @return stage to which the policy belongs.
+     */
+    FeedLifecycleStage getStage();
+
+    /**
+     * Validates the configurations as per this policy.
+     * @param feed Parent feed for which the policy is configured.
+     * @param clusterName cluster to be used as context for validation.
+     * @throws FalconException
+     */
+    void validate(Feed feed, String clusterName) throws FalconException;
+
+    /**
+     * Builds workflow engine artifacts.
+     * @param cluster cluster to be used as context
+     * @param buildPath base path to be used for storing the artifacts.
+     * @param feed Parent feed.
+     * @return Properties to be passed to the caller e.g. bundle in case of oozie workflow engine.
+     * @throws FalconException
+     */
+    Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java b/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
new file mode 100644
index 0000000..5e5055b
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Interface to be implemented by all policy builders for a lifecycle policy.
+ * A builder builds workflow-engine-specific artifacts for a policy.
+ */
+public interface PolicyBuilder {
+
+    Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
+
+    String getPolicyName();
+}
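
As a rough sketch of the extension point this interface opens up, a custom builder might look like the following. The class name, package and policy name are hypothetical (they are not part of this commit), and build() only returns empty Properties; a real builder would materialize workflow-engine artifacts under the given buildPath, as the Oozie builders added elsewhere in this patch do, and return the properties the bundle needs.

    package org.apache.falcon.lifecycle.engine.custom;   // hypothetical package

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.lifecycle.PolicyBuilder;
    import org.apache.hadoop.fs.Path;

    import java.util.Properties;

    /**
     * Hypothetical builder sketch: shows the PolicyBuilder contract only.
     */
    public class CustomDeleteBuilder implements PolicyBuilder {

        @Override
        public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
            // A real implementation would write workflow/coordinator definitions under buildPath.
            return new Properties();
        }

        @Override
        public String getPolicyName() {
            // Must match the name of the LifecyclePolicy this builder serves.
            return "CustomDelete";
        }
    }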

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
new file mode 100644
index 0000000..0a1810e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.retention;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.expression.ExpressionHelper;
+
+import java.util.Date;
+
+/**
+ * Retention policy which deletes all instances whose instance time is older than a given limit.
+ * It creates the workflow and coordinators for this policy.
+ */
+public class AgeBasedDelete extends RetentionPolicy {
+
+    public static final String LIMIT_PROPERTY_NAME = "retention.policy.agebaseddelete.limit";
+
+    @Override
+    public void validate(Feed feed, String clusterName) throws FalconException {
+        // validate that it is a valid cluster
+        Cluster cluster = FeedHelper.getCluster(feed, clusterName);
+        Frequency retentionLimit = getRetentionLimit(feed, clusterName);
+        if (cluster != null) {
+            validateLimitWithSla(feed, cluster, retentionLimit.toString());
+            validateLimitWithLateData(feed, cluster, retentionLimit.toString());
+        }
+    }
+
+    private void validateLimitWithLateData(Feed feed, Cluster cluster, String retention) throws FalconException {
+        ExpressionHelper evaluator = ExpressionHelper.get();
+        long retentionPeriod = evaluator.evaluate(retention, Long.class);
+
+        if (feed.getLateArrival() != null) {
+            String feedCutoff = feed.getLateArrival().getCutOff().toString();
+            long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);
+            if (retentionPeriod < feedCutOffPeriod) {
+                throw new ValidationException(
+                        "Feed's retention limit: " + retention + " of referenced cluster " + cluster.getName()
+                                + " should be more than feed's late arrival cut-off period: " + feedCutoff
+                                + " for feed: " + feed.getName());
+            }
+        }
+    }
+
+    private void validateLimitWithSla(Feed feed, Cluster cluster, String retentionExpression) throws FalconException {
+        // test that slaHigh is less than retention
+        Sla clusterSla = FeedHelper.getSLAs(cluster, feed);
+        if (clusterSla != null) {
+            ExpressionHelper evaluator = ExpressionHelper.get();
+            ExpressionHelper.setReferenceDate(new Date());
+
+            Frequency slaHighExpression = clusterSla.getSlaHigh();
+            Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class));
+
+            Date retention = new Date(evaluator.evaluate(retentionExpression, Long.class));
+            if (slaHigh.after(retention)) {
+                throw new ValidationException("slaHigh of Feed: " + slaHighExpression
+                        + " is greater than retention of the feed: " + retentionExpression
+                        + " for cluster: " + cluster.getName()
+                );
+            }
+        }
+    }
+
+    public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconException {
+        RetentionStage retention = FeedHelper.getRetentionStage(feed, clusterName);
+        if (retention != null) {
+            String limit = null;
+            for (Property property : retention.getProperties().getProperties()) {
+                if (StringUtils.equals(property.getName(), LIMIT_PROPERTY_NAME)) {
+                    limit = property.getValue();
+                }
+            }
+            if (limit == null) {
+                throw new FalconException("Property: " + LIMIT_PROPERTY_NAME + " is required for "
+                        + getName() + " policy.");
+            }
+            try {
+                return new Frequency(limit);
+            } catch (IllegalArgumentException e) {
+                throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid "
+                        + "frequency e.g. hours(2)", e);
+            }
+        } else {
+            throw new FalconException("Cluster " + clusterName + " doesn't contain retention stage");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
new file mode 100644
index 0000000..7fd6175
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Base class that all retention policies must extend.
+ */
+public abstract class RetentionPolicy implements LifecyclePolicy {
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public FeedLifecycleStage getStage() {
+        return FeedLifecycleStage.RETENTION;
+    }
+
+    @Override
+    public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine();
+        PolicyBuilder builder = factory.getPolicyBuilder(getName());
+        return builder.build(cluster, buildPath, feed);
+    }
+}
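
To illustrate how the abstract class above keeps custom retention policies small, here is a hedged sketch of a policy that only supplies validation; getName(), getStage() and build() are inherited, and build() resolves the matching builder (for example the hypothetical CustomDeleteBuilder sketched earlier) by policy name. The class name and validation rule are illustrative and not part of this commit.

    package org.apache.falcon.lifecycle.retention;       // hypothetical custom policy

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.parser.ValidationException;
    import org.apache.falcon.entity.v0.feed.Feed;

    /**
     * Hypothetical retention policy sketch: only validate() is written here,
     * everything else comes from RetentionPolicy.
     */
    public class CustomDelete extends RetentionPolicy {

        @Override
        public void validate(Feed feed, String clusterName) throws FalconException {
            // Example rule: insist that a retention stage is actually configured for this cluster.
            if (FeedHelper.getRetentionStage(feed, clusterName) == null) {
                throw new ValidationException("Retention stage is required for policy: " + getName()
                        + " on cluster: " + clusterName);
            }
        }
    }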

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java b/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
new file mode 100644
index 0000000..b8c979e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Stores all internal and external feed lifecycle policies.
+ */
+public final class LifecyclePolicyMap implements FalconService {
+    private static final Logger LOG = LoggerFactory.getLogger(LifecyclePolicyMap.class);
+    private static final LifecyclePolicyMap STORE = new LifecyclePolicyMap();
+
+    private final Map<String, LifecyclePolicy> policyMap = new HashMap<>();
+
+    private LifecyclePolicyMap() {}
+
+    public static LifecyclePolicyMap get() {
+        return STORE;
+    }
+
+    public LifecyclePolicy get(String policyName) {
+        return policyMap.get(policyName);
+    }
+
+    @Override
+    public String getName() {
+        return getClass().getSimpleName();
+    }
+
+    @Override
+    public void init() throws FalconException {
+        String[] policyNames = StartupProperties.get().getProperty("falcon.feed.lifecycle.policies").split(",");
+        for (String name : policyNames) {
+            LifecyclePolicy policy = ReflectionUtils.getInstanceByClassName(name);
+            LOG.debug("Loaded policy : {} for stage : {}", policy.getName(), policy.getStage());
+            policyMap.put(policy.getName(), policy);
+        }
+        validate();
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        policyMap.clear();
+    }
+
+    // validate that default policy for each stage is available
+    private void validate() throws FalconException {
+        for (FeedLifecycleStage stage : FeedLifecycleStage.values()) {
+            if (!policyMap.containsKey(stage.getDefaultPolicyName())) {
+                throw new FalconException("Default Policy: " + stage.getDefaultPolicyName()
+                        + " for stage: " + stage.name() + " was not found.");
+            }
+        }
+    }
+}
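
LifecyclePolicyMap discovers policies from the startup properties this commit also touches. Assuming the hypothetical CustomDelete and CustomDeleteBuilder classes sketched above, registering them would amount to appending them to the comma-separated lists; the snippet below illustrates the intended extension flow and is not configuration shipped with this patch. Note that the default AgeBasedDelete policy must stay on the list, because init() validates that each stage's default policy is present.

    # startup.properties (illustrative): register an additional lifecycle policy and its builder
    *.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete,\
                                     org.apache.falcon.lifecycle.retention.CustomDelete
    *.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder,\
                                            org.apache.falcon.lifecycle.engine.custom.CustomDeleteBuilder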

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
index 756c6b8..49592ac 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.workflow;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 
@@ -30,6 +31,8 @@ public final class WorkflowEngineFactory {
 
     private static final String WORKFLOW_ENGINE = "workflow.engine.impl";
 
+    private static final String LIFECYCLE_ENGINE = "lifecycle.engine.impl";
+
     private WorkflowEngineFactory() {
     }
 
@@ -37,4 +40,8 @@ public final class WorkflowEngineFactory {
         return ReflectionUtils.getInstance(WORKFLOW_ENGINE);
     }
 
+    public static AbstractPolicyBuilderFactory getLifecycleEngine() throws FalconException {
+        return ReflectionUtils.getInstance(LIFECYCLE_ENGINE);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 9db460c..357b90c 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -22,6 +22,7 @@
 ## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
 
 *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
 *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
 *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
 *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
@@ -41,6 +42,10 @@
                         org.apache.falcon.service.GroupsService,\
                         org.apache.falcon.service.ProxyUserService
 
+# List of Lifecycle policies configured.
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+# List of builders for the policies.
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
                         org.apache.falcon.entity.ColoClusterRelation,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index e9946c4..aab9cee 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -55,6 +55,8 @@ public class AbstractTestBase {
 
     protected static final String PROCESS_XML = "/config/process/process-0.1.xml";
     protected static final String FEED_XML = "/config/feed/feed-0.1.xml";
+    protected static final String FEED3_XML = "/config/feed/feed-0.3.xml";
+    protected static final String FEED4_XML = "/config/feed/feed-0.4.xml";
     protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
     protected EmbeddedCluster dfsCluster;
     protected Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index c70cfcc..4020d36 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -19,6 +19,8 @@
 package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.FeedEntityParser;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -28,9 +30,11 @@ import org.apache.falcon.entity.v0.cluster.Properties;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.Clusters;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
@@ -38,6 +42,7 @@ import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -50,6 +55,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.TimeZone;
 
@@ -63,6 +69,7 @@ public class FeedHelperTest extends AbstractTestBase {
     @BeforeClass
     public void init() throws Exception {
         initConfigStore();
+        LifecyclePolicyMap.get().init();
     }
 
     @BeforeMethod
@@ -232,6 +239,16 @@ public class FeedHelperTest extends AbstractTestBase {
     }
 
     @Test
+    public void testGetPolicies() throws Exception {
+        FeedEntityParser parser = (FeedEntityParser) EntityParserFactory
+                .getParser(EntityType.FEED);
+        Feed feed = parser.parse(this.getClass().getResourceAsStream(FEED3_XML));
+        List<String> policies = FeedHelper.getPolicies(feed, "testCluster");
+        Assert.assertEquals(policies.size(), 1);
+        Assert.assertEquals(policies.get(0), "AgeBasedDelete");
+    }
+
+    @Test
     public void testFeedWithNoDependencies() throws Exception {
         Cluster cluster = publishCluster();
         Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
@@ -706,6 +723,132 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(result, expected);
     }
 
+    @Test
+    public void testIsLifeCycleEnabled() throws Exception {
+        Feed feed = new Feed();
+
+        // lifecycle is not defined
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+        Assert.assertFalse(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+
+        // lifecycle is defined at global level
+        Lifecycle globalLifecycle = new Lifecycle();
+        RetentionStage retentionStage = new RetentionStage();
+        retentionStage.setFrequency(new Frequency("hours(2)"));
+        globalLifecycle.setRetentionStage(retentionStage);
+        feed.setLifecycle(globalLifecycle);
+        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+
+        // lifecycle is defined at both global and cluster level
+        Lifecycle clusterLifecycle = new Lifecycle();
+        retentionStage = new RetentionStage();
+        retentionStage.setFrequency(new Frequency("hours(4)"));
+        clusterLifecycle.setRetentionStage(retentionStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+
+        // lifecycle is defined only at cluster level
+        feed.setLifecycle(null);
+        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+    }
+
+    @Test
+    public void testGetRetentionStage() throws Exception {
+        Feed feed = new Feed();
+        feed.setFrequency(new Frequency("days(1)"));
+
+        // lifecycle is not defined
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+        Assert.assertNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("days(1)"));
+
+        // lifecycle is defined at global level
+        Lifecycle globalLifecycle = new Lifecycle();
+        RetentionStage globalRetentionStage = new RetentionStage();
+        globalRetentionStage.setFrequency(new Frequency("hours(2)"));
+        globalLifecycle.setRetentionStage(globalRetentionStage);
+        feed.setLifecycle(globalLifecycle);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                feed.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle is defined at both global and cluster level
+        Lifecycle clusterLifecycle = new Lifecycle();
+        RetentionStage clusterRetentionStage = new RetentionStage();
+        clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
+        clusterLifecycle.setRetentionStage(clusterRetentionStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention only at cluster level.
+        feed.getLifecycle().setRetentionStage(null);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention only at global level.
+        feed.getLifecycle().setRetentionStage(globalRetentionStage);
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                feed.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle is defined only at cluster level
+        feed.setLifecycle(null);
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+    }
+
+    @Test
+    public void testGetRetentionFrequency() throws Exception {
+        Feed feed = new Feed();
+        feed.setFrequency(new Frequency("days(10)"));
+
+        // no lifecycle defined - test both daily and monthly feeds
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("days(1)"));
+
+        feed.setFrequency(new Frequency("hours(1)"));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)"));
+
+        feed.setFrequency(new Frequency("minutes(5)"));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)"));
+
+        // lifecycle at both level - retention only at global level.
+        Lifecycle globalLifecycle = new Lifecycle();
+        RetentionStage globalRetentionStage = new RetentionStage();
+        globalRetentionStage.setFrequency(new Frequency("hours(2)"));
+        globalLifecycle.setRetentionStage(globalRetentionStage);
+        feed.setLifecycle(globalLifecycle);
+
+        Lifecycle clusterLifecycle = new Lifecycle();
+        RetentionStage clusterRetentionStage = new RetentionStage();
+        clusterLifecycle.setRetentionStage(clusterRetentionStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)"));
+
+        // lifecycle at both level - retention only at cluster level.
+        feed.getLifecycle().getRetentionStage().setFrequency(null);
+        clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(4)"));
+    }
+
     private Validity getFeedValidity(String start, String end) throws ParseException {
         Validity validity = new Validity();
         validity.setStart(getDate(start));

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index b6fdb13..1c43800 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -42,6 +42,7 @@ import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.group.FeedGroupMapTest;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.falcon.util.FalconTestUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.Path;
@@ -88,9 +89,9 @@ public class FeedEntityParserTest extends AbstractTestBase {
         cluster.setName("backupCluster");
         store.publish(EntityType.CLUSTER, cluster);
 
+        LifecyclePolicyMap.get().init();
         CurrentUser.authenticate(FalconTestUtil.TEST_USER_2);
-        modifiableFeed = parser.parseAndValidate(this.getClass()
-                .getResourceAsStream(FEED_XML));
+        modifiableFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
     }
 
     @Test(expectedExceptions = ValidationException.class)
@@ -163,6 +164,25 @@ public class FeedEntityParserTest extends AbstractTestBase {
         System.out.println(stringWriter.toString());
     }
 
+    @Test
+    public void testLifecycleParse() throws Exception {
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+        assertEquals("hours(17)", feed.getLifecycle().getRetentionStage().getFrequency().toString());
+        assertEquals("AgeBasedDelete", FeedHelper.getPolicies(feed, "testCluster").get(0));
+        assertEquals("reports", feed.getLifecycle().getRetentionStage().getQueue());
+        assertEquals("NORMAL", feed.getLifecycle().getRetentionStage().getPriority());
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+            expectedExceptionsMessageRegExp = ".*Retention is a mandatory stage.*")
+    public void testMandatoryRetention() throws Exception {
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+        feed.getLifecycle().setRetentionStage(null);
+        parser.validate(feed);
+    }
+
     @Test(expectedExceptions = ValidationException.class)
     public void applyValidationInvalidFeed() throws Exception {
         Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/resources/config/feed/feed-0.3.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-0.3.xml b/common/src/test/resources/config/feed/feed-0.3.xml
new file mode 100644
index 0000000..fc3ea06
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-0.3.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
+        >
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+            <lifecycle>
+                <retention-stage>
+                    <frequency>hours(10)</frequency>
+                    <queue>reports</queue>
+                    <priority>NORMAL</priority>
+                    <properties>
+                        <property name="retention.policy.agebaseddelete.limit" value="hours(9)"></property>
+                    </properties>
+                </retention-stage>
+            </lifecycle>
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser-ut-user" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+    <lifecycle>
+        <retention-stage>
+            <frequency>hours(17)</frequency>
+            <queue>reports</queue>
+            <priority>NORMAL</priority>
+            <properties>
+                <property name="retention.policy.agebaseddelete.limit" value="hours(7)"></property>
+            </properties>
+        </retention-stage>
+    </lifecycle>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/resources/config/feed/feed-0.4.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-0.4.xml b/common/src/test/resources/config/feed/feed-0.4.xml
new file mode 100644
index 0000000..3983c59
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-0.4.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
+        >
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser-ut-user" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+    <lifecycle>
+        <retention-stage>
+            <frequency>hours(17)</frequency>
+            <queue>reports</queue>
+            <priority>NORMAL</priority>
+            <properties>
+                <property name="retention.policy.agebaseddelete.limit" value="hours(7)"></property>
+            </properties>
+        </retention-stage>
+    </lifecycle>
+
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index d4f4140..e07fe12 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -316,6 +316,46 @@ destination but not in source during replication. "preserveBlockSize" represents
 replication. "preserveReplicationNumber" represents preserving replication number during replication.
 "preservePermission" represents preserving permission during
 
+
+---+++ Lifecycle
+<verbatim>
+
+<lifecycle>
+    <retention-stage>
+        <frequency>hours(10)</frequency>
+        <queue>reports</queue>
+        <priority>NORMAL</priority>
+        <properties>
+            <property name="retention.policy.agebaseddelete.limit" value="hours(9)"></property>
+        </properties>
+    </retention-stage>
+</lifecycle>
+
+</verbatim>
+
+The lifecycle tag is the new way to define the various stages of a feed's lifecycle. The example above defines a
+retention-stage using the lifecycle tag. Lifecycle may be defined at the global level, at the cluster level, or both;
+cluster-level configuration takes precedence, and Falcon falls back to the global definition when it is missing.
+
+
+---++++ Retention Stage
+As of now there are two ways to specify retention: through the <retention> tag in the cluster, or through the new
+<retention-stage> tag inside the <lifecycle> tag. If both are defined for a feed, the lifecycle tag takes effect
+and Falcon ignores the <retention> tag in the cluster. If the retention-stage configuration in the lifecycle tag
+is invalid, Falcon will *NOT* fall back to the <retention> tag even if it is defined, and will throw a validation
+error.
+
+In this new method of defining retention you can specify the frequency at which retention should occur, and you can
+also define the queue and priority parameters for retention jobs. The default behavior of retention-stage is the same
+as the existing one: it deletes all instances whose instance time is earlier than the duration provided in
+"retention.policy.agebaseddelete.limit".
+
+The property "retention.policy.agebaseddelete.limit" is mandatory and must contain a valid duration, e.g. "hours(1)".
+
+In future, we will allow more customisation through this method, such as customising how instances are chosen for
+deletion.
+
+
 ---++ Process Specification
 A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines  the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.  
 

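To make the precedence rule in the twiki text above concrete, here is a minimal sketch of a feed that defines a
retention-stage both globally and on one cluster. The cluster name and duration values are hypothetical and the
required feed elements are elided; for "primaryCluster" the cluster-level stage (frequency hours(6), limit hours(12))
applies, while every other cluster of the feed falls back to the global stage (frequency hours(24), limit hours(48)).

<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
    <!-- frequency, locations, ACL, schema and other required elements elided for brevity -->
    <clusters>
        <cluster name="primaryCluster" type="source">
            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
            <retention limit="hours(48)" action="delete"/>
            <lifecycle>
                <retention-stage>
                    <frequency>hours(6)</frequency>
                    <queue>reports</queue>
                    <priority>NORMAL</priority>
                    <properties>
                        <property name="retention.policy.agebaseddelete.limit" value="hours(12)"/>
                    </properties>
                </retention-stage>
            </lifecycle>
        </cluster>
    </clusters>
    <lifecycle>
        <retention-stage>
            <frequency>hours(24)</frequency>
            <queue>reports</queue>
            <priority>NORMAL</priority>
            <properties>
                <property name="retention.policy.agebaseddelete.limit" value="hours(48)"/>
            </properties>
        </retention-stage>
    </lifecycle>
</feed>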
http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/pom.xml
----------------------------------------------------------------------
diff --git a/lifecycle/pom.xml b/lifecycle/pom.xml
new file mode 100644
index 0000000..ddb9550
--- /dev/null
+++ b/lifecycle/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.falcon</groupId>
+        <artifactId>falcon-main</artifactId>
+        <version>0.8-SNAPSHOT</version>
+    </parent>
+    <artifactId>falcon-feed-lifecycle</artifactId>
+    <description>Apache Falcon Lifecycle Module</description>
+    <name>Apache Falcon Lifecycle Module</name>
+    <packaging>jar</packaging>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                 <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-client</artifactId>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>unpack</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>unpack</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.oozie</groupId>
+                                    <artifactId>oozie-client</artifactId>
+                                    <outputDirectory>${project.build.directory}/oozie-schemas</outputDirectory>
+                                    <includes>**/*.xsd</includes>
+                                    <excludes>**/*.class</excludes>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jvnet.jaxb2.maven2</groupId>
+                <artifactId>maven-jaxb2-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>coord-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.coordinator</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>oozie-coordinator-0.3.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>wf-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Uses workflow xsd from unpacked oozie client jar to customize jaxb binding.
+                            jaxb binding is required to avoid 'Property "Any" is already defined' error-->
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.workflow</generatePackage>
+                            <bindingDirectory>src/main/resources/binding</bindingDirectory>
+                            <schemaDirectory>${project.build.directory}/oozie-schemas</schemaDirectory>
+                            <schemaIncludes>
+                                <include>oozie-workflow-0.3.xsd</include>
+                            </schemaIncludes>
+                            <debug>true</debug>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>hive-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.hive</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>hive-action-0.2.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>bundle-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.bundle</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>oozie-bundle-0.1.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/log4j.xml</exclude>
+                    </excludes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java
new file mode 100644
index 0000000..7464b4c
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds feed lifecycle policies for Oozie workflow engine.
+ */
+public class OoziePolicyBuilderFactory extends AbstractPolicyBuilderFactory {
+
+    private static Map<String, PolicyBuilder> registry = new HashMap<>();
+
+    static {
+        String builders = StartupProperties.get().getProperty("falcon.feed.lifecycle.policy.builders", "");
+        if (StringUtils.isNotBlank(builders)) {
+            for (String builder : builders.split(",")) {
+                try {
+                    PolicyBuilder policyBuilder = ReflectionUtils.getInstanceByClassName(builder);
+                    registry.put(policyBuilder.getPolicyName(), policyBuilder);
+                } catch (FalconException e) {
+                    throw new RuntimeException("Couldn't load builder for " + builder, e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public PolicyBuilder getPolicyBuilder(String policyName) throws FalconException {
+        if (registry.containsKey(policyName)) {
+            return registry.get(policyName);
+        }
+        throw new FalconException("Couldn't find builder for policy " + policyName);
+    }
+}
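
The static initializer above reads the comma-separated builder class names from the
"falcon.feed.lifecycle.policy.builders" startup property and registers each one under its policy name. A minimal
sketch of such an entry, assuming the AgeBasedDeleteBuilder from this patch is the only builder to register and
assuming the "*." domain prefix conventionally used in Falcon's startup.properties (the actual default shipped with
Falcon may differ):

# sketch only; the value shipped in startup.properties may list additional builders
*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder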

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
new file mode 100644
index 0000000..4601070
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.ExecutionType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Properties;
+
+/**
+ * Utility class to build coordinators for AgeBasedDelete Policy.
+ */
+public final class AgeBasedCoordinatorBuilder {
+
+    private AgeBasedCoordinatorBuilder() {
+
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AgeBasedCoordinatorBuilder.class);
+
+    /**
+     * Builds the coordinator app.
+     * @param cluster - cluster to schedule retention on.
+     * @param basePath - Base path to marshal coordinator app.
+     * @param feed - feed for which retention is to be scheduled.
+     * @param wfProp - properties passed from workflow to coordinator e.g. ENTITY_PATH
+     * @return - Properties from creating the coordinator application to be used by Bundle.
+     * @throws FalconException
+     */
+    public static Properties build(Cluster cluster, Path basePath, Feed feed, Properties wfProp)
+        throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        if (feedCluster.getValidity().getEnd().before(new Date())) {
+            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster {} is not in the future",
+                    cluster.getName());
+            return null;
+        }
+
+        COORDINATORAPP coord = new COORDINATORAPP();
+        String coordName = EntityUtil.getWorkflowName(LifeCycle.EVICTION.getTag(), feed).toString();
+        coord.setName(coordName);
+        coord.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+        coord.setStart(SchemaHelper.formatDateUTC(new Date()));
+        coord.setTimezone(feed.getTimezone().getID());
+
+        Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, cluster.getName());
+        // set controls
+        long frequencyInMillis = ExpressionHelper.get().evaluate(retentionFrequency.toString(), Long.class);
+        CONTROLS controls = new CONTROLS();
+        controls.setExecution(ExecutionType.LAST_ONLY.value());
+        controls.setTimeout(String.valueOf(frequencyInMillis / (1000 * 60)));
+        coord.setControls(controls);
+
+        coord.setFrequency("${coord:" + retentionFrequency.toString() + "}");
+
+        Path buildPath = OozieBuilderUtils.getBuildPath(basePath, LifeCycle.EVICTION.getTag());
+        Properties props = OozieBuilderUtils.createCoordDefaultConfiguration(coordName, feed);
+        props.putAll(OozieBuilderUtils.getProperties(buildPath, coordName));
+
+        WORKFLOW workflow = new WORKFLOW();
+        String entityPath = wfProp.getProperty(OozieBuilderUtils.ENTITY_PATH);
+        String storagePath = OozieBuilderUtils.getStoragePath(entityPath);
+        workflow.setAppPath(storagePath);
+        workflow.setConfiguration(OozieBuilderUtils.getCoordinatorConfig(props));
+        ACTION action = new ACTION();
+        action.setWorkflow(workflow);
+
+        coord.setAction(action);
+
+        Path marshalPath = OozieBuilderUtils.marshalCoordinator(cluster, coord, buildPath);
+        return OozieBuilderUtils.getProperties(marshalPath, coordName);
+    }
+
+
+    protected static WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.DELETE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java
new file mode 100644
index 0000000..edb4f8e
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.lifecycle.retention.AgeBasedDelete;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Oozie Builder for AgeBasedDelete policy.
+ */
+public class AgeBasedDeleteBuilder implements PolicyBuilder {
+
+    private static final String NAME = new AgeBasedDelete().getName();
+
+    @Override
+    public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        Properties wfProps = buildWorkflow(cluster, buildPath, feed);
+        return buildCoordinator(cluster, buildPath, feed, wfProps);
+    }
+
+    @Override
+    public String getPolicyName() {
+        return NAME;
+    }
+
+    public Properties buildCoordinator(Cluster cluster, Path buildPath, Feed feed, Properties wfProps)
+        throws FalconException {
+        return AgeBasedCoordinatorBuilder.build(cluster, buildPath, feed, wfProps);
+    }
+
+    public Properties buildWorkflow(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        return AgeBasedWorkflowBuilder.build(cluster, buildPath, feed);
+    }
+}
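
Taken together, building the retention artifacts for a feed goes through the factory lookup and then
AgeBasedDeleteBuilder, which delegates to the workflow and coordinator builders above. A minimal Java sketch of that
call chain, assuming cluster, feed and stagingPath have already been resolved elsewhere (the wrapper class and method
below are hypothetical and not part of this patch):

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.lifecycle.PolicyBuilder;
import org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory;
import org.apache.hadoop.fs.Path;

import java.util.Properties;

/** Hypothetical driver used only to illustrate the call chain. */
public final class LifecycleBuildSketch {

    private LifecycleBuildSketch() {
    }

    public static Properties buildRetention(Cluster cluster, Feed feed, Path stagingPath)
        throws FalconException {
        // Look up the builder registered under the "AgeBasedDelete" policy name
        // (the same name asserted in FeedEntityParserTest#testLifecycleParse).
        PolicyBuilder builder = new OoziePolicyBuilderFactory().getPolicyBuilder("AgeBasedDelete");

        // Marshals the eviction workflow and coordinator under stagingPath and returns
        // the properties the Oozie bundle needs in order to reference them.
        return builder.build(cluster, stagingPath, feed);
    }
}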