You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/07/10 08:57:36 UTC

[7/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
new file mode 100644
index 0000000..bb8dfcc
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.feed.FeedBundleBuilder;
+import org.apache.falcon.oozie.process.ProcessBundleBuilder;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.service.FalconPathFilter;
+import org.apache.falcon.service.SharedLibraryHostingService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Marshaller;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Base class for building oozie entities - workflow, coordinator and bundle.
+ * @param <T>
+ */
+public abstract class OozieEntityBuilder<T extends Entity> {
+    public static final Logger LOG = LoggerFactory.getLogger(OozieEntityBuilder.class);
+
+    public static final String METASTOREURIS = "hive.metastore.uris";
+    public static final String METASTORE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
+    public static final String METASTORE_USE_THRIFT_SASL = "hive.metastore.sasl.enabled";
+
+    public static final String ENTITY_PATH = "ENTITY_PATH";
+    public static final String ENTITY_NAME = "ENTITY_NAME";
+
+    private static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            return path.getName().startsWith("falcon");
+        }
+
+        @Override
+        public String getJarName(Path path) {
+            String name = path.getName();
+            if (name.endsWith(".jar")) {
+                name = name.substring(0, name.indexOf(".jar"));
+            }
+            return name;
+        }
+    };
+
+    protected T entity;
+    protected final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled();
+
+    public OozieEntityBuilder(T entity) {
+        this.entity = entity;
+    }
+
+    public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException;
+
+    protected String getStoragePath(Path path) {
+        if (path != null) {
+            return getStoragePath(path.toString());
+        }
+        return null;
+    }
+
+    protected String getStoragePath(String path) {
+        if (StringUtils.isNotEmpty(path)) {
+            if (new Path(path).toUri().getScheme() == null && !path.startsWith("${nameNode}")) {
+                path = "${nameNode}" + path;
+            }
+        }
+        return path;
+    }
+
+    public static OozieEntityBuilder get(Entity entity) {
+        switch(entity.getEntityType()) {
+        case FEED:
+            return new FeedBundleBuilder((Feed) entity);
+
+        case PROCESS:
+            return new ProcessBundleBuilder((Process)entity);
+
+        default:
+        }
+        throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
+    }
+
+    protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
+        throws FalconException {
+        try {
+            Marshaller marshaller = jaxbContext.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            OutputStream out = fs.create(outPath);
+            try {
+                marshaller.marshal(jaxbElement, out);
+            } finally {
+                out.close();
+            }
+            if (LOG.isDebugEnabled()) {
+                StringWriter writer = new StringWriter();
+                marshaller.marshal(jaxbElement, writer);
+                LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
+                LOG.debug(writer.getBuffer().toString());
+            }
+
+            LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
+        } catch (Exception e) {
+            throw new FalconException("Unable to marshall app object", e);
+        }
+    }
+
+    protected boolean isTableStorageType(Cluster cluster) throws FalconException {
+        return entity.getEntityType() == EntityType.PROCESS
+            ? isTableStorageType(cluster, (Process) entity) : isTableStorageType(cluster, (Feed) entity);
+    }
+
+    protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
+        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
+        return Storage.TYPE.TABLE == storageType;
+    }
+
+    protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
+        return Storage.TYPE.TABLE == storageType;
+    }
+
+    protected Properties getHiveCredentials(Cluster cluster) {
+        String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+        if (metaStoreUrl == null) {
+            throw new IllegalStateException(
+                "Registry interface is not defined in cluster: " + cluster.getName());
+        }
+
+        Properties hiveCredentials = new Properties();
+        hiveCredentials.put(METASTOREURIS, metaStoreUrl);
+        hiveCredentials.put("hive.metastore.execute.setugi", "true");
+        hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat"));
+        hiveCredentials.put("hcat.metastore.uri", metaStoreUrl);
+
+        if (isSecurityEnabled) {
+            String principal = ClusterHelper
+                .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+            hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal);
+            hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true");
+            hiveCredentials.put("hcat.metastore.principal", principal);
+        }
+
+        return hiveCredentials;
+    }
+
+    protected Configuration getHiveCredentialsAsConf(Cluster cluster) {
+        Properties hiveCredentials = getHiveCredentials(cluster);
+
+        Configuration hiveConf = new Configuration(false);
+        for (Entry<Object, Object> entry : hiveCredentials.entrySet()) {
+            hiveConf.set((String)entry.getKey(), (String)entry.getValue());
+        }
+
+        return hiveConf;
+    }
+
+    protected Properties getEntityProperties(Entity myEntity) {
+        Properties properties = new Properties();
+        switch (myEntity.getEntityType()) {
+        case CLUSTER:
+            org.apache.falcon.entity.v0.cluster.Properties clusterProps = ((Cluster) myEntity).getProperties();
+            if (clusterProps != null) {
+                for (Property prop : clusterProps.getProperties()) {
+                    properties.put(prop.getName(), prop.getValue());
+                }
+            }
+            break;
+
+        case FEED:
+            org.apache.falcon.entity.v0.feed.Properties feedProps = ((Feed) myEntity).getProperties();
+            if (feedProps != null) {
+                for (org.apache.falcon.entity.v0.feed.Property prop : feedProps.getProperties()) {
+                    properties.put(prop.getName(), prop.getValue());
+                }
+            }
+            break;
+
+        case PROCESS:
+            org.apache.falcon.entity.v0.process.Properties processProps = ((Process) myEntity).getProperties();
+            if (processProps != null) {
+                for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) {
+                    properties.put(prop.getName(), prop.getValue());
+                }
+            }
+            break;
+
+        default:
+            throw new IllegalArgumentException("Unhandled entity type " + myEntity.getEntityType());
+        }
+        return properties;
+    }
+
+    protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
+        String prefix = "falcon_" + input.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_partition_filter_pig",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
+        props.put(prefix + "_partition_filter_hive",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
+        props.put(prefix + "_partition_filter_java",
+            "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+        props.put(prefix + "_datain_partitions_hive",
+            "${coord:dataInPartitions('" + input.getName() + "', 'hive-export')}");
+    }
+
+    protected void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage, Properties props) {
+        String prefix = "falcon_" + output.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        //pig and java actions require partition expression as "key1=val1, key2=val2"
+        props.put(prefix + "_partitions_pig",
+            "${coord:dataOutPartitions('" + output.getName() + "')}");
+        props.put(prefix + "_partitions_java",
+            "${coord:dataOutPartitions('" + output.getName() + "')}");
+
+        //hive requires partition expression as "key1='val1', key2='val2'" (with quotes around values)
+        //there is no direct EL expression in oozie
+        List<String> partitions = new ArrayList<String>();
+        for (String key : tableStorage.getDatedPartitionKeys()) {
+            StringBuilder expr = new StringBuilder();
+            expr.append("${coord:dataOutPartitionValue('").append(output.getName()).append("', '").append(key)
+                .append("')}");
+            props.put(prefix + "_dated_partition_value_" + key, expr.toString());
+            partitions.add(key + "='" + expr + "'");
+
+        }
+        props.put(prefix + "_partitions_hive", StringUtils.join(partitions, ","));
+    }
+
+    protected void propagateCommonCatalogTableProperties(CatalogStorage tableStorage, Properties props, String prefix) {
+        props.put(prefix + "_storage_type", tableStorage.getType().name());
+        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+        props.put(prefix + "_database", tableStorage.getDatabase());
+        props.put(prefix + "_table", tableStorage.getTable());
+    }
+
+    protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException {
+        try {
+            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
+                libPath, cluster, FALCON_JAR_FILTER);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
+        }
+    }
+
+    protected Properties getProperties(Path path, String name) {
+        if (path == null) {
+            return null;
+        }
+
+        Properties prop = new Properties();
+        prop.setProperty(OozieEntityBuilder.ENTITY_PATH, getStoragePath(path));
+        prop.setProperty(OozieEntityBuilder.ENTITY_NAME, name);
+        return prop;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
new file mode 100644
index 0000000..ac78297
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.feed.FeedReplicationWorkflowBuilder;
+import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder;
+import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
+import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
+import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CREDENTIAL;
+import org.apache.falcon.oozie.workflow.CREDENTIALS;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Base class for building orchestration workflow in oozie.
+ * @param <T>
+ */
+public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
+    protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+    public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
+        Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
+    private final Tag lifecycle;
+
+    public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) {
+        super(entity);
+        this.lifecycle = lifecycle;
+    }
+
+    public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Tag lifecycle) {
+        switch(entity.getEntityType()) {
+        case FEED:
+            Feed feed = (Feed) entity;
+            switch (lifecycle) {
+            case RETENTION:
+                return new FeedRetentionWorkflowBuilder(feed);
+
+            case REPLICATION:
+                return new FeedReplicationWorkflowBuilder(feed);
+
+            default:
+                throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle "
+                    + lifecycle);
+            }
+
+        case PROCESS:
+            Process process = (Process) entity;
+            switch(process.getWorkflow().getEngine()) {
+            case PIG:
+                return new PigProcessWorkflowBuilder(process);
+
+            case OOZIE:
+                return new OozieProcessWorkflowBuilder(process);
+
+            case HIVE:
+                return new HiveProcessWorkflowBuilder(process);
+
+            default:
+                break;
+            }
+
+        default:
+        }
+
+        throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + lifecycle);
+    }
+
+    protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+        marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+            OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+    }
+
+    protected WORKFLOWAPP getWorkflow(String template) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieOrchestrationWorkflowBuilder.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
+            @SuppressWarnings("unchecked")
+            JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(resourceAsStream);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException(e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+
+    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag)
+        throws FalconException {
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+        try {
+            addExtensionJars(fs, new Path(libext), wf);
+            addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
+            if (tag != null) {
+                addExtensionJars(fs,
+                    new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()), wf);
+            }
+        } catch(IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+        FileStatus[] libs = null;
+        try {
+            libs = fs.listStatus(path);
+        } catch(FileNotFoundException ignore) {
+            //Ok if the libext is not configured
+        }
+
+        if (libs == null) {
+            return;
+        }
+
+        for(FileStatus lib : libs) {
+            if (lib.isDir()) {
+                continue;
+            }
+
+            for(Object obj: wf.getDecisionOrForkOrJoin()) {
+                if (!(obj instanceof ACTION)) {
+                    continue;
+                }
+                ACTION action = (ACTION) obj;
+                List<String> files = null;
+                if (action.getJava() != null) {
+                    files = action.getJava().getFile();
+                } else if (action.getPig() != null) {
+                    files = action.getPig().getFile();
+                } else if (action.getMapReduce() != null) {
+                    files = action.getMapReduce().getFile();
+                }
+                if (files != null) {
+                    files.add(lib.getPath().toString());
+                }
+            }
+        }
+    }
+
+    // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
+    protected void createHiveConfiguration(Cluster cluster, Path workflowPath, String prefix) throws FalconException {
+        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+
+        try {
+            Configuration conf = ClusterHelper.getConfiguration(cluster);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+
+            // create hive conf to stagingDir
+            Path confPath = new Path(workflowPath + "/conf");
+
+            persistHiveConfiguration(fs, confPath, hiveConf, prefix);
+        } catch (IOException e) {
+            throw new FalconException("Unable to create create hive site", e);
+        }
+    }
+
+    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+        String prefix) throws IOException {
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+            hiveConf.writeXml(out);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * This is only necessary if table is involved and is secure mode.
+     *
+     * @param workflowApp workflow xml
+     * @param cluster     cluster entity
+     */
+    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName) {
+        CREDENTIALS credentials = workflowApp.getCredentials();
+        if (credentials == null) {
+            credentials = new CREDENTIALS();
+        }
+
+        credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
+
+        // add credential for workflow
+        workflowApp.setCredentials(credentials);
+    }
+
+    /**
+     * This is only necessary if table is involved and is secure mode.
+     *
+     * @param workflowApp workflow xml
+     * @param cluster     cluster entity
+     */
+    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
+        String credentialName, Set<String> actions) {
+        addHCatalogCredentials(workflowApp, cluster, credentialName);
+
+        // add credential to each action
+        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof ACTION)) {
+                continue;
+            }
+
+            ACTION action = (ACTION) object;
+            String actionName = action.getName();
+            if (actions.contains(actionName)) {
+                action.setCred(credentialName);
+            }
+        }
+    }
+
+    /**
+     * This is only necessary if table is involved and is secure mode.
+     *
+     * @param cluster        cluster entity
+     * @param credentialName credential name
+     * @return CREDENTIALS object
+     */
+    private CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
+        final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+
+        CREDENTIAL credential = new CREDENTIAL();
+        credential.setName(credentialName);
+        credential.setType("hcat");
+
+        credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl));
+        credential.getProperty().add(createProperty("hcat.metastore.principal",
+            ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
+
+        return credential;
+    }
+
+    private CREDENTIAL.Property createProperty(String name, String value) {
+        CREDENTIAL.Property property = new CREDENTIAL.Property();
+        property.setName(name);
+        property.setValue(value);
+        return property;
+    }
+
+    protected void addOozieRetries(WORKFLOWAPP workflow) {
+        for (Object object : workflow.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+            org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if (FALCON_ACTIONS.contains(actionName)) {
+                decorateWithOozieRetries(action);
+            }
+        }
+    }
+
+    protected void decorateWithOozieRetries(ACTION action) {
+        Properties props = RuntimeProperties.get();
+        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
new file mode 100644
index 0000000..6917f4e
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieBundleBuilder;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Bundle builder for feeds: combines the retention and replication coordinators of a feed.
+ */
+public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
+    public FeedBundleBuilder(Feed entity) {
+        super(entity);
+    }
+
+    /** The feed libraries live in the <code>lib</code> directory below the build path. */
+    @Override
+    protected Path getLibPath(Cluster cluster, Path buildPath) {
+        return new Path(buildPath, "lib");
+    }
+
+    /**
+     * Builds the retention coordinators followed by the replication coordinators and, when at least
+     * one coordinator was produced, copies the shared libraries to the lib path.
+     *
+     * @return properties of all coordinators built, possibly empty
+     */
+    @Override
+    protected List<Properties> doBuild(Cluster cluster, Path buildPath) throws FalconException {
+        List<Properties> coordProps = new ArrayList<Properties>();
+
+        // order matters: retention first, then replication
+        for (Tag lifecycle : new Tag[]{Tag.RETENTION, Tag.REPLICATION}) {
+            List<Properties> built =
+                OozieCoordinatorBuilder.get(entity, lifecycle).buildCoords(cluster, buildPath);
+            if (built != null) {
+                coordProps.addAll(built);
+            }
+        }
+
+        if (coordProps.isEmpty()) {
+            return coordProps;
+        }
+
+        copySharedLibs(cluster, getLibPath(cluster, buildPath));
+        return coordProps;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
new file mode 100644
index 0000000..3226cf2
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ *  Builds oozie coordinator for feed replication, one per source-target cluster combination.
+ */
+public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
+    // Keys looked up in the entity properties when the coordinator controls are set.
+    private static final String TIMEOUT = "timeout";
+    private static final String PARALLEL = "parallel";
+
+    // Keys of the workflow configuration that get a default when the user has not set them.
+    private static final String MR_MAX_MAPS = "maxMaps";
+    private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB";
+
+    // Template handed to getCoordinatorTemplate when a replication coordinator is built.
+    private static final String REPLICATION_COORD_TEMPLATE = "/coordinator/replication-coordinator.xml";
+
+    // Smallest coordinator timeout that is applied by default, in milliseconds.
+    private static final int THIRTY_MINUTES = 1000 * 60 * 30;
+
+    /**
+     * Creates a coordinator builder for the given feed, tagged as REPLICATION.
+     */
+    public FeedReplicationCoordinatorBuilder(Feed entity) {
+        super(entity, Tag.REPLICATION);
+    }
+
+    /**
+     * Builds one replication coordinator per source cluster of the feed, provided the given cluster is
+     * a replication target of the feed.
+     *
+     * @param cluster   cluster the coordinators are built for
+     * @param buildPath base path; each coordinator is written to [tag name]/[source cluster name] below it
+     * @return properties of the coordinators that were actually built (possibly empty), or null if the
+     *         given cluster is not a target cluster of the feed
+     * @throws FalconException if building a workflow or a coordinator fails
+     */
+    @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+        if (feedCluster.getType() != ClusterType.TARGET) {
+            return null;
+        }
+
+        List<Properties> props = new ArrayList<Properties>();
+        OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, Tag.REPLICATION);
+        for (org.apache.falcon.entity.v0.feed.Cluster srcFeedCluster : entity.getClusters().getClusters()) {
+            if (srcFeedCluster.getType() != ClusterType.SOURCE) {
+                continue;
+            }
+
+            Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, srcFeedCluster.getName());
+            // workflow is serialized to a specific dir
+            Path coordPath = new Path(buildPath, Tag.REPLICATION.name() + "/" + srcCluster.getName());
+
+            // Different workflow for each source since hive credentials vary for each cluster
+            builder.build(cluster, coordPath);
+
+            // doBuild returns null when the source and target validity ranges do not overlap; such a
+            // pair has no coordinator and must not leave a null entry in the result
+            Properties coordProps = doBuild(srcCluster, cluster, coordPath);
+            if (coordProps != null) {
+                props.add(coordProps);
+            }
+        }
+        return props;
+    }
+
+    /**
+     * Builds and marshals the replication coordinator for a single source-target cluster pair.
+     *
+     * @return properties of the coordinator, or null when the validity ranges of the feed on the two
+     *         clusters (start dates shifted by the replication delay) do not overlap
+     */
+    private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
+        final long delayMs = getReplicationDelayInMillis(srcCluster);
+
+        final Date srcStart = getStartDate(srcCluster, delayMs);
+        final Date srcEnd = getEndDate(srcCluster);
+        final Date trgStart = getStartDate(trgCluster, delayMs);
+        final Date trgEnd = getEndDate(trgCluster);
+
+        if (noOverlapExists(srcStart, srcEnd, trgStart, trgEnd)) {
+            LOG.warn("Not creating replication coordinator, as the source cluster: {} and target cluster: {} do "
+                + "not have overlapping dates", srcCluster.getName(), trgCluster.getName());
+            return null;
+        }
+
+        final COORDINATORAPP coordinator = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+        final String coordName =
+            EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()), entity).toString();
+
+        // the coordinator spans the intersection of both ranges: the later start and the earlier end
+        final Date effectiveStart = srcStart.after(trgStart) ? srcStart : trgStart;
+        final Date effectiveEnd = srcEnd.before(trgEnd) ? srcEnd : trgEnd;
+        initializeCoordAttributes(coordinator, coordName, SchemaHelper.formatDateUTC(effectiveStart),
+            SchemaHelper.formatDateUTC(effectiveEnd), delayMs);
+        setCoordControls(coordinator);
+
+        final Storage srcStorage = FeedHelper.createReadOnlyStorage(srcCluster, entity);
+        initializeInputDataSet(srcCluster, coordinator, srcStorage);
+
+        final Storage trgStorage = FeedHelper.createStorage(trgCluster, entity);
+        initializeOutputDataSet(trgCluster, coordinator, trgStorage);
+
+        coordinator.setAction(
+            getReplicationWorkflowAction(srcCluster, trgCluster, buildPath, coordName, srcStorage, trgStorage));
+
+        marshal(trgCluster, coordinator, buildPath);
+        return getProperties(buildPath, coordName);
+    }
+
+    /**
+     * Creates the coordinator action that points at the workflow stored under buildPath and carries the
+     * configuration the replication workflow is run with.
+     */
+    private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
+        String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
+        WORKFLOW replicationWf = new WORKFLOW();
+        replicationWf.setAppPath(getStoragePath(buildPath.toString()));
+
+        Properties conf = createCoordDefaultConfiguration(trgCluster, wfName);
+        conf.put("srcClusterName", srcCluster.getName());
+        conf.put("srcClusterColo", srcCluster.getColo());
+
+        // set defaults only if the user has not overridden them
+        if (!conf.containsKey(MR_MAX_MAPS)) {
+            conf.put(MR_MAX_MAPS, getDefaultMaxMaps());
+        }
+        if (!conf.containsKey(MR_MAP_BANDWIDTH)) {
+            conf.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
+        }
+
+        // the storage type is uniform across source and target feeds for replication
+        final Storage.TYPE storageType = sourceStorage.getType();
+        conf.put("falconFeedStorageType", storageType.name());
+
+        final String instancePaths;
+        if (storageType == Storage.TYPE.TABLE) {
+            instancePaths = "${coord:dataIn('input')}";
+
+            final CatalogStorage srcTable = (CatalogStorage) sourceStorage;
+            propagateTableStorageProperties(srcCluster, srcTable, conf, "falconSource");
+            final CatalogStorage trgTable = (CatalogStorage) targetStorage;
+            propagateTableStorageProperties(trgCluster, trgTable, conf, "falconTarget");
+
+            propagateTableCopyProperties(srcCluster, srcTable, trgCluster, trgTable, conf);
+            setupHiveConfiguration(srcCluster, trgCluster, buildPath);
+        } else if (storageType == Storage.TYPE.FILESYSTEM) {
+            instancePaths = getPathsWithPartitions(srcCluster, trgCluster);
+            propagateFileSystemCopyProperties(instancePaths, conf);
+        } else {
+            instancePaths = "";
+        }
+
+        propagateLateDataProperties(instancePaths, storageType.name(), conf);
+        // user supplied workflow properties are applied last
+        conf.putAll(FeedHelper.getUserWorkflowProperties("replication"));
+
+        replicationWf.setConfiguration(getConfig(conf));
+
+        ACTION replicationAction = new ACTION();
+        replicationAction.setWorkflow(replicationWf);
+        return replicationAction;
+    }
+
+    private String getDefaultMaxMaps() {
+        return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+    }
+
+    private String getDefaultMapBandwidth() {
+        return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidthKB", "102400");
+    }
+
+    /**
+     * Returns the coordinator input EL followed by the partition expression derived from the source and
+     * target feed clusters, with runs of slashes collapsed to one and trailing slashes removed.
+     */
+    private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster) throws FalconException {
+        // partition of the feed on the source cluster: normalized, then evaluated against that cluster
+        String srcPart = FeedHelper.evaluateClusterExp(srcCluster,
+            FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(entity, srcCluster.getName()).getPartition()));
+
+        // same for the target cluster
+        String targetPart = FeedHelper.evaluateClusterExp(trgCluster,
+            FeedHelper.normalizePartitionExpression(
+                FeedHelper.getCluster(entity, trgCluster.getName()).getPartition()));
+
+        String combined = "${coord:dataIn('input')}/"
+            + FeedHelper.normalizePartitionExpression(srcPart, targetPart);
+
+        return StringUtils.stripEnd(combined.replaceAll("//+", "/"), "/");
+    }
+
+    private void propagateFileSystemCopyProperties(String paths, Properties props) throws FalconException {
+        props.put("sourceRelativePaths", paths);
+
+        props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+        props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+    }
+
+    private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+        Properties props, String prefix) {
+        props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+        props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+        props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
+
+        props.put(prefix + "Database", tableStorage.getDatabase());
+        props.put(prefix + "Table", tableStorage.getTable());
+        props.put(prefix + "Partition", "${coord:dataInPartitions('input', 'hive-export')}");
+    }
+
+    private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+        Cluster trgCluster, CatalogStorage targetStorage, Properties props) {
+        // create staging dirs for export at source & set it as distcpSourcePaths
+        String sourceStagingPath =
+            FeedHelper.getStagingPath(srcCluster, entity, sourceStorage, Tag.REPLICATION,
+                NOMINAL_TIME_EL + "/" + trgCluster.getName());
+        props.put("distcpSourcePaths", sourceStagingPath);
+
+        // create staging dirs for import at target & set it as distcpTargetPaths
+        String targetStagingPath =
+            FeedHelper.getStagingPath(trgCluster, entity, targetStorage, Tag.REPLICATION,
+                NOMINAL_TIME_EL + "/" + trgCluster.getName());
+        props.put("distcpTargetPaths", targetStagingPath);
+
+        props.put("sourceRelativePaths", IGNORE); // this will bot be used for Table storage.
+    }
+
+    private void propagateLateDataProperties(String instancePaths, String falconFeedStorageType, Properties props) {
+        // todo these pairs are the same but used in different context
+        // late data handler - should-record action
+        props.put("falconInputFeeds", entity.getName());
+        props.put("falconInPaths", instancePaths);
+
+        // storage type for each corresponding feed - in this case only one feed is involved
+        // needed to compute usage based on storage type in LateDataHandler
+        props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+        // falcon post processing
+        props.put(ARG.feedNames.getPropName(), entity.getName());
+        props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+    }
+
+    private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
+        Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+
+        try {
+            // copy import export scripts to stagingDir
+            Path scriptPath = new Path(buildPath, "scripts");
+            copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-export.hql");
+            copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-import.hql");
+
+            // create hive conf to stagingDir
+            Path confPath = new Path(buildPath + "/conf");
+            persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-");
+            persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-");
+        } catch (IOException e) {
+            throw new FalconException("Unable to create hive conf files", e);
+        }
+    }
+
+    private void copyHiveScript(FileSystem fs, Path scriptPath, String localScriptPath,
+        String scriptName) throws IOException {
+        OutputStream out = null;
+        InputStream in = null;
+        try {
+            out = fs.create(new Path(scriptPath, scriptName));
+            in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+            IOUtils.copy(in, out);
+        } finally {
+            IOUtils.closeQuietly(in);
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    protected void persistHiveConfiguration(FileSystem fs, Path confPath,
+        Cluster cluster, String prefix) throws IOException {
+        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+            hiveConf.writeXml(out);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    private void initializeCoordAttributes(COORDINATORAPP coord, String coordName, String start, String end,
+        long delayInMillis) {
+        coord.setName(coordName);
+        coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
+
+        if (delayInMillis > 0) {
+            long delayInMins = -1 * delayInMillis / (1000 * 60);
+            String elExp = "${now(0," + delayInMins + ")}";
+
+            coord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+            coord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+        }
+
+        coord.setStart(start);
+        coord.setEnd(end);
+        coord.setTimezone(entity.getTimezone().getID());
+    }
+
+    private void setCoordControls(COORDINATORAPP coord) throws FalconException {
+        long frequencyInMillis = ExpressionHelper.get().evaluate(entity.getFrequency().toString(), Long.class);
+        long timeoutInMillis = frequencyInMillis * 6;
+        if (timeoutInMillis < THIRTY_MINUTES) {
+            timeoutInMillis = THIRTY_MINUTES;
+        }
+
+        Properties props = getEntityProperties(entity);
+        String timeout = props.getProperty(TIMEOUT);
+        if (timeout!=null) {
+            try{
+                timeoutInMillis= ExpressionHelper.get().evaluate(timeout, Long.class);
+            } catch (Exception ignore) {
+                LOG.error("Unable to evaluate timeout:", ignore);
+            }
+        }
+        coord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+        coord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+
+        String parallelProp = props.getProperty(PARALLEL);
+        int parallel = 1;
+        if (parallelProp != null) {
+            try {
+                parallel = Integer.parseInt(parallelProp);
+            } catch (NumberFormatException ignore) {
+                LOG.error("Unable to parse parallel:", ignore);
+            }
+        }
+        coord.getControls().setConcurrency(String.valueOf(parallel));
+    }
+
+
+    private void initializeInputDataSet(Cluster cluster, COORDINATORAPP coord, Storage storage) throws FalconException {
+        SYNCDATASET inputDataset = (SYNCDATASET)coord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+        String uriTemplate = storage.getUriTemplate(LocationType.DATA);
+        if (storage.getType() == Storage.TYPE.TABLE) {
+            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+        }
+        inputDataset.setUriTemplate(uriTemplate);
+
+        setDatasetValues(inputDataset, cluster);
+
+        if (entity.getAvailabilityFlag() == null) {
+            inputDataset.setDoneFlag("");
+        } else {
+            inputDataset.setDoneFlag(entity.getAvailabilityFlag());
+        }
+    }
+
+    private void initializeOutputDataSet(Cluster cluster, COORDINATORAPP coord,
+        Storage storage) throws FalconException {
+        SYNCDATASET outputDataset = (SYNCDATASET)coord.getDatasets().getDatasetOrAsyncDataset().get(1);
+
+        String uriTemplate = storage.getUriTemplate(LocationType.DATA);
+        if (storage.getType() == Storage.TYPE.TABLE) {
+            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+        }
+        outputDataset.setUriTemplate(uriTemplate);
+
+        setDatasetValues(outputDataset, cluster);
+    }
+
+    private void setDatasetValues(SYNCDATASET dataset, Cluster cluster) {
+        dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+            FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart()));
+        dataset.setTimezone(entity.getTimezone().getID());
+        dataset.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
+    }
+
+    private long getReplicationDelayInMillis(Cluster srcCluster) throws FalconException {
+        Frequency replicationDelay = FeedHelper.getCluster(entity, srcCluster.getName()).getDelay();
+        long delayInMillis=0;
+        if (replicationDelay != null) {
+            delayInMillis = ExpressionHelper.get().evaluate(
+                replicationDelay.toString(), Long.class);
+        }
+
+        return delayInMillis;
+    }
+
+    private Date getStartDate(Cluster cluster, long replicationDelayInMillis) {
+        Date startDate = FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart();
+        return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+    }
+
+    private Date getEndDate(Cluster cluster) {
+        return FeedHelper.getCluster(entity, cluster.getName()).getValidity().getEnd();
+    }
+
+    /**
+     * Returns true when one range starts strictly after the other one ends.
+     */
+    private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
+        Date targetStartDate, Date targetEndDate) {
+        if (sourceStartDate.after(targetEndDate)) {
+            return true;
+        }
+        return targetStartDate.after(sourceEndDate);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
new file mode 100644
index 0000000..00fab99
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Builds feed replication workflow, one per source-target cluster combination.
+ */
+public class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+    private static final String REPLICATION_WF_TEMPLATE = "/workflow/replication-workflow.xml";
+    private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
+    private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
+
+    public FeedReplicationWorkflowBuilder(Feed entity) {
+        super(entity, Tag.REPLICATION);
+    }
+
+    /**
+     * Builds the replication workflow from the template and marshals it to buildPath. The source
+     * cluster is looked up by the last component of buildPath.
+     *
+     * @param cluster   cluster the workflow is built for; passed to setupHiveCredentials as the target
+     * @param buildPath directory the workflow is written to
+     * @return properties of the workflow that was built
+     */
+    @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+        final WORKFLOWAPP replicationWf = getWorkflow(REPLICATION_WF_TEMPLATE);
+        final Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName());
+
+        final String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
+        replicationWf.setName(wfName);
+        addLibExtensionsToWorkflow(cluster, replicationWf, Tag.REPLICATION);
+        addOozieRetries(replicationWf);
+
+        if (isTableStorageType(cluster)) {
+            setupHiveCredentials(cluster, srcCluster, replicationWf);
+        }
+
+        marshal(cluster, replicationWf, buildPath);
+        return getProperties(buildPath, wfName);
+    }
+
+    /**
+     * Registers the source and target hcatalog credentials on the workflow in secure mode and then
+     * prepares every action of the workflow for hive access.
+     */
+    private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster, WORKFLOWAPP workflowApp) {
+        if (isSecurityEnabled) {
+            // add hcatalog credentials for secure mode
+            addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
+            addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
+        }
+
+        // hive-site.xml file is created later in coordinator initialization but
+        // actions are set to point to that here
+        for (Object node : workflowApp.getDecisionOrForkOrJoin()) {
+            if (node instanceof org.apache.falcon.oozie.workflow.ACTION) {
+                applyHiveSettings((org.apache.falcon.oozie.workflow.ACTION) node);
+            }
+        }
+    }
+
+    /**
+     * Points the recordsize action at the source hive-site conf and, in secure mode, references the
+     * source credential from recordsize and table-export and the target credential from table-import.
+     * Actions with any other name are left untouched.
+     */
+    private void applyHiveSettings(org.apache.falcon.oozie.workflow.ACTION action) {
+        final String actionName = action.getName();
+        final boolean isRecordSize = "recordsize".equals(actionName);
+
+        if (isRecordSize) {
+            action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
+        }
+
+        if (!isSecurityEnabled) {
+            return;
+        }
+
+        // add a reference to the matching credential in the action
+        if (isRecordSize || "table-export".equals(actionName)) {
+            action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+        } else if ("table-import".equals(actionName)) {
+            action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
new file mode 100644
index 0000000..4393c94
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.oozie.OozieEntityBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds feed retention coordinator.
+ */
+public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
+    public FeedRetentionCoordinatorBuilder(Feed entity) {
+        super(entity, Tag.RETENTION);
+    }
+
+    /**
+     * Builds the retention coordinator of the feed for the given cluster together with the retention
+     * workflow it launches.
+     *
+     * @return a single element list with the properties of the coordinator, or null when the validity
+     *         of the feed on the cluster has already ended
+     */
+    @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+        final org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+            FeedHelper.getCluster(entity, cluster.getName());
+        final Date validityEnd = feedCluster.getValidity().getEnd();
+
+        if (validityEnd.before(new Date())) {
+            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster {} is not in the future",
+                cluster.getName());
+            return null;
+        }
+
+        // the coordinator runs from now until the end of the feed's validity on this cluster
+        final COORDINATORAPP retentionCoord = new COORDINATORAPP();
+        final String coordName = getEntityName();
+        retentionCoord.setName(coordName);
+        retentionCoord.setEnd(SchemaHelper.formatDateUTC(validityEnd));
+        retentionCoord.setStart(SchemaHelper.formatDateUTC(new Date()));
+        retentionCoord.setTimezone(entity.getTimezone().getID());
+        retentionCoord.setFrequency(getRetentionFrequency());
+
+        final Path coordPath = getBuildPath(buildPath);
+        final Properties wfConf = createWorkflowConfiguration(cluster, feedCluster, coordName);
+
+        final WORKFLOW retentionWf = new WORKFLOW();
+        final Properties wfProps =
+            OozieOrchestrationWorkflowBuilder.get(entity, Tag.RETENTION).build(cluster, coordPath);
+        retentionWf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+        retentionWf.setConfiguration(getConfig(wfConf));
+
+        final ACTION retentionAction = new ACTION();
+        retentionAction.setWorkflow(retentionWf);
+        retentionCoord.setAction(retentionAction);
+
+        marshal(cluster, retentionCoord, coordPath);
+
+        return Arrays.asList(getProperties(coordPath, coordName));
+    }
+
+    /**
+     * Returns the coordinator frequency: every 6 hours for feeds with a frequency in hours or minutes,
+     * once a day for all other feeds.
+     */
+    private String getRetentionFrequency() {
+        final TimeUnit unit = entity.getFrequency().getTimeUnit();
+        final boolean subDaily = unit == TimeUnit.hours || unit == TimeUnit.minutes;
+        return subDaily ? "${coord:hours(6)}" : "${coord:days(1)}";
+    }
+
+    /**
+     * Creates the configuration passed to the retention workflow: the coordinator defaults, the feed's
+     * timezone, frequency unit, storage type, data path and retention limit, the post processing
+     * arguments and finally the user workflow properties for "eviction".
+     */
+    private Properties createWorkflowConfiguration(Cluster cluster,
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster, String coordName) throws FalconException {
+        final Properties conf = createCoordDefaultConfiguration(cluster, coordName);
+        conf.put("timeZone", entity.getTimezone().getID());
+        conf.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+        final Storage storage = FeedHelper.createStorage(cluster, entity);
+        conf.put("falconFeedStorageType", storage.getType().name());
+
+        // the expression start markers of the uri template are rewritten before it is handed over
+        final String feedDataPath = storage.getUriTemplate();
+        conf.put("feedDataPath",
+            feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+        conf.put("limit", feedCluster.getRetention().getLimit().toString());
+
+        final String feedName = entity.getName();
+        conf.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+        conf.put(ARG.feedNames.getPropName(), feedName);
+        conf.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+
+        conf.put("falconInputFeeds", feedName);
+        conf.put("falconInPaths", IGNORE);
+
+        conf.putAll(FeedHelper.getUserWorkflowProperties("eviction"));
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
new file mode 100644
index 0000000..4a7f96b
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Builds the Oozie workflow application used for feed retention.
+ */
+public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+    private static final String RETENTION_WF_TEMPLATE = "/workflow/retention-workflow.xml";
+
+    public FeedRetentionWorkflowBuilder(Feed entity) {
+        super(entity, Tag.DEFAULT);
+    }
+
+    /**
+     * Loads the retention template, names and decorates it, then marshals it under buildPath.
+     */
+    @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+        final WORKFLOWAPP retentionWf = getWorkflow(RETENTION_WF_TEMPLATE);
+        final String retentionWfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
+        retentionWf.setName(retentionWfName);
+        addLibExtensionsToWorkflow(cluster, retentionWf, Tag.RETENTION);
+        addOozieRetries(retentionWf);
+
+        if (isTableStorageType(cluster)) {
+            setupHiveCredentials(cluster, buildPath, retentionWf);
+        }
+
+        marshal(cluster, retentionWf, buildPath);
+        return getProperties(buildPath, retentionWfName);
+    }
+
+    // Creates hive-site.xml for the workflow and points the "eviction" action at it (plus credentials if secure).
+    private void setupHiveCredentials(Cluster cluster, Path wfPath, WORKFLOWAPP workflowApp) throws FalconException {
+        if (isSecurityEnabled) {
+            // secure mode: register the hcatalog credentials on the workflow
+            addHCatalogCredentials(workflowApp, cluster, HIVE_CREDENTIAL_NAME);
+        }
+
+        // hive-site.xml must exist so actions can use it; no prefix since only one hive instance
+        createHiveConfiguration(cluster, wfPath, "");
+
+        for (Object node : workflowApp.getDecisionOrForkOrJoin()) {
+            if (node instanceof org.apache.falcon.oozie.workflow.ACTION) {
+                org.apache.falcon.oozie.workflow.ACTION wfAction = (org.apache.falcon.oozie.workflow.ACTION) node;
+                if (!"eviction".equals(wfAction.getName())) {
+                    continue;
+                }
+
+                // reference the generated hive-site.xml from the eviction action
+                wfAction.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+                if (isSecurityEnabled) {
+                    // and attach the credential registered above
+                    wfAction.setCred(HIVE_CREDENTIAL_NAME);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
new file mode 100644
index 0000000..79a1883
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.hive.CONFIGURATION.Property;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+import java.util.List;
+
+/**
+ * Builds the orchestration workflow for a process whose engine is hive.
+ */
+public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    public HiveProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
+
+    // Decorates only the action named "user-hive-job"; any other action is returned untouched.
+    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+        if (!action.getName().equals("user-hive-job")) {
+            return;
+        }
+
+        // the embedded hive action is unmarshalled here and marshalled back once fully populated
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> hiveElement = OozieUtils.unMarshalHiveAction(action);
+        org.apache.falcon.oozie.hive.ACTION hive = hiveElement.getValue();
+
+        hive.setScript(getStoragePath(ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath)));
+        addPrepareDeleteOutputPath(hive);
+
+        final List<String> hiveParams = hive.getParam();
+        addInputFeedsAsParams(hiveParams, cluster);
+        addOutputFeedsAsParams(hiveParams, cluster);
+        propagateEntityProperties(hive);
+
+        // adds hive-site.xml in hive classpath
+        hive.setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+        addArchiveForCustomJars(cluster, hive.getArchive(),
+            ProcessHelper.getUserLibPath(entity, cluster, buildPath));
+
+        OozieUtils.marshalHiveAction(action, hiveElement);
+    }
+
+    // Gathers entity properties through the parent and copies them into the hive action's configuration type.
+    private void propagateEntityProperties(org.apache.falcon.oozie.hive.ACTION hiveAction) {
+        CONFIGURATION workflowConf = new CONFIGURATION();
+        super.propagateEntityProperties(workflowConf, hiveAction.getParam());
+
+        // hive actions use their own Property class, so each workflow property is copied over
+        List<Property> hiveProps = hiveAction.getConfiguration().getProperty();
+        for (CONFIGURATION.Property source : workflowConf.getProperty()) {
+            Property copy = new Property();
+            copy.setName(source.getName());
+            copy.setValue(source.getValue());
+            hiveProps.add(copy);
+        }
+    }
+
+    // Attaches a prepare block deleting every path from getPrepareDeleteOutputPathList(); no-op if there are none.
+    private void addPrepareDeleteOutputPath(org.apache.falcon.oozie.hive.ACTION hiveAction) throws FalconException {
+        List<String> pathsToDelete = getPrepareDeleteOutputPathList();
+        if (pathsToDelete.isEmpty()) {
+            return;
+        }
+
+        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+        List<org.apache.falcon.oozie.hive.DELETE> deletes = prepare.getDelete();
+        for (String path : pathsToDelete) {
+            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+            delete.setPath(path);
+            deletes.add(delete);
+        }
+
+        if (!deletes.isEmpty()) {
+            hiveAction.setPrepare(prepare);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
new file mode 100644
index 0000000..977d8c1
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Builds the workflow for a process whose engine is oozie.
+ */
+public class OozieProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+    public OozieProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
+
+    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+        if (action.getName().equals("user-oozie-workflow")) {
+            // point the sub-workflow at the user workflow path returned by ProcessHelper
+            Path userWorkflow = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
+            action.getSubWorkflow().setAppPath(getStoragePath(userWorkflow));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
new file mode 100644
index 0000000..29f601d
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DELETE;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.PREPARE;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * Builds the orchestration workflow for a process whose engine is pig.
+ */
+public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+
+    public PigProcessWorkflowBuilder(Process entity) {
+        super(entity);
+    }
+
+    // Decorates only the action named "user-pig-job"; any other action is returned untouched.
+    @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+        if (!action.getName().equals("user-pig-job")) {
+            return;
+        }
+
+        final PIG pig = action.getPig();
+        pig.setScript(getStoragePath(ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath)));
+
+        addPrepareDeleteOutputPath(pig);
+
+        final List<String> pigParams = pig.getParam();
+        addInputFeedsAsParams(pigParams, cluster);
+        addOutputFeedsAsParams(pigParams, cluster);
+        propagateEntityProperties(pig.getConfiguration(), pig.getParam());
+
+        // hive-site.xml goes on the pig classpath only for table storage
+        if (isTableStorageType(cluster)) {
+            pig.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+        }
+
+        addArchiveForCustomJars(cluster, pig.getArchive(),
+            ProcessHelper.getUserLibPath(entity, cluster, buildPath));
+    }
+
+    // Attaches a prepare block deleting every path from getPrepareDeleteOutputPathList(); no-op if there are none.
+    private void addPrepareDeleteOutputPath(PIG pigAction) throws FalconException {
+        List<String> pathsToDelete = getPrepareDeleteOutputPathList();
+        if (pathsToDelete.isEmpty()) {
+            return;
+        }
+
+        final PREPARE prepare = new PREPARE();
+        final List<DELETE> deletes = prepare.getDelete();
+        for (String path : pathsToDelete) {
+            final DELETE delete = new DELETE();
+            delete.setPath(path);
+            deletes.add(delete);
+        }
+
+        if (!deletes.isEmpty()) {
+            pigAction.setPrepare(prepare);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
new file mode 100644
index 0000000..86cea93
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieBundleBuilder;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.update.UpdateHelper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.CoordinatorJob.Timeunit;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Builds oozie bundle for process - schedulable entity in oozie.
+ */
+public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
+
+    public ProcessBundleBuilder(Process entity) {
+        super(entity);
+    }
+
+    // Collects the properties of every optional input of the process; empty when there are none.
+    @Override protected Properties getAdditionalProperties(Cluster cluster) throws FalconException {
+        Properties properties = new Properties();
+        if (entity.getInputs() != null) {
+            for (Input in : entity.getInputs().getInputs()) {
+                if (in.isOptional()) {
+                    properties.putAll(getOptionalInputProperties(in, cluster.getName()));
+                }
+            }
+        }
+        return properties;
+    }
+
+    // Builds the "<input name>.*" properties describing the feed behind one optional input on the given cluster.
+    private Properties getOptionalInputProperties(Input in, String clusterName) throws FalconException {
+        Properties properties = new Properties();
+        Feed feed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
+        String inName = in.getName();
+        properties.put(inName + ".frequency", String.valueOf(feed.getFrequency().getFrequency()));
+        properties.put(inName + ".freq_timeunit", mapToCoordTimeUnit(feed.getFrequency().getTimeUnit()).name());
+        properties.put(inName + ".timezone", feed.getTimezone().getID());
+        properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
+        properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
+        properties.put(inName + ".done-flag", "notused");
+
+        String locPath = FeedHelper.createStorage(clusterName, feed)
+            .getUriTemplate(LocationType.DATA).replace('$', '%');
+        properties.put(inName + ".uri-template", locPath);
+        properties.put(inName + ".start-instance", in.getStart());
+        properties.put(inName + ".end-instance", in.getEnd());
+        return properties;
+    }
+
+    // Maps a falcon frequency time unit to the oozie coordinator time unit; rejects any unit not listed.
+    private Timeunit mapToCoordTimeUnit(TimeUnit tu) {
+        switch (tu) {
+        case days:
+            return Timeunit.DAY;
+        case hours:
+            return Timeunit.HOUR;
+        case minutes:
+            return Timeunit.MINUTE;
+        case months:
+            return Timeunit.MONTH;
+        default:
+            throw new IllegalArgumentException("Unhandled time unit " + tu);
+        }
+    }
+
+    @Override protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+        return ProcessHelper.getUserLibPath(entity, cluster, buildPath);
+    }
+
+    // Stages the user workflow (and lib) first, then delegates to the DEFAULT-tagged coordinator builder.
+    @Override protected List<Properties> doBuild(Cluster cluster, Path buildPath) throws FalconException {
+        copyUserWorkflow(cluster, buildPath);
+        return OozieCoordinatorBuilder.get(entity, Tag.DEFAULT).buildCoords(cluster, buildPath);
+    }
+
+    // Copies the user workflow, and the lib when configured and present, under buildPath and records checksums.
+    private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
+        try {
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+
+            //Copy user workflow and lib to staging dir
+            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
+                new Path(buildPath, EntityUtil.PROCESS_USER_DIR));
+            if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
+                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
+                    new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR)));
+            }
+            writeChecksums(fs, new Path(buildPath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
+        } catch (IOException e) {
+            throw new FalconException("Failed to copy user workflow/lib", e);
+        }
+    }
+
+    // Writes one "key=value" line per checksum entry to the given path, always closing the stream.
+    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
+        try {
+            FSDataOutputStream stream = fs.create(path);
+            try {
+                for (Map.Entry<String, String> entry : checksums.entrySet()) {
+                    // NOTE(review): getBytes() uses the platform charset; confirm the reader does the same
+                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+                }
+            } finally {
+                stream.close();
+            }
+        } catch (IOException e) {
+            throw new FalconException("Failed to write checksums to " + path, e);
+        }
+    }
+}