Posted to commits@falcon.apache.org by sr...@apache.org on 2013/11/12 12:05:23 UTC

[06/12] FALCON-85 Hive (HCatalog) integration. Contributed by Venkatesh Seetharam. FALCON-163 Merge FALCON-85 branch into main line. Contributed by Venkatesh Seetharam

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index e4441cc..eed7fa3 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -21,14 +21,17 @@ package org.apache.falcon.converter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.EngineType;
@@ -57,9 +60,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-
+import org.apache.xerces.dom.ElementNSImpl;
+import org.w3c.dom.Document;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.dom.DOMResult;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /**
  * This class maps the Falcon entities into Oozie artifacts.
@@ -108,18 +122,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         // Configuration
         Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
 
-        List<String> inputFeeds = new ArrayList<String>();
-        List<String> inputPaths = new ArrayList<String>();
-        initializeInputPaths(cluster, process, coord, props, inputFeeds, inputPaths); // inputs
-        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
-        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
-
-        List<String> outputFeeds = new ArrayList<String>();
-        List<String> outputPaths = new ArrayList<String>();
-        initializeOutputPaths(cluster, process, coord, props, outputFeeds, outputPaths);  // outputs
-        // Output feed name and path for parent workflow
-        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
-        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+        initializeInputPaths(cluster, process, coord, props); // inputs
+        initializeOutputPaths(cluster, process, coord, props);  // outputs
 
         Workflow processWorkflow = process.getWorkflow();
         props.put("userWorkflowEngine", processWorkflow.getEngine().value());
@@ -177,13 +181,18 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
     }
 
     private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-                                      Map<String, String> props, List<String> inputFeeds, List<String> inputPaths)
-        throws FalconException {
+                                      Map<String, String> props) throws FalconException {
         if (process.getInputs() == null) {
             return;
         }
 
+        List<String> inputFeeds = new ArrayList<String>();
+        List<String> inputPaths = new ArrayList<String>();
+        List<String> inputFeedStorageTypes = new ArrayList<String>();
         for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
             if (!input.isOptional()) {
                 if (coord.getDatasets() == null) {
                     coord.setDatasets(new DATASETS());
@@ -192,24 +201,43 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
                     coord.setInputEvents(new INPUTEVENTS());
                 }
 
-                SYNCDATASET syncdataset = createDataSet(input.getFeed(), cluster, input.getName(),
-                        LocationType.DATA);
+                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
                 coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
 
                 DATAIN datain = createDataIn(input);
                 coord.getInputEvents().getDataIn().add(datain);
             }
 
-            String inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
-            props.put(input.getName(), inputExpr);
+            String inputExpr = null;
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
+                props.put(input.getName(), inputExpr);
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                inputExpr = "${coord:dataIn('" + input.getName() + "')}";
+                propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
+            }
+
             inputFeeds.add(input.getName());
             inputPaths.add(inputExpr);
+            inputFeedStorageTypes.add(storage.getType().name());
         }
+
+        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
+    }
+
+    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+                                             List<String> inputFeedStorageTypes, Map<String, String> props) {
+        // populate late data handler - should-record action
+        props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
+        props.put("falconInPaths", join(inputPaths.iterator(), '#'));
+
+        // storage type for each corresponding feed sent as a param to LateDataHandler
+        // needed to compute usage based on storage type in LateDataHandler
+        props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
     }
 
     private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-                                       Map<String, String> props, List<String> outputFeeds, List<String> outputPaths)
-        throws FalconException {
+                                       Map<String, String> props) throws FalconException {
         if (process.getOutputs() == null) {
             return;
         }
@@ -222,26 +250,60 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             coord.setOutputEvents(new OUTPUTEVENTS());
         }
 
+        List<String> outputFeeds = new ArrayList<String>();
+        List<String> outputPaths = new ArrayList<String>();
         for (Output output : process.getOutputs().getOutputs()) {
-            SYNCDATASET syncdataset = createDataSet(output.getFeed(), cluster, output.getName(), LocationType.DATA);
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
             coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
 
             DATAOUT dataout = createDataOut(output);
             coord.getOutputEvents().getDataOut().add(dataout);
 
             String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
-            props.put(output.getName(), outputExpr);
             outputFeeds.add(output.getName());
             outputPaths.add(outputExpr);
 
-            // stats and meta paths
-            createOutputEvent(output.getFeed(), output.getName(), cluster, "stats",
-                    LocationType.STATS, coord, props, output.getInstance());
-            createOutputEvent(output.getFeed(), output.getName(), cluster, "meta",
-                    LocationType.META, coord, props, output.getInstance());
-            createOutputEvent(output.getFeed(), output.getName(), cluster, "tmp",
-                    LocationType.TMP, coord, props, output.getInstance());
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                props.put(output.getName(), outputExpr);
+
+                propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
+            }
+        }
+
+        // Output feed name and path for parent workflow
+        props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
+        props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+    }
+
+    private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+                                      String datasetName, LocationType locationType) throws FalconException {
+
+        SYNCDATASET syncdataset = new SYNCDATASET();
+        syncdataset.setName(datasetName);
+        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+        String uriTemplate = storage.getUriTemplate(locationType);
+        if (storage.getType() == Storage.TYPE.TABLE) {
+            uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+        }
+        syncdataset.setUriTemplate(uriTemplate);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+        syncdataset.setTimezone(feed.getTimezone().getID());
+
+        if (feed.getAvailabilityFlag() == null) {
+            syncdataset.setDoneFlag("");
+        } else {
+            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
         }
+
+        return syncdataset;
     }
 
     private DATAOUT createDataOut(Output output) {
@@ -261,25 +323,71 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         return datain;
     }
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
-    private void createOutputEvent(String feed, String name, Cluster cluster,
-                                   String type, LocationType locType, COORDINATORAPP coord,
-                                   Map<String, String> props, String instance) throws FalconException {
-        SYNCDATASET dataset = createDataSet(feed, cluster, name + type,
-                locType);
+    private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
+                                               Storage storage, Map<String, String> props)
+        throws FalconException {
+
+        // stats and meta paths
+        createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
+        createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
+        createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
+    }
+
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
+                                   COORDINATORAPP coord, Map<String, String> props, Storage storage)
+        throws FalconException {
+
+        String name = output.getName();
+        String type = locType.name().toLowerCase();
+
+        SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
         coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
+
         DATAOUT dataout = new DATAOUT();
-        if (coord.getOutputEvents() == null) {
-            coord.setOutputEvents(new OUTPUTEVENTS());
-        }
         dataout.setName(name + type);
         dataout.setDataset(name + type);
-        dataout.setInstance(getELExpression(instance));
-        coord.getOutputEvents().getDataOut().add(dataout);
+        dataout.setInstance(getELExpression(output.getInstance()));
+
+        OUTPUTEVENTS outputEvents = coord.getOutputEvents();
+        if (outputEvents == null) {
+            outputEvents = new OUTPUTEVENTS();
+            coord.setOutputEvents(outputEvents);
+        }
+        outputEvents.getDataOut().add(dataout);
+
         String outputExpr = "${coord:dataOut('" + name + type + "')}";
         props.put(name + "." + type, outputExpr);
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+    private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
+                                                       Map<String, String> props, String prefix) {
+        props.put(prefix + "_storage_type", tableStorage.getType().name());
+        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+        props.put(prefix + "_database", tableStorage.getDatabase());
+        props.put(prefix + "_table", tableStorage.getTable());
+    }
+
+    private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
+                                                 Map<String, String> props) {
+        String prefix = "falcon_" + input.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
+        props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
+        props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+    }
+
+    private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
+                                                 Map<String, String> props) {
+        String prefix = "falcon_" + output.getName();
+
+        propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+        props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
+    }
 
     private String join(Iterator<String> itr, char sep) {
         String joinedStr = StringUtils.join(itr, sep);
@@ -289,29 +397,6 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         return joinedStr;
     }
 
-    private SYNCDATASET createDataSet(String feedName, Cluster cluster, String datasetName,
-                                      LocationType locationType) throws FalconException {
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-
-        SYNCDATASET syncdataset = new SYNCDATASET();
-        syncdataset.setName(datasetName);
-        String locPath = FeedHelper.getLocation(feed, locationType,
-                cluster.getName()).getPath();
-        syncdataset.setUriTemplate(new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}"
-                + locPath);
-        syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-        syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
-        syncdataset.setTimezone(feed.getTimezone().getID());
-        if (feed.getAvailabilityFlag() == null) {
-            syncdataset.setDoneFlag("");
-        } else {
-            syncdataset.setDoneFlag(feed.getAvailabilityFlag());
-        }
-        return syncdataset;
-    }
-
     private String getELExpression(String expr) {
         if (expr != null) {
             expr = "${" + expr + "}";
@@ -331,8 +416,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         return props;
     }
 
-    private void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
-                                String wfName, Path wfPath) throws FalconException {
+    protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
+                                  String wfName, Path wfPath) throws FalconException {
         WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
         wfApp.setName(wfName);
         try {
@@ -353,33 +438,75 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
                 action.getSubWorkflow().setAppPath(storagePath);
             } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, processWorkflow, storagePath, action.getPig());
+                decoratePIGAction(cluster, process, processWorkflow, storagePath, action.getPig(), wfPath);
+            } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
+                decorateHiveAction(cluster, process, processWorkflow, storagePath, action, wfPath);
             }
         }
 
         marshal(cluster, wfApp, wfPath);
     }
 
-    private void decoratePIGAction(Cluster cluster, Process process, Workflow processWorkflow,
-                                   String storagePath, PIG pigAction) throws FalconException {
-
+    private void decoratePIGAction(Cluster cluster, Process process,
+                                   Workflow processWorkflow, String storagePath,
+                                   PIG pigAction, Path wfPath) throws FalconException {
         pigAction.setScript(storagePath);
 
         addPrepareDeleteOutputPath(process, pigAction);
 
-        addInputOutputFeedsAsParams(pigAction, process);
+        final List<String> paramList = pigAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
 
         propagateProcessProperties(pigAction, process);
 
-        addArchiveForCustomJars(cluster, processWorkflow, pigAction);
+        Storage.TYPE storageType = getStorageType(cluster, process);
+        if (Storage.TYPE.TABLE == storageType) {
+            // adds hive-site.xml in pig classpath
+            setupHiveConfiguration(cluster, wfPath, ""); // DO NOT ADD PREFIX!!!
+            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+        }
+
+        addArchiveForCustomJars(cluster, processWorkflow, pigAction.getArchive());
     }
 
-    private void addPrepareDeleteOutputPath(Process process, PIG pigAction) {
+    private void decorateHiveAction(Cluster cluster, Process process,
+                                    Workflow processWorkflow, String storagePath,
+                                    ACTION wfAction, Path wfPath) throws FalconException {
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        hiveAction.setScript(storagePath);
+
+        addPrepareDeleteOutputPath(process, hiveAction);
+
+        final List<String> paramList = hiveAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
+
+        propagateProcessProperties(hiveAction, process);
+
+        setupHiveConfiguration(cluster, wfPath, "falcon-");
+
+        addArchiveForCustomJars(cluster, processWorkflow, hiveAction.getArchive());
+
+        marshalHiveAction(wfAction, actionJaxbElement);
+    }
+
+    private void addPrepareDeleteOutputPath(Process process,
+                                            PIG pigAction) throws FalconException {
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
         final PREPARE prepare = new PREPARE();
         final List<DELETE> deleteList = prepare.getDelete();
-        for (Output output : process.getOutputs().getOutputs()) {
+
+        for (String deletePath : deleteOutputPathList) {
             final DELETE delete = new DELETE();
-            delete.setPath("${wf:conf('" + output.getName() + "')}");
+            delete.setPath(deletePath);
             deleteList.add(delete);
         }
 
@@ -388,14 +515,83 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         }
     }
 
-    private void addInputOutputFeedsAsParams(PIG pigAction, Process process) throws FalconException {
-        final List<String> paramList = pigAction.getParam();
+    private void addPrepareDeleteOutputPath(Process process,
+                                            org.apache.falcon.oozie.hive.ACTION hiveAction)
+        throws FalconException {
+
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            hiveAction.setPrepare(prepare);
+        }
+    }
+
+    private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
+        final List<String> deleteList = new ArrayList<String>();
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+
+            if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+                continue; // prepare delete only applies to FileSystem storage
+            }
+
+            deleteList.add("${wf:conf('" + output.getName() + "')}");
+        }
+
+        return deleteList;
+    }
+
+    private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
+                                       String engineType) throws FalconException {
         for (Input input : process.getInputs().getInputs()) {
-            paramList.add(input.getName() + "=${" + input.getName() + "}");
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            final String inputName = input.getName();
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
+                Map<String, String> props = new HashMap<String, String>();
+                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
+                for (String key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+
+                paramList.add(paramName + "_filter=${wf:conf('"
+                        + paramName + "_partition_filter_" + engineType + "')}");
+            }
         }
+    }
 
+    private void addOutputFeedsAsParams(List<String> paramList, Process process,
+                                        Cluster cluster) throws FalconException {
         for (Output output : process.getOutputs().getOutputs()) {
-            paramList.add(output.getName() + "=${" + output.getName() + "}");
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+            Storage storage = FeedHelper.createStorage(cluster, feed);
+
+            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+                final String outputName = output.getName();  // no prefix for backwards compatibility
+                paramList.add(outputName + "=${" + outputName + "}");
+            } else if (storage.getType() == Storage.TYPE.TABLE) {
+                Map<String, String> props = new HashMap<String, String>();
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
+                for (String key : props.keySet()) {
+                    paramList.add(key + "=${wf:conf('" + key + "')}");
+                }
+            }
         }
     }
 
@@ -408,24 +604,79 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         // Propagate user defined properties to job configuration
         final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
                 pigAction.getConfiguration().getProperty();
+
+        // Propagate user defined properties to pig script as macros
+        // passed as parameters -p name=value that can be accessed as $name
+        final List<String> paramList = pigAction.getParam();
+
         for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
             org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
                     new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
             configProperty.setName(property.getName());
             configProperty.setValue(property.getValue());
             configuration.add(configProperty);
+
+            paramList.add(property.getName() + "=" + property.getValue());
         }
+    }
+
+    private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+        if (processProperties == null) {
+            return;
+        }
+
+        // Propagate user defined properties to job configuration
+        final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
+                hiveAction.getConfiguration().getProperty();
 
         // Propagate user defined properties to the hive script as macros
         // passed as parameters that can be accessed in the script as ${name}
-        final List<String> paramList = pigAction.getParam();
+        final List<String> paramList = hiveAction.getParam();
+
         for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
+                    new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+            configProperty.setName(property.getName());
+            configProperty.setValue(property.getValue());
+            configuration.add(configProperty);
+
             paramList.add(property.getName() + "=" + property.getValue());
         }
     }
 
+    private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
+        if (process.getInputs() == null) {
+            return storageType;
+        }
+
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            storageType = FeedHelper.getStorageType(feed, cluster);
+            if (Storage.TYPE.TABLE == storageType) {
+                break;
+            }
+        }
+
+        return storageType;
+    }
+
+    // creates hive-site.xml configuration in conf dir.
+    private void setupHiveConfiguration(Cluster cluster, Path wfPath,
+                                        String prefix) throws FalconException {
+        String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+        try {
+            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+            Path confPath = new Path(wfPath, "conf");
+            createHiveConf(fs, confPath, catalogUrl, prefix);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
     private void addArchiveForCustomJars(Cluster cluster, Workflow processWorkflow,
-                                         PIG pigAction) throws FalconException {
+                                         List<String> archiveList) throws FalconException {
         String processWorkflowLib = processWorkflow.getLib();
         if (processWorkflowLib == null) {
             return;
@@ -435,7 +686,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         try {
             final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {  // File, not a Dir
-                pigAction.getArchive().add(processWorkflowLib);
+                archiveList.add(processWorkflowLib);
                 return;
             }
 
@@ -452,10 +703,34 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             });
 
             for (FileStatus fileStatus : fileStatuses) {
-                pigAction.getArchive().add(fileStatus.getPath().toString());
+                archiveList.add(fileStatus.getPath().toString());
             }
         } catch (IOException e) {
             throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
         }
     }
+
+    @SuppressWarnings("unchecked")
+    protected JAXBElement<org.apache.falcon.oozie.hive.ACTION> unMarshalHiveAction(ACTION wfAction) {
+        try {
+            Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
+            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+            return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
+                    unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to unmarshall hive action.", e);
+        }
+    }
+
+    protected void marshalHiveAction(ACTION wfAction,
+                                     JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
+        try {
+            DOMResult hiveActionDOM = new DOMResult();
+            Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(actionjaxbElement, hiveActionDOM);
+            wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to marshall hive action.", e);
+        }
+    }
 }

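The marshalHiveAction/unMarshalHiveAction pair above embeds a typed JAXB object into the workflow ACTION's xs:any slot and later recovers it. Below is a minimal, self-contained sketch of that round trip; Payload is a hypothetical stand-in for org.apache.falcon.oozie.hive.ACTION, and the sketch casts to the generic org.w3c.dom.Element rather than the Xerces-specific ElementNSImpl used in the patch:

    import javax.xml.bind.JAXBContext;
    import javax.xml.bind.JAXBElement;
    import javax.xml.bind.Unmarshaller;
    import javax.xml.bind.annotation.XmlRootElement;
    import javax.xml.transform.dom.DOMResult;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;

    public class AnyElementRoundTrip {

        // Hypothetical stand-in for the generated hive ACTION class.
        @XmlRootElement(name = "payload")
        public static class Payload {
            public String script;
        }

        public static void main(String[] args) throws Exception {
            JAXBContext ctx = JAXBContext.newInstance(Payload.class);

            // Marshal the typed object into a detached DOM tree,
            // as marshalHiveAction does with a DOMResult.
            Payload in = new Payload();
            in.script = "${nameNode}/apps/hive/script.hql";
            DOMResult result = new DOMResult();
            ctx.createMarshaller().marshal(in, result);

            // Keep only the root element; this is what wfAction.setAny(...) stores.
            Element embedded = ((Document) result.getNode()).getDocumentElement();

            // Later, recover the typed object from the raw element,
            // mirroring unMarshalHiveAction.
            Unmarshaller unmarshaller = ctx.createUnmarshaller();
            JAXBElement<Payload> out = unmarshaller.unmarshal(embedded, Payload.class);
            System.out.println(out.getValue().script);
        }
    }

Unmarshalling via the Node overload keeps the code independent of the DOM implementation; the (ElementNSImpl) cast in the patch presumably works only because the stored element happens to be Xerces-backed, and a cast to org.w3c.dom.Element would have sufficed.
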
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 3f70557..1329733 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -73,9 +73,9 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
         properties.put(inName + ".done-flag", "notused");
 
-        String locPath = FeedHelper.getLocation(feed, LocationType.DATA, clusterName).getPath().replace('$', '%');
-        properties.put(inName + ".uri-template",
-                new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
+        String locPath = FeedHelper.createStorage(clusterName, feed)
+                .getUriTemplate(LocationType.DATA).replace('$', '%');
+        properties.put(inName + ".uri-template", locPath);
 
         properties.put(inName + ".start-instance", in.getStart());
         properties.put(inName + ".end-instance", in.getEnd());

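A small sketch of the '$' to '%' escaping applied in this hunk; the escaping presumably keeps Falcon's ${YEAR}-style time variables from being resolved as Oozie EL when the property is later consumed. The template value below is a hypothetical example:

    public class UriTemplateEscape {
        public static void main(String[] args) {
            // Hypothetical uri-template as returned by Storage.getUriTemplate().
            String uriTemplate = "${nameNode}/data/clicks/${YEAR}/${MONTH}/${DAY}";

            // '$' is swapped for '%' so the placeholders pass through untouched.
            System.out.println(uriTemplate.replace('$', '%'));
            // prints: %{nameNode}/data/clicks/%{YEAR}/%{MONTH}/%{DAY}
        }
    }
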
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index eeffdd5..494bf20 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -38,6 +38,15 @@
                     <name>oozie.launcher.mapred.job.priority</name>
                     <value>${jobPriority}</value>
                 </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
             <arg>-out</arg>
@@ -46,6 +55,8 @@
             <arg>${falconInPaths}</arg>
             <arg>-falconInputFeeds</arg>
             <arg>${falconInputFeeds}</arg>
+            <arg>-falconInputFeedStorageTypes</arg>
+            <arg>${falconInputFeedStorageTypes}</arg>
             <capture-output/>
         </java>
         <ok to="user-workflow"/>
@@ -60,6 +71,9 @@
             <case to="user-pig-job">
                 ${userWorkflowEngine=="pig"}
             </case>
+            <case to="user-hive-job">
+                ${userWorkflowEngine=="hive"}
+            </case>
             <default to="user-oozie-workflow"/>
         </switch>
     </decision>
@@ -76,12 +90,40 @@
                     <name>oozie.launcher.mapred.job.priority</name>
                     <value>${jobPriority}</value>
                 </property>
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.pig</name>
+                    <value>pig,hcatalog</value>
+                </property>
             </configuration>
             <script>#USER_WF_PATH#</script>
         </pig>
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>
     </action>
+    <action name="user-hive-job">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <job-xml>${wf:appPath()}/conf/falcon-hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>#USER_WF_PATH#</script>
+        </hive>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
     <action name='user-oozie-workflow'>
         <sub-workflow>
             <app-path>#USER_WF_PATH#</app-path>
@@ -90,7 +132,6 @@
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>
     </action>
-
     <action name='succeeded-post-processing'>
         <java>
             <job-tracker>${jobTracker}</job-tracker>

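The new -falconInputFeedStorageTypes argument wired into the late-data action above arrives alongside -falconInPaths and -falconInputFeeds as '#'-joined lists (see propagateLateDataProperties in OozieProcessMapper). A hypothetical sketch (not Falcon's actual LateDataHandler code) of how the three lists line up positionally; all values below are made up:

    import java.util.Arrays;
    import java.util.List;

    public class LateDataArgsSketch {
        public static void main(String[] args) {
            // Hypothetical values as they would arrive via the <arg> elements.
            String falconInputFeeds = "clicks#impressions";
            String falconInPaths = "/data/clicks/2013/11/12#/data/impressions/2013/11/12";
            String falconInputFeedStorageTypes = "FILESYSTEM#TABLE";

            List<String> feeds = Arrays.asList(falconInputFeeds.split("#"));
            List<String> paths = Arrays.asList(falconInPaths.split("#"));
            List<String> types = Arrays.asList(falconInputFeedStorageTypes.split("#"));

            for (int i = 0; i < feeds.size(); i++) {
                // Usage computation would differ by storage type, e.g. a
                // filesystem size check vs. a catalog/metastore lookup.
                System.out.println(feeds.get(i) + " -> " + paths.get(i)
                        + " (" + types.get(i) + ")");
            }
        }
    }
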
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 9d13d79..b6f03e5 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -18,28 +18,14 @@
 
 package org.apache.falcon.converter;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.List;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-
+import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -48,6 +34,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -56,17 +44,37 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.PIG;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 /**
  * Test for the Falcon entities mapping into Oozie artifacts.
  */
@@ -89,6 +97,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         ConfigurationStore store = ConfigurationStore.get();
         Cluster cluster = store.get(EntityType.CLUSTER, "corp");
         ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+        ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
         fs = new Path(hdfsUrl).getFileSystem(new Configuration());
         fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
 
@@ -145,19 +154,20 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         ConfigurationStore store = ConfigurationStore.get();
         Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
         SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
-        assertEquals(SchemaHelper.formatDateUTC(feed.getClusters().getClusters().get(0).getValidity().getStart()),
-                ds.getInitialInstance());
+
+        final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+        assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
         assertEquals(feed.getTimezone().getID(), ds.getTimezone());
         assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
         assertEquals("", ds.getDoneFlag());
-        assertEquals(ds.getUriTemplate(), "${nameNode}" + FeedHelper.getLocation(feed, LocationType.DATA,
-                feed.getClusters().getClusters().get(0).getName()).getPath());
+        assertEquals(ds.getUriTemplate(),
+                FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
+
+        HashMap<String, String> props = new HashMap<String, String>();
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            if (prop.getName().equals("mapred.job.priority")) {
-                assertEquals(prop.getValue(), "LOW");
-                break;
-            }
+            props.put(prop.getName(), prop.getValue());
         }
+        assertEquals(props.get("mapred.job.priority"), "LOW");
 
         assertLibExtensions(coord);
     }
@@ -194,16 +204,168 @@ public class OozieProcessMapperTest extends AbstractTestBase {
 
         List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
 
-        ACTION pigAction = (ACTION) decisionOrForkOrJoin.get(3);
-        Assert.assertEquals("user-pig-job", pigAction.getName());
-        Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getPig().getScript());
-        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getPig().getArchive());
+        ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
+        Assert.assertEquals("user-pig-job", pigActionNode.getName());
 
-        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(4);
+        final PIG pigAction = pigActionNode.getPig();
+        Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getScript());
+        Assert.assertNotNull(pigAction.getPrepare());
+        Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
+        Assert.assertFalse(pigAction.getParam().isEmpty());
+        Assert.assertEquals(5, pigAction.getParam().size());
+        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
+        Assert.assertTrue(pigAction.getFile().size() > 0);
+
+        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
         Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
         Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
     }
 
+    @Test
+    public void testHiveProcessMapper() throws Exception {
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/hive-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        OozieProcessMapper mapper = new OozieProcessMapper(process);
+        Path bundlePath = new Path("/tmp/seetharam", EntityUtil.getStagingPath(process));
+        mapper.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify table props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = mapper.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals("${nameNode}/apps/hive/script.hql", hiveAction.getScript());
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(10, hiveAction.getParam().size());
+    }
+
+    @Test
+    public void testProcessMapperForTableStorage() throws Exception {
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/pig-process-table.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        OozieProcessMapper mapper = new OozieProcessMapper(process);
+        Path bundlePath = new Path("/", EntityUtil.getStagingPath(process));
+        mapper.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        // verify table props
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+            }
+        }
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+    }
+
+    private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
+                                                      Cluster cluster) throws FalconException {
+        Map<String, String> expected = new HashMap<String, String>();
+        for (Input input : process.getInputs().getInputs()) {
+            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
+            propagateStorageProperties(input.getName(), storage, expected);
+        }
+
+        for (Output output : process.getOutputs().getOutputs()) {
+            CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
+            propagateStorageProperties(output.getName(), storage, expected);
+        }
+
+        return expected;
+    }
+
+    private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
+                                            Map<String, String> props) {
+        String prefix = "falcon_" + feedName;
+        props.put(prefix + "_storage_type", tableStorage.getType().name());
+        props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+        props.put(prefix + "_database", tableStorage.getDatabase());
+        props.put(prefix + "_table", tableStorage.getTable());
+
+        if (prefix.equals("input")) {
+            props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
+            props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
+            props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+        } else if (prefix.equals("output")) {
+            props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
     private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
@@ -261,8 +423,11 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
         Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
         Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
     }
 
     private COORDINATORAPP getCoordinator(Path path) throws Exception {
@@ -277,16 +442,14 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         return jaxbBundle.getValue();
     }
 
+    @SuppressWarnings("unchecked")
     private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
         String workflow = readFile(new Path(path, "workflow.xml"));
 
-        Unmarshaller unmarshaller = JAXBContext.newInstance(WORKFLOWAPP.class).createUnmarshaller();
-        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
-        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-workflow-0.3.xsd"));
-        unmarshaller.setSchema(schema);
-        JAXBElement<WORKFLOWAPP> jaxbWorkflow = unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())), WORKFLOWAPP.class);
-        return jaxbWorkflow.getValue();
+        JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
+        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
     }
 
     private BUNDLEAPP getBundle(Path path) throws Exception {
@@ -312,8 +475,11 @@ public class OozieProcessMapperTest extends AbstractTestBase {
     }
 
     @Override
-    @AfterClass
+    @AfterMethod
     public void cleanup() throws Exception {
         super.cleanup();
+        ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
+        ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
+        ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
     }
 }

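For reference, a standalone sketch of the falcon_<name>_* property-naming convention built by propagateCatalogTableProperties in the mapper and mirrored by getExpectedProperties in this test. The catalog URL, database, and table values are hypothetical placeholders:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TablePropsSketch {
        public static void main(String[] args) {
            Map<String, String> props = new LinkedHashMap<String, String>();
            String prefix = "falcon_" + "input"; // "falcon_" + input/output name

            // Common properties for any catalog-backed feed.
            props.put(prefix + "_storage_type", "TABLE");
            props.put(prefix + "_catalog_url", "hcat://localhost:49083");
            props.put(prefix + "_database", "default");
            props.put(prefix + "_table", "clicks_raw");

            // Inputs additionally get per-engine partition filters;
            // outputs get a dataout-partitions expression instead.
            props.put(prefix + "_partition_filter_pig",
                    "${coord:dataInPartitionFilter('input', 'pig')}");

            for (Map.Entry<String, String> entry : props.entrySet()) {
                System.out.println(entry.getKey() + "=" + entry.getValue());
            }
        }
    }
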
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/feed/hive-table-feed-out.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/feed/hive-table-feed-out.xml b/process/src/test/resources/config/feed/hive-table-feed-out.xml
new file mode 100644
index 0000000..bd93a01
--- /dev/null
+++ b/process/src/test/resources/config/feed/hive-table-feed-out.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks summary table " name="clicks-summary-table" xmlns="uri:falcon:feed:0.1">
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="corp" type="source" partition="*/${cluster.colo}">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

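The table element above uses the catalog URI convention this patch introduces, catalog:$database:$table#partition-spec, which CatalogStorage parses into database, table and partition key-values. Once the EL variables resolve, an instance of this feed would be addressed roughly as follows (the instance value is hypothetical):

    catalog:default:clicks-summary#ds=2011-11-02-00
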
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/feed/hive-table-feed.xml b/process/src/test/resources/config/feed/hive-table-feed.xml
new file mode 100644
index 0000000..66d0742
--- /dev/null
+++ b/process/src/test/resources/config/feed/hive-table-feed.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log table " name="clicks-raw-table" xmlns="uri:falcon:feed:0.1">
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="corp" type="source" partition="*/${cluster.colo}">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/process/hive-process.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/hive-process.xml b/process/src/test/resources/config/process/hive-process.xml
new file mode 100644
index 0000000..4dac8e9
--- /dev/null
+++ b/process/src/test/resources/config/process/hive-process.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

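The engine attribute on the workflow element is what selects the branch taken in the generated parent workflow; the mapper test earlier in this patch asserts user-pig-job, user-hive-job and user-oozie-workflow actions behind a decision node. This process exercises the hive engine with an .hql script, the pig variant follows in pig-process-table.xml, and a plain Oozie workflow would presumably be declared along these lines (path hypothetical):

    <workflow engine="oozie" path="/apps/workflows/my-workflow"/>
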
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/process/pig-process-table.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/pig-process-table.xml b/process/src/test/resources/config/process/pig-process-table.xml
new file mode 100644
index 0000000..37aca10
--- /dev/null
+++ b/process/src/test/resources/config/process/pig-process-table.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="table-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-0.1.xml b/process/src/test/resources/config/process/process-0.1.xml
index 91d5e0f..975d1a4 100644
--- a/process/src/test/resources/config/process/process-0.1.xml
+++ b/process/src/test/resources/config/process/process-0.1.xml
@@ -40,6 +40,6 @@
 
     <late-process policy="exp-backoff" delay="hours(1)">
         <late-input input="impression" workflow-path="hdfs://impression/late/workflow"/>
-        <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+        <late-input input="click" workflow-path="hdfs://clicks/late/workflow"/>
     </late-process>
 </process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 200cb2f..07175e6 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -68,6 +68,11 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 5c51b5f..17227bf 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.replication;
 
 import org.apache.commons.cli.*;
+import org.apache.falcon.entity.Storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -48,8 +49,8 @@ public class FeedReplicator extends Configured implements Tool {
 
     @Override
     public int run(String[] args) throws Exception {
-
-        DistCpOptions options = getDistCpOptions(args);
+        CommandLine cmd = getCommand(args);
+        DistCpOptions options = getDistCpOptions(cmd);
 
         Configuration conf = this.getConf();
         // inject wf configs
@@ -60,10 +61,68 @@ public class FeedReplicator extends Configured implements Tool {
                 + confPath.getFileSystem(conf).exists(confPath));
         conf.addResource(confPath);
 
-        DistCp distCp = new CustomReplicator(conf, options);
+        String falconFeedStorageType = cmd.getOptionValue("falconFeedStorageType").trim();
+        Storage.TYPE feedStorageType = Storage.TYPE.valueOf(falconFeedStorageType);
+
+        DistCp distCp = (feedStorageType == Storage.TYPE.FILESYSTEM)
+                ? new CustomReplicator(conf, options)
+                : new DistCp(conf, options);
         LOG.info("Started DistCp");
         distCp.execute();
 
+        if (feedStorageType == Storage.TYPE.FILESYSTEM) {
+            executePostProcessing(options);  // this only applies for FileSystem Storage.
+        }
+
+        LOG.info("Completed DistCp");
+        return 0;
+    }
+
+    protected CommandLine getCommand(String[] args) throws ParseException {
+        Options options = new Options();
+        Option opt = new Option("maxMaps", true,
+                "max number of maps to use for this copy");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("sourcePaths", true,
+                "comma separated list of source paths to be copied");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("targetPath", true, "target path");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("falconFeedStorageType", true, "feed storage type");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return new GnuParser().parse(options, args);
+    }
+
+    protected DistCpOptions getDistCpOptions(CommandLine cmd) {
+        String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
+        List<Path> srcPaths = getPaths(paths);
+        String trgPath = cmd.getOptionValue("targetPath").trim();
+
+        DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(trgPath));
+        distcpOptions.setSyncFolder(true);
+        distcpOptions.setBlocking(true);
+        distcpOptions.setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
+
+        return distcpOptions;
+    }
+
+    private List<Path> getPaths(String[] paths) {
+        List<Path> listPaths = new ArrayList<Path>();
+        for (String path : paths) {
+            listPaths.add(new Path(path));
+        }
+        return listPaths;
+    }
+
+    private void executePostProcessing(DistCpOptions options) throws IOException {
         Path targetPath = options.getTargetPath();
         FileSystem fs = targetPath.getFileSystem(getConf());
         List<Path> inPaths = options.getSourcePaths();
@@ -87,8 +146,6 @@ public class FeedReplicator extends Configured implements Tool {
             LOG.info("No files present in path: "
                     + new Path(targetPath.toString() + "/" + fixedPath).toString());
         }
-        LOG.info("Completed DistCp");
-        return 0;
     }
 
     private String getFixedPath(String relativePath) throws IOException {
@@ -113,44 +170,4 @@ public class FeedReplicator extends Configured implements Tool {
         String result = resultBuffer.toString();
         return result.substring(0, result.lastIndexOf('/'));
     }
-
-    public DistCpOptions getDistCpOptions(String[] args) throws ParseException {
-        Options options = new Options();
-        Option opt;
-        opt = new Option("maxMaps", true,
-                "max number of maps to use for this copy");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("sourcePaths", true,
-                "comma separtated list of source paths to be copied");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("targetPath", true, "target path");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        CommandLine cmd = new GnuParser().parse(options, args);
-        String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
-        List<Path> srcPaths = getPaths(paths);
-        String trgPath = cmd.getOptionValue("targetPath").trim();
-
-        DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(
-                trgPath));
-        distcpOptions.setSyncFolder(true);
-        distcpOptions.setBlocking(true);
-        distcpOptions
-                .setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
-
-        return distcpOptions;
-    }
-
-    private List<Path> getPaths(String[] paths) {
-        List<Path> listPaths = new ArrayList<Path>();
-        for (String path : paths) {
-            listPaths.add(new Path(path));
-        }
-        return listPaths;
-    }
 }

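With getCommand() split out, the replicator is still driven as a standard Hadoop Tool. A minimal invocation sketch, assuming the new mandatory -falconFeedStorageType argument; the driver class and cluster endpoints are hypothetical:

    import org.apache.falcon.entity.Storage;
    import org.apache.falcon.replication.FeedReplicator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class FeedReplicatorDriver {
        public static void main(String[] args) throws Exception {
            String[] replicatorArgs = {
                "-maxMaps", "5",
                "-sourcePaths", "hdfs://source-nn:8020/falcon/clicks/2011-11-02-00",
                "-targetPath", "hdfs://target-nn:8020/falcon/clicks/2011-11-02-00",
                "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
            };
            // ToolRunner injects the Configuration before run() is invoked,
            // so getConf() is usable inside FeedReplicator.run()
            System.exit(ToolRunner.run(new Configuration(), new FeedReplicator(), replicatorArgs));
        }
    }

Passing Storage.TYPE.TABLE.name() instead makes run() fall back to plain DistCp and skip the filesystem-only post-processing step.
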
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index 67795b5..b8b39ad 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.falcon.replication;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.falcon.entity.Storage;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.testng.Assert;
@@ -38,11 +40,17 @@ public class FeedReplicatorTest {
          * <arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg>
          * <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
          */
+        final String[] args = {
+            "true",
+            "-maxMaps", "5",
+            "-sourcePaths", "hdfs://localhost:8020/tmp/",
+            "-targetPath", "hdfs://localhost1:8020/tmp/",
+            "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
+        };
+
         FeedReplicator replicator = new FeedReplicator();
-        DistCpOptions options = replicator.getDistCpOptions(new String[] {
-            "true", "-maxMaps", "5", "-sourcePaths",
-            "hdfs://localhost:8020/tmp/", "-targetPath",
-            "hdfs://localhost1:8020/tmp/", });
+        CommandLine cmd = replicator.getCommand(args);
+        DistCpOptions options = replicator.getDistCpOptions(cmd);
 
         List<Path> srcPaths = new ArrayList<Path>();
         srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 5b758b8..4b35760 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -19,6 +19,12 @@
 package org.apache.falcon.latedata;
 
 import org.apache.commons.cli.*;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogPartition;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,6 +35,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
 import java.io.*;
+import java.net.URISyntaxException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -56,48 +63,44 @@ public class LateDataHandler extends Configured implements Tool {
         Option opt = new Option("out", true, "Out file name");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("paths", true,
                 "Comma separated path list, further separated by #");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("falconInputFeeds", true,
                 "Input feed names, further separated by #");
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new Option("falconInputFeedStorageTypes", true,
+                "Feed storage types corresponding to input feed names, separated by #");
+        opt.setRequired(true);
+        options.addOption(opt);
+
         return new GnuParser().parse(options, args);
     }
 
     @Override
     public int run(String[] args) throws Exception {
-
         CommandLine command = getCommand(args);
 
-        Path file = new Path(command.getOptionValue("out"));
-        Map<String, Long> map = new LinkedHashMap<String, Long>();
         String pathStr = getOptionValue(command, "paths");
         if (pathStr == null) {
             return 0;
         }
 
+        String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split("#");
         String[] pathGroups = pathStr.split("#");
-        String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split(
-                "#");
-        for (int index = 0; index < pathGroups.length; index++) {
-            long usage = 0;
-            for (String pathElement : pathGroups[index].split(",")) {
-                Path inPath = new Path(pathElement);
-                usage += usage(inPath, getConf());
-            }
-            map.put(inputFeeds[index], usage);
-        }
-        LOG.info("MAP data: " + map);
+        String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#");
+
+        Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
+        LOG.info("MAP data: " + metrics);
+
+        Path file = new Path(command.getOptionValue("out"));
+        persistMetrics(metrics, file);
 
-        OutputStream out = file.getFileSystem(getConf()).create(file);
-        for (Map.Entry<String, Long> entry : map.entrySet()) {
-            out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-        }
-        out.close();
         return 0;
     }
 
@@ -109,7 +112,142 @@ public class LateDataHandler extends Configured implements Tool {
         return value;
     }
 
-    public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
+    private Map<String, Long> computeMetrics(String[] inputFeeds, String[] pathGroups,
+                                             String[] inputFeedStorageTypes)
+        throws IOException, FalconException, URISyntaxException {
+
+        Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
+        for (int index = 0; index < pathGroups.length; index++) {
+            long storageMetric = computeStorageMetric(pathGroups[index], inputFeedStorageTypes[index], getConf());
+            computedMetrics.put(inputFeeds[index], storageMetric);
+        }
+
+        return computedMetrics;
+    }
+
+    private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException {
+        OutputStream out = null;
+        try {
+            out = file.getFileSystem(getConf()).create(file);
+
+            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+                out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+            }
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException ignore) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    /**
+     * This method computes the storage metrics for a given feed's instance or partition.
+     * It uses size on disk as the metric for File System Storage.
+     * It uses create time as the metric for Catalog Table Storage.
+     *
+     * The assumption is that if a partition has changed or been reinstated, the
+     * underlying metric would change, either size or create time.
+     *
+     * @param feedUriTemplate URI for the feed storage, filesystem path or table uri
+     * @param feedStorageType feed storage type
+     * @param conf configuration
+     * @return computed metric
+     * @throws IOException
+     * @throws FalconException
+     * @throws URISyntaxException
+     */
+    public long computeStorageMetric(String feedUriTemplate, String feedStorageType, Configuration conf)
+        throws IOException, FalconException, URISyntaxException {
+
+        Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            // usage on file system is the metric
+            return getFileSystemUsageMetric(feedUriTemplate, conf);
+        } else if (storageType == Storage.TYPE.TABLE) {
+            // todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme
+            feedUriTemplate = feedUriTemplate.replace("hcat", "thrift");
+            // creation time of the given partition is the metric
+            return getTablePartitionCreateTimeMetric(feedUriTemplate);
+        }
+
+        throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
+    }
+
+    /**
+     * The storage metric for File System Storage is the size of the content
+     * that this feed's instance, represented by the path, uses on the file system.
+     *
+     * If this instance was reinstated, the assumption is that the size of
+     * this instance on disk would change.
+     *
+     * @param pathGroup path on file system
+     * @param conf configuration
+     * @return metric as the size of data on file system
+     * @throws IOException
+     */
+    private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
+        throws IOException {
+        long usage = 0;
+        for (String pathElement : pathGroup.split(",")) {
+            Path inPath = new Path(pathElement);
+            usage += usage(inPath, conf);
+        }
+
+        return usage;
+    }
+
+    private long usage(Path inPath, Configuration conf) throws IOException {
+        FileSystem fs = inPath.getFileSystem(conf);
+        FileStatus[] fileStatuses = fs.globStatus(inPath);
+        if (fileStatuses == null || fileStatuses.length == 0) {
+            return 0;
+        }
+        long totalSize = 0;
+        for (FileStatus fileStatus : fileStatuses) {
+            totalSize += fs.getContentSummary(fileStatus.getPath()).getLength();
+        }
+        return totalSize;
+    }
+
+    /**
+     * The storage metric for Table Storage is the create time of the given partition,
+     * since there is no API in Hive or HCatalog to find whether a partition has changed.
+     *
+     * If this partition was reinstated, the assumption is that the create time of
+     * this partition would change.
+     *
+     * @param feedUriTemplate catalog table uri
+     * @return metric as creation time of the given partition
+     * @throws IOException
+     * @throws URISyntaxException
+     * @throws FalconException
+     */
+    private long getTablePartitionCreateTimeMetric(String feedUriTemplate)
+        throws IOException, URISyntaxException, FalconException {
+
+        CatalogStorage storage = (CatalogStorage)
+                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate);
+        CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
+                storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), storage.getPartitions());
+        return partition == null ? 0 : partition.getCreateTime();
+    }
+
+    /**
+     * This method compares the recorded metrics persisted in a file against
+     * the recently computed metrics and returns the list of feeds that have changed.
+     *
+     * @param file persisted metrics from the first run
+     * @param metrics newly computed metrics
+     * @param conf configuration
+     * @return list of feed names which have changed, empty string if none has changed
+     * @throws Exception
+     */
+    public String detectChanges(Path file, Map<String, Long> metrics, Configuration conf)
         throws Exception {
 
         StringBuilder buffer = new StringBuilder();
@@ -117,7 +255,7 @@ public class LateDataHandler extends Configured implements Tool {
                 file.getFileSystem(conf).open(file)));
         String line;
         try {
-            Map<String, Long> recorded = new LinkedHashMap<String, Long>();
+            Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();
             while ((line = in.readLine()) != null) {
                 if (line.isEmpty()) {
                     continue;
@@ -125,17 +263,17 @@ public class LateDataHandler extends Configured implements Tool {
                 int index = line.indexOf('=');
                 String key = line.substring(0, index);
                 long size = Long.parseLong(line.substring(index + 1));
-                recorded.put(key, size);
+                recordedMetrics.put(key, size);
             }
 
-            for (Map.Entry<String, Long> entry : map.entrySet()) {
-                if (recorded.get(entry.getKey()) == null) {
+            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+                if (recordedMetrics.get(entry.getKey()) == null) {
                     LOG.info("No matching key " + entry.getKey());
                     continue;
                 }
-                if (!recorded.get(entry.getKey()).equals(entry.getValue())) {
-                    LOG.info("Recorded size:" + recorded.get(entry.getKey()) + "  is different from new size"
-                            + entry.getValue());
+                if (!recordedMetrics.get(entry.getKey()).equals(entry.getValue())) {
+                    LOG.info("Recorded size: " + recordedMetrics.get(entry.getKey())
+                            + " is different from new size: " + entry.getValue());
                     buffer.append(entry.getKey()).append(',');
                 }
             }
@@ -149,17 +287,4 @@ public class LateDataHandler extends Configured implements Tool {
             in.close();
         }
     }
-
-    public long usage(Path inPath, Configuration conf) throws IOException {
-        FileSystem fs = inPath.getFileSystem(conf);
-        FileStatus[] status = fs.globStatus(inPath);
-        if (status == null || status.length == 0) {
-            return 0;
-        }
-        long totalSize = 0;
-        for (FileStatus statu : status) {
-            totalSize += fs.getContentSummary(statu.getPath()).getLength();
-        }
-        return totalSize;
-    }
 }

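For reference, persistMetrics() writes the computed metrics as one key=value line per input feed, so the file detectChanges() reads back on the next run looks roughly like this (feed names and values hypothetical):

    clicks-raw-table=52428800
    impression=1320192000

The value is a byte count for a FILESYSTEM feed and the partition create time for a TABLE feed; either way, a changed or reinstated instance shows up as a changed value, and that feed's name is appended to the comma-separated list detectChanges() returns.
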
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index dcde876..b5ac121 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -22,14 +22,14 @@ package org.apache.falcon.rerun.event;
  */
 public class LaterunEvent extends RerunEvent {
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
-    public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
-                        long delay, String entityType, String entityName, String instance,
-                        int runId) {
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    public LaterunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
+                        String entityType, String entityName, String instance, int runId) {
         super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
                 instance, runId);
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
 
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 0dcc93d..baf4601 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -45,7 +45,7 @@ public class RerunEvent implements Delayed {
     protected String instance;
     protected int runId;
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
                       String entityType, String entityName, String instance, int runId) {
         this.clusterName = clusterName;
@@ -57,7 +57,7 @@ public class RerunEvent implements Delayed {
         this.runId = runId;
         this.entityType = entityType;
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public String getClusterName() {
         return clusterName;