Posted to commits@falcon.apache.org by sr...@apache.org on 2013/11/12 12:05:28 UTC

[11/12] FALCON-85 Hive (HCatalog) integration. Contributed by Venkatesh Seetharam. FALCON-163 Merge FALCON-85 branch into main line. Contributed by Venkatesh Seetharam.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
new file mode 100644
index 0000000..51e4d6e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatDatabase;
+import org.apache.hcatalog.api.HCatPartition;
+import org.apache.hcatalog.api.HCatTable;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An implementation of CatalogService that uses Hive Meta Store (HCatalog)
+ * as the backing Catalog registry.
+ */
+public class HiveCatalogService extends AbstractCatalogService {
+
+    private static final Logger LOG = Logger.getLogger(HiveCatalogService.class);
+
+    private static final ConcurrentHashMap<String, HCatClient> CACHE = new ConcurrentHashMap<String, HCatClient>();
+
+    public static HCatClient get(Cluster cluster) throws FalconException {
+        assert cluster != null : "Cluster cannot be null";
+
+        String metastoreUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+        return get(metastoreUrl);
+    }
+
+    public static synchronized HCatClient get(String metastoreUrl) throws FalconException {
+
+        if (!CACHE.containsKey(metastoreUrl)) {
+            HCatClient hCatClient = getHCatClient(metastoreUrl);
+            LOG.info("Caching HCatalog client object for " + metastoreUrl);
+            CACHE.putIfAbsent(metastoreUrl, hCatClient);
+        }
+
+        return CACHE.get(metastoreUrl);
+    }
+
+    private static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
+        try {
+            HiveConf hcatConf = new HiveConf();
+            hcatConf.set("hive.metastore.local", "false");
+            hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+            hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+            hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                    HCatSemanticAnalyzer.class.getName());
+            hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+            hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+            hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+
+            return HCatClient.create(hcatConf);
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+
+    @Override
+    public boolean isAlive(String catalogBaseUrl) throws FalconException {
+        LOG.info("Checking if the service is alive for: " + catalogBaseUrl);
+
+        try {
+            HCatClient client = get(catalogBaseUrl);
+            // Note: the cached client is shared across callers; do not close it here.
+            HCatDatabase database = client.getDatabase("default");
+            return database != null;
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(String catalogUrl, String database, String tableName)
+        throws FalconException {
+        LOG.info("Checking if the table exists: " + tableName);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            HCatTable table = client.getTable(database, tableName);
+            return table != null;
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public boolean isTableExternal(String catalogUrl, String database, String tableName)
+        throws FalconException {
+        LOG.info("Checking if the table is external: " + tableName);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            HCatTable table = client.getTable(database, tableName);
+            return !table.getTabletype().equals("MANAGED_TABLE");
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public List<CatalogPartition> listPartitionsByFilter(String catalogUrl, String database,
+                                                         String tableName, String filter)
+        throws FalconException {
+        LOG.info("Listing partitions for: " + tableName + ", partition filter: " + filter);
+
+        try {
+            List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
+
+            HCatClient client = get(catalogUrl);
+            List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter);
+            for (HCatPartition hCatPartition : hCatPartitions) {
+                CatalogPartition partition = createCatalogPartition(hCatPartition);
+                catalogPartitionList.add(partition);
+            }
+
+            return catalogPartitionList;
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    private CatalogPartition createCatalogPartition(HCatPartition hCatPartition) {
+        final CatalogPartition catalogPartition = new CatalogPartition();
+        catalogPartition.setDatabaseName(hCatPartition.getDatabaseName());
+        catalogPartition.setTableName(hCatPartition.getTableName());
+        catalogPartition.setValues(hCatPartition.getValues());
+        catalogPartition.setInputFormat(hCatPartition.getInputFormat());
+        catalogPartition.setOutputFormat(hCatPartition.getOutputFormat());
+        catalogPartition.setLocation(hCatPartition.getLocation());
+        catalogPartition.setSerdeInfo(hCatPartition.getSerDe());
+        catalogPartition.setCreateTime(hCatPartition.getCreateTime());
+        catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
+
+        List<String> tableColumns = new ArrayList<String>();
+        for (HCatFieldSchema hCatFieldSchema : hCatPartition.getColumns()) {
+            tableColumns.add(hCatFieldSchema.getName());
+        }
+        catalogPartition.setTableColumns(tableColumns);
+
+        return catalogPartition;
+    }
+
+    @Override
+    public boolean dropPartitions(String catalogUrl, String database,
+                                  String tableName, Map<String, String> partitions)
+        throws FalconException {
+        LOG.info("Dropping partitions for: " + tableName + ", partitions: " + partitions);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            client.dropPartitions(database, tableName, partitions, true);
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+
+        return true;
+    }
+
+    @Override
+    public CatalogPartition getPartition(String catalogUrl, String database, String tableName,
+                                         Map<String, String> partitionSpec) throws FalconException {
+        LOG.info("Fetching partition for: " + tableName + ", partition spec: " + partitionSpec);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
+            return createCatalogPartition(hCatPartition);
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+}
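
As a rough usage sketch of the service above, assuming a hypothetical metastore endpoint, database, table and filter; CatalogPartition.getLocation() is assumed to mirror the setter used in createCatalogPartition():

    import java.util.List;

    import org.apache.falcon.catalog.CatalogPartition;
    import org.apache.falcon.catalog.HiveCatalogService;

    public final class CatalogSmokeSketch {
        public static void main(String[] args) throws Exception {
            String metastoreUrl = "thrift://metastore.example.com:9083"; // hypothetical

            HiveCatalogService catalog = new HiveCatalogService();
            if (!catalog.isAlive(metastoreUrl)) {
                throw new IllegalStateException("Metastore is not reachable: " + metastoreUrl);
            }

            if (catalog.tableExists(metastoreUrl, "falcon_db", "summary_table")) {
                // Filter uses Hive partition-filter syntax on partition columns.
                List<CatalogPartition> partitions = catalog.listPartitionsByFilter(
                        metastoreUrl, "falcon_db", "summary_table", "ds > '2013-11-01'");
                for (CatalogPartition partition : partitions) {
                    System.out.println(partition.getLocation()); // getter assumed
                }
            }
        }
    }

Note that get(String) caches one HCatClient per metastore URL, so repeated calls across handlers share a client.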

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index 49662d1..644afd2 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -84,7 +84,7 @@ public abstract class AbstractCleanupHandler {
         return paths;
     }
 
-    private FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
+    protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
         throws FalconException {
 
         FileSystem fs;
@@ -101,6 +101,16 @@ public abstract class AbstractCleanupHandler {
         throws FalconException {
 
         FileStatus[] logs = getAllLogs(cluster, entity);
+        delete(cluster, entity, retention, logs);
+    }
+
+    protected void delete(Cluster cluster, Entity entity, long retention, FileStatus[] logs)
+        throws FalconException {
+        if (logs == null || logs.length == 0) {
+            LOG.info("Nothing to delete for cluster: " + cluster.getName() + ", entity: " + entity.getName());
+            return;
+        }
+
         long now = System.currentTimeMillis();
 
         for (FileStatus log : logs) {
@@ -126,7 +136,6 @@ public abstract class AbstractCleanupHandler {
                         + log.getPath());
             }
         }
-
     }
 
     private void deleteParentIfEmpty(FileSystem fs, Path parent) throws IOException {
@@ -136,7 +145,6 @@ public abstract class AbstractCleanupHandler {
             fs.delete(parent, true);
             deleteParentIfEmpty(fs, parent.getParent());
         }
-
     }
 
     public abstract void cleanup() throws FalconException;
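
The retention comparison itself sits in the elided context of the hunk above; as a minimal sketch of the pattern, assuming retention is a millisecond window compared against each file's modification time:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;

    final class RetentionSketch {
        // Delete every log path whose modification time falls outside the window.
        static void deleteOlderThan(FileSystem fs, FileStatus[] logs, long retentionMillis)
                throws IOException {
            long now = System.currentTimeMillis();
            for (FileStatus log : logs) {
                if (now - log.getModificationTime() > retentionMillis) {
                    fs.delete(log.getPath(), true); // recursive: these are log dirs
                }
            }
        }
    }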

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 0c8cf82..7dbac58 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -18,12 +18,19 @@
 package org.apache.falcon.cleanup;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -44,9 +51,10 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
                 Cluster currentCluster = STORE.get(EntityType.CLUSTER,
                         cluster.getName());
                 if (currentCluster.getColo().equals(getCurrentColo())) {
-                    LOG.info("Cleaning up logs for process:" + feedName
+                    LOG.info("Cleaning up logs & staged data for feed: " + feedName
                             + " in  cluster: " + cluster.getName() + " with retention: " + retention);
                     delete(currentCluster, feed, retention);
+                    deleteStagedData(currentCluster, feed, retention);
                 } else {
                     LOG.info("Ignoring cleanup for process:" + feedName
                             + " in  cluster: " + cluster.getName() + " as this does not belong to current colo");
@@ -56,11 +64,36 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
         }
     }
 
+    /**
+     * Delete the staging area used for replicating tables.
+     *
+     * @param cluster cluster hosting the staged data
+     * @param feed feed entity
+     * @param retention retention limit
+     * @throws FalconException
+     */
+    private void deleteStagedData(Cluster cluster, Feed feed, long retention)
+        throws FalconException {
+        Storage storage = FeedHelper.createStorage(cluster, feed);
+        if (storage.getType() == Storage.TYPE.FILESYSTEM) {  // FS does NOT use staging dirs
+            return;
+        }
+
+        final CatalogStorage tableStorage = (CatalogStorage) storage;
+        String stagingDir = FeedHelper.getStagingDir(cluster, feed, tableStorage, Tag.REPLICATION);
+        Path stagingPath = new Path(stagingDir + "/*/*/*");  // stagingDir/dataOutPartitionValue/nominal-time/data
+        FileSystem fs = getFileSystem(cluster);
+        try {
+            FileStatus[] paths = fs.globStatus(stagingPath);
+            delete(cluster, feed, retention, paths);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
     @Override
     protected Path getLogPath(Entity entity, String stagingPath) {
-        Path logPath = new Path(stagingPath, "falcon/workflows/feed/"
+        return new Path(stagingPath, "falcon/workflows/feed/"
                 + entity.getName() + "/logs/job-*/*/*");
-        return logPath;
     }
-
 }
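
The three-level glob in deleteStagedData mirrors the layout noted in its inline comment; a sketch with a hypothetical layout shown for orientation:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class StagingGlobSketch {
        // Hypothetical layout under stagingDir for one replication run:
        //   stagingDir/ds=2013-11-11/2013-11-12-01-00/data
        //              ^ partition    ^ nominal time   ^ staged files
        static FileStatus[] stagedData(FileSystem fs, String stagingDir) throws IOException {
            return fs.globStatus(new Path(stagingDir + "/*/*/*"));
        }
    }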

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
new file mode 100644
index 0000000..32f7605
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+/**
+ * A catalog registry implementation of a feed storage.
+ */
+public class CatalogStorage implements Storage {
+
+    public static final String PARTITION_SEPARATOR = ";";
+    public static final String PARTITION_KEYVAL_SEPARATOR = "=";
+    public static final String INPUT_PATH_SEPARATOR = ":";
+    public static final String OUTPUT_PATH_SEPARATOR = "/";
+    public static final String PARTITION_VALUE_QUOTE = "'";
+
+    public static final String CATALOG_URL = "${hcatNode}";
+
+    private final String catalogUrl;
+    private String database;
+    private String table;
+    private Map<String, String> partitions;
+
+    protected CatalogStorage(Feed feed) throws URISyntaxException {
+        this(CATALOG_URL, feed.getTable());
+    }
+
+    protected CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
+        this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), table);
+    }
+
+    protected CatalogStorage(String catalogUrl, CatalogTable table) throws URISyntaxException {
+        this(catalogUrl, table.getUri());
+    }
+
+    protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
+        if (catalogUrl == null || catalogUrl.length() == 0) {
+            throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
+        }
+
+        this.catalogUrl = catalogUrl;
+
+        parseFeedUri(tableUri);
+    }
+
+    /**
+     * Validate URI to conform to catalog:$database:$table#$partitions.
+     * scheme=catalog:database=$database:table=$table#$partitions
+     * partitions=key=value;key=value
+     *
+     * @param catalogTableUri table URI to parse and validate
+     * @throws URISyntaxException
+     */
+    private void parseFeedUri(String catalogTableUri) throws URISyntaxException {
+
+        final String processed = catalogTableUri.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
+                                                .replaceAll("}", EXPR_CLOSE_NORMALIZED);
+        URI tableUri = new URI(processed);
+
+        if (!"catalog".equals(tableUri.getScheme())) {
+            throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
+        }
+
+        final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
+        if (schemeSpecificPart == null) {
+            throw new URISyntaxException(tableUri.toString(), "Database and Table are missing");
+        }
+
+        String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR);
+
+        if (paths.length != 2) {
+            throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table");
+        }
+
+        database = paths[0];
+        table = paths[1];
+
+        if (database == null || database.length() == 0) {
+            throw new URISyntaxException(tableUri.toString(), "DB name is missing");
+        }
+        if (table == null || table.length() == 0) {
+            throw new URISyntaxException(tableUri.toString(), "Table name is missing");
+        }
+
+        String partRaw = tableUri.getFragment();
+        if (partRaw == null || partRaw.length() == 0) {
+            throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
+        }
+
+        final String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
+                                           .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
+        partitions = new LinkedHashMap<String, String>(); // preserve insertion order
+        String[] parts = rawPartition.split(PARTITION_SEPARATOR);
+        for (String part : parts) {
+            if (part == null || part.length() == 0) {
+                continue;
+            }
+
+            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
+            if (keyVal.length != 2) {
+                throw new URISyntaxException(tableUri.toString(),
+                        "Partition key value pair is not specified properly in (" + part + ")");
+            }
+
+            partitions.put(keyVal[0], keyVal[1]);
+        }
+    }
+
+    /**
+     * Create an instance from the URI Template that was generated using
+     * the getUriTemplate() method.
+     *
+     * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate
+     * @throws URISyntaxException
+     */
+    protected CatalogStorage(String uriTemplate) throws URISyntaxException {
+        if (uriTemplate == null || uriTemplate.length() == 0) {
+            throw new IllegalArgumentException("URI template cannot be null or empty");
+        }
+
+        final String processed = uriTemplate.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
+                                            .replaceAll("}", EXPR_CLOSE_NORMALIZED);
+        URI uri = new URI(processed);
+
+        this.catalogUrl = uri.getScheme() + "://" + uri.getAuthority();
+
+        parseUriTemplate(uri);
+    }
+
+    private void parseUriTemplate(URI uriTemplate) throws URISyntaxException {
+        String path = uriTemplate.getPath();
+        String[] paths = path.split(OUTPUT_PATH_SEPARATOR);
+        if (paths.length != 4) {
+            throw new URISyntaxException(uriTemplate.toString(),
+                    "URI path is not in expected format: /database/table/partitions");
+        }
+
+        database = paths[1];
+        table = paths[2];
+        String partRaw = paths[3];
+
+        if (database == null || database.length() == 0) {
+            throw new URISyntaxException(uriTemplate.toString(), "DB name is missing");
+        }
+        if (table == null || table.length() == 0) {
+            throw new URISyntaxException(uriTemplate.toString(), "Table name is missing");
+        }
+        if (partRaw == null || partRaw.length() == 0) {
+            throw new URISyntaxException(uriTemplate.toString(), "Partition details are missing");
+        }
+
+        String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
+                .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
+        partitions = new LinkedHashMap<String, String>();
+        String[] parts = rawPartition.split(PARTITION_SEPARATOR);
+        for (String part : parts) {
+            if (part == null || part.length() == 0) {
+                continue;
+            }
+
+            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
+            if (keyVal.length != 2) {
+                throw new URISyntaxException(uriTemplate.toString(),
+                        "Partition key value pair is not specified properly in (" + part + ")");
+            }
+
+            partitions.put(keyVal[0], keyVal[1]);
+        }
+    }
+
+    public String getCatalogUrl() {
+        return catalogUrl;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public Map<String, String> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @param key partition key
+     * @return partition value
+     */
+    public String getPartitionValue(String key) {
+        return partitions.get(key);
+    }
+
+    /**
+     * @param key partition key
+     * @return if partitions map includes the key or not
+     */
+    public boolean hasPartition(String key) {
+        return partitions.containsKey(key);
+    }
+
+    public String getDatedPartitionKey() {
+        String datedPartitionKey = null;
+
+        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
+
+            Matcher matcher = FeedDataPath.PATTERN.matcher(entry.getValue());
+            if (matcher.find()) {
+                datedPartitionKey = entry.getKey();
+                break;
+            }
+        }
+
+        return datedPartitionKey;
+    }
+
+    /**
+     * Convert the partition map to a filter string.
+     * Each key value pair is separated by ';'.
+     *
+     * @return filter string
+     */
+    public String toPartitionFilter() {
+        StringBuilder filter = new StringBuilder();
+        filter.append("(");
+        for (Map.Entry<String, String> entry : partitions.entrySet()) {
+            if (filter.length() > 1) {
+                filter.append(PARTITION_SEPARATOR);
+            }
+            filter.append(entry.getKey());
+            filter.append(PARTITION_KEYVAL_SEPARATOR);
+            filter.append(PARTITION_VALUE_QUOTE);
+            filter.append(entry.getValue());
+            filter.append(PARTITION_VALUE_QUOTE);
+        }
+        filter.append(")");
+        return filter.toString();
+    }
+
+    /**
+     * Convert the partition map to a path string.
+     * Each key value pair is separated by '/'.
+     *
+     * @return path string
+     */
+    public String toPartitionAsPath() {
+        StringBuilder partitionFilter = new StringBuilder();
+
+        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
+            partitionFilter.append(entry.getKey())
+                    .append(PARTITION_KEYVAL_SEPARATOR)
+                    .append(entry.getValue())
+                    .append(OUTPUT_PATH_SEPARATOR);
+        }
+
+        partitionFilter.setLength(partitionFilter.length() - 1);
+        return partitionFilter.toString();
+    }
+
+    @Override
+    public TYPE getType() {
+        return TYPE.TABLE;
+    }
+
+    /**
+     * LocationType does NOT matter here.
+     */
+    @Override
+    public String getUriTemplate() {
+        return getUriTemplate(LocationType.DATA);
+    }
+
+    /**
+     * LocationType does NOT matter here.
+     */
+    @Override
+    public String getUriTemplate(LocationType locationType) {
+        StringBuilder uriTemplate = new StringBuilder();
+        uriTemplate.append(catalogUrl);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        uriTemplate.append(database);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        uriTemplate.append(table);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        for (Map.Entry<String, String> entry : partitions.entrySet()) {
+            uriTemplate.append(entry.getKey());
+            uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
+            uriTemplate.append(entry.getValue());
+            uriTemplate.append(PARTITION_SEPARATOR);
+        }
+        uriTemplate.setLength(uriTemplate.length() - 1);
+
+        return uriTemplate.toString();
+    }
+
+    @Override
+    public boolean exists() throws FalconException {
+        return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
+    }
+
+    @Override
+    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
+
+        return !(getCatalogUrl() != null && !getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
+                && getDatabase().equals(catalogStorage.getDatabase())
+                && getTable().equals(catalogStorage.getTable())
+                && getPartitions().equals(catalogStorage.getPartitions());
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogStorage{"
+                + "catalogUrl='" + catalogUrl + '\''
+                + ", database='" + database + '\''
+                + ", table='" + table + '\''
+                + ", partitions=" + partitions
+                + '}';
+    }
+}
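
A sketch of the template round trip: parsing a URI template of the form produced by getUriTemplate() back into a CatalogStorage via the public factory in FeedHelper (shown later in this patch). The endpoint, database, table and partition spec are hypothetical:

    import org.apache.falcon.entity.CatalogStorage;
    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.Storage;

    public final class CatalogStorageSketch {
        public static void main(String[] args) throws Exception {
            // Template form: catalogUrl/database/table/partitions
            Storage storage = FeedHelper.createStorage("TABLE",
                    "thrift://metastore.example.com:9083/falcon_db/summary_table"
                    + "/ds=${YEAR}-${MONTH}-${DAY}");

            CatalogStorage tableStorage = (CatalogStorage) storage;
            System.out.println(tableStorage.getDatabase());          // falcon_db
            System.out.println(tableStorage.getTable());             // summary_table
            // "ds", assuming FeedDataPath.PATTERN matches the ${...} date variables
            System.out.println(tableStorage.getDatedPartitionKey());
        }
    }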

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index fc4a467..ba80cac 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -253,12 +253,12 @@ public final class EntityUtil {
         default:
         }
 
+        final int freq = frequency.getFrequencyAsInt();
         if (count > 2) {
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(),
-                    ((count - 2) / frequency.getFrequency()) * frequency.getFrequency());
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(), ((count - 2) / freq) * freq);
         }
         while (startCal.getTime().before(now)) {
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
         }
         return startCal.getTime();
     }
@@ -288,15 +288,15 @@ public final class EntityUtil {
         default:
         }
 
+        final int freq = frequency.getFrequencyAsInt();
         if (count > 2) {
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(),
-                    (count / frequency.getFrequency()) * frequency.getFrequency());
-            count = (count / frequency.getFrequency());
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(), (count / freq) * freq);
+            count = (count / freq);
         } else {
             count = 0;
         }
         while (startCal.getTime().before(instanceTime)) {
-            startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
+            startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
             count++;
         }
         return count + 1;
@@ -558,6 +558,7 @@ public final class EntityUtil {
                     .equalsIgnoreCase("true")) {
                 return null;
             }
+
             LateProcess lateProcess = new LateProcess();
             lateProcess.setDelay(new Frequency(RuntimeProperties.get()
                     .getProperty("feed.late.frequency", "hours(3)")));
@@ -598,10 +599,7 @@ public final class EntityUtil {
     }
 
     public static boolean responsibleFor(String colo) {
-        if (DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
-                && colo.equals(DeploymentUtil.getCurrentColo()))) {
-            return true;
-        }
-        return false;
+        return DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
+                && colo.equals(DeploymentUtil.getCurrentColo()));
     }
 }
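
The hoisted freq local also makes the fast-forward easier to read: the calendar jumps in one multiple-of-frequency step, and the trailing loop then runs only a handful of iterations. A self-contained sketch of the same idea for a minute-granularity frequency:

    import java.util.Calendar;
    import java.util.Date;

    final class NextInstanceSketch {
        // E.g. freq = 5 and 123 elapsed minutes: jump ((123 - 2) / 5) * 5 = 120
        // minutes at once, then step forward until the instance crosses 'now'.
        static Date nextInstance(Date start, Date now, int freq) {
            Calendar startCal = Calendar.getInstance();
            startCal.setTime(start);
            long count = (now.getTime() - start.getTime()) / (60 * 1000L);
            if (count > 2) {
                startCal.add(Calendar.MINUTE, (int) ((count - 2) / freq) * freq);
            }
            while (startCal.getTime().before(now)) {
                startCal.add(Calendar.MINUTE, freq);
            }
            return startCal.getTime();
        }
    }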

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index c96120d..67257e3 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,11 +20,20 @@ package org.apache.falcon.entity;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.expression.ExpressionHelper;
 
+import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -44,32 +53,164 @@ public final class FeedHelper {
         return null;
     }
 
-    public static Location getLocation(Feed feed, LocationType type,
-                                       String clusterName) {
-        Cluster cluster = getCluster(feed, clusterName);
-        if (cluster != null && cluster.getLocations() != null
-                && cluster.getLocations().getLocations().size() != 0) {
-            return getLocation(cluster.getLocations(), type);
-        } else {
-            return getLocation(feed.getLocations(), type);
+    public static Storage createStorage(Feed feed) throws FalconException {
+
+        final Locations feedLocations = feed.getLocations();
+        if (feedLocations != null
+                && feedLocations.getLocations().size() != 0) {
+            return new FileSystemStorage(feed);
+        }
+
+        try {
+            final CatalogTable table = feed.getTable();
+            if (table != null) {
+                return new CatalogStorage(feed);
+            }
+        } catch (URISyntaxException e) {
+            throw new FalconException(e);
         }
 
+        throw new FalconException("Neither catalog table nor locations are defined.");
+    }
+
+    public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+                                        Feed feed) throws FalconException {
+        return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
+    }
+
+    public static Storage createStorage(String clusterName, Feed feed)
+        throws FalconException {
+
+        return createStorage(getCluster(feed, clusterName), feed);
+    }
+
+    public static Storage createStorage(Cluster cluster, Feed feed)
+        throws FalconException {
+
+        final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+
+        return createStorage(cluster, feed, clusterEntity);
     }
 
-    public static Location getLocation(Feed feed, LocationType type) {
-        return getLocation(feed.getLocations(), type);
+    public static Storage createStorage(Cluster cluster, Feed feed,
+                                        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
+        throws FalconException {
+
+        final List<Location> locations = getLocations(cluster, feed);
+        if (locations != null) {
+            return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
+        }
+
+        try {
+            final CatalogTable table = getTable(cluster, feed);
+            if (table != null) {
+                return new CatalogStorage(clusterEntity, table);
+            }
+        } catch (URISyntaxException e) {
+            throw new FalconException(e);
+        }
+
+        throw new FalconException("Neither catalog table nor locations are defined.");
     }
 
-    public static Location getLocation(Locations locations, LocationType type) {
-        for (Location loc : locations.getLocations()) {
-            if (loc.getType() == type) {
-                return loc;
+    /**
+     * Factory method to dole out a storage instance used for replication source.
+     *
+     * @param clusterEntity cluster entity
+     * @param feed feed entity
+     * @return an implementation of Storage
+     * @throws FalconException
+     */
+    public static Storage createReadOnlyStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+                                                Feed feed) throws FalconException {
+        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
+        final List<Location> locations = getLocations(feedCluster, feed);
+        if (locations != null) {
+            return new FileSystemStorage(ClusterHelper.getReadOnlyStorageUrl(clusterEntity), locations);
+        }
+
+        try {
+            final CatalogTable table = getTable(feedCluster, feed);
+            if (table != null) {
+                return new CatalogStorage(clusterEntity, table);
             }
+        } catch (URISyntaxException e) {
+            throw new FalconException(e);
         }
-        Location loc = new Location();
-        loc.setPath("/tmp");
-        loc.setType(type);
-        return loc;
+
+        throw new FalconException("Neither catalog table nor locations are defined.");
+    }
+
+    public static Storage createStorage(String type, String storageUriTemplate)
+        throws URISyntaxException {
+
+        Storage.TYPE storageType = Storage.TYPE.valueOf(type);
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            return new FileSystemStorage(storageUriTemplate);
+        } else if (storageType == Storage.TYPE.TABLE) {
+            return new CatalogStorage(storageUriTemplate);
+        }
+
+        throw new IllegalArgumentException("Bad type: " + type);
+    }
+
+    public static Storage.TYPE getStorageType(Feed feed) throws FalconException {
+        final Locations feedLocations = feed.getLocations();
+        if (feedLocations != null
+                && feedLocations.getLocations().size() != 0) {
+            return Storage.TYPE.FILESYSTEM;
+        }
+
+        final CatalogTable table = feed.getTable();
+        if (table != null) {
+            return Storage.TYPE.TABLE;
+        }
+
+        throw new FalconException("Neither catalog table nor locations are defined.");
+    }
+
+    public static Storage.TYPE getStorageType(Feed feed,
+                                              Cluster cluster) throws FalconException {
+        final List<Location> locations = getLocations(cluster, feed);
+        if (locations != null) {
+            return Storage.TYPE.FILESYSTEM;
+        }
+
+        final CatalogTable table = getTable(cluster, feed);
+        if (table != null) {
+            return Storage.TYPE.TABLE;
+        }
+
+        throw new FalconException("Neither catalog table nor locations are defined.");
+    }
+
+    public static Storage.TYPE getStorageType(Feed feed,
+                                              org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
+        throws FalconException {
+        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
+        return getStorageType(feed, feedCluster);
+    }
+
+    protected static List<Location> getLocations(Cluster cluster, Feed feed) {
+        // check if locations are overridden in cluster
+        final Locations clusterLocations = cluster.getLocations();
+        if (clusterLocations != null
+                && clusterLocations.getLocations().size() != 0) {
+            return clusterLocations.getLocations();
+        }
+
+        final Locations feedLocations = feed.getLocations();
+        return feedLocations == null ? null : feedLocations.getLocations();
+    }
+
+    protected static CatalogTable getTable(Cluster cluster, Feed feed) {
+        // check if table is overridden in cluster
+        if (cluster.getTable() != null) {
+            return cluster.getTable();
+        }
+
+        return feed.getTable();
     }
 
     public static String normalizePartitionExpression(String part1, String part2) {
@@ -106,4 +247,14 @@ public final class FeedHelper {
         expHelp.setPropertiesForVariable(properties);
         return expHelp.evaluateFullExpression(exp, String.class);
     }
+
+    public static String getStagingDir(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+                                       Feed feed, CatalogStorage storage, Tag tag) {
+        String workflowName = EntityUtil.getWorkflowName(
+                tag, Arrays.asList(clusterEntity.getName()), feed).toString();
+        return ClusterHelper.getCompleteLocation(clusterEntity, "staging") + "/"
+                + workflowName  + "/"
+                + storage.getDatabase() + "/"
+                + storage.getTable();
+    }
 }
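
A sketch tying the factory methods together: dispatch on the declared storage, then derive the replication staging directory only for table-backed feeds. The FeedHelper calls are those added above; only the wrapper is hypothetical:

    import org.apache.falcon.FalconException;
    import org.apache.falcon.Tag;
    import org.apache.falcon.entity.CatalogStorage;
    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.Storage;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.entity.v0.feed.Feed;

    final class StagingDirSketch {
        static String replicationStagingDir(Cluster clusterEntity, Feed feed)
                throws FalconException {
            Storage storage = FeedHelper.createStorage(clusterEntity, feed);
            if (storage.getType() != Storage.TYPE.TABLE) {
                return null; // filesystem feeds are replicated in place, no staging dir
            }
            return FeedHelper.getStagingDir(
                    clusterEntity, feed, (CatalogStorage) storage, Tag.REPLICATION);
        }
    }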

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
new file mode 100644
index 0000000..68370c7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.hadoop.fs.Path;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A file system implementation of a feed storage.
+ */
+public class FileSystemStorage implements Storage {
+
+    public static final String FEED_PATH_SEP = "#";
+    public static final String LOCATION_TYPE_SEP = "=";
+
+    public static final String FILE_SYSTEM_URL = "${nameNode}";
+
+    private final String storageUrl;
+    private final List<Location> locations;
+
+    protected FileSystemStorage(Feed feed) {
+        this(FILE_SYSTEM_URL, feed.getLocations());
+    }
+
+    protected FileSystemStorage(String storageUrl, Locations locations) {
+        this(storageUrl, locations.getLocations());
+    }
+
+    protected FileSystemStorage(String storageUrl, List<Location> locations) {
+        if (storageUrl == null || storageUrl.length() == 0) {
+            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
+        }
+
+        if (locations == null || locations.size() == 0) {
+            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
+        }
+
+        this.storageUrl = storageUrl;
+        this.locations = locations;
+    }
+
+    /**
+     * Create an instance from the URI Template that was generated using
+     * the getUriTemplate() method.
+     *
+     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
+     * @throws URISyntaxException
+     */
+    protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
+        if (uriTemplate == null || uriTemplate.length() == 0) {
+            throw new IllegalArgumentException("URI template cannot be null or empty");
+        }
+
+        String rawStorageUrl = null;
+        List<Location> rawLocations = new ArrayList<Location>();
+        String[] feedLocs = uriTemplate.split(FEED_PATH_SEP);
+        for (String rawPath : feedLocs) {
+            String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP);
+            final String processed = typeAndPath[1].replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
+                                                   .replaceAll("}", EXPR_CLOSE_NORMALIZED);
+            URI uri = new URI(processed);
+            if (rawStorageUrl == null) {
+                rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
+            }
+
+            String path = uri.getPath();
+            final String finalPath = path.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
+                                         .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
+
+            Location location = new Location();
+            location.setPath(finalPath);
+            location.setType(LocationType.valueOf(typeAndPath[0]));
+            rawLocations.add(location);
+        }
+
+        this.storageUrl = rawStorageUrl;
+        this.locations = rawLocations;
+    }
+
+    @Override
+    public TYPE getType() {
+        return TYPE.FILESYSTEM;
+    }
+
+    public String getStorageUrl() {
+        return storageUrl;
+    }
+
+    public List<Location> getLocations() {
+        return locations;
+    }
+
+    @Override
+    public String getUriTemplate() {
+        String feedPathMask = getUriTemplate(LocationType.DATA);
+        String metaPathMask = getUriTemplate(LocationType.META);
+        String statsPathMask = getUriTemplate(LocationType.STATS);
+        String tmpPathMask = getUriTemplate(LocationType.TMP);
+
+        StringBuilder feedBasePaths = new StringBuilder();
+        feedBasePaths.append(LocationType.DATA.name())
+                     .append(LOCATION_TYPE_SEP)
+                     .append(feedPathMask);
+
+        if (metaPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP)
+                         .append(LocationType.META.name())
+                         .append(LOCATION_TYPE_SEP)
+                         .append(metaPathMask);
+        }
+
+        if (statsPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP)
+                         .append(LocationType.STATS.name())
+                         .append(LOCATION_TYPE_SEP)
+                         .append(statsPathMask);
+        }
+
+        if (tmpPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP)
+                         .append(LocationType.TMP.name())
+                         .append(LOCATION_TYPE_SEP)
+                         .append(tmpPathMask);
+        }
+
+        return feedBasePaths.toString();
+    }
+
+    @Override
+    public String getUriTemplate(LocationType locationType) {
+        Location locationForType = null;
+        for (Location location : locations) {
+            if (location.getType() == locationType) {
+                locationForType = location;
+                break;
+            }
+        }
+
+        if (locationForType == null) {
+            return "/tmp";
+        }
+
+        // normalize the path so trailing and double '/' are removed
+        return storageUrl + new Path(locationForType.getPath());
+    }
+
+    @Override
+    public boolean exists() throws FalconException {
+        // Directories on FS will be created if they don't exist.
+        return true;
+    }
+
+    @Override
+    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
+        final List<Location> fsStorageLocations = fsStorage.getLocations();
+
+        return getLocations().size() == fsStorageLocations.size()
+               && getLocation(getLocations(), LocationType.DATA).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.DATA).getPath())
+               && getLocation(getLocations(), LocationType.META).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.META).getPath())
+               && getLocation(getLocations(), LocationType.STATS).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.STATS).getPath())
+               && getLocation(getLocations(), LocationType.TMP).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.TMP).getPath());
+    }
+
+    private static Location getLocation(List<Location> locations, LocationType type) {
+        for (Location loc : locations) {
+            if (loc.getType() == type) {
+                return loc;
+            }
+        }
+
+        Location loc = new Location();
+        loc.setPath("/tmp");
+        loc.setType(type);
+        return loc;
+    }
+
+    @Override
+    public String toString() {
+        return "FileSystemStorage{"
+                + "storageUrl='" + storageUrl + '\''
+                + ", locations=" + locations
+                + '}';
+    }
+}
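
A sketch of the filesystem template round trip using the factory from FeedHelper; the namenode URL and paths are hypothetical:

    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.Storage;
    import org.apache.falcon.entity.v0.feed.LocationType;

    public final class FileSystemStorageSketch {
        public static void main(String[] args) throws Exception {
            // Template form: '#'-separated TYPE=path pairs, as built by getUriTemplate()
            Storage storage = FeedHelper.createStorage("FILESYSTEM",
                    "DATA=hdfs://nn.example.com:8020/projects/clicks/${YEAR}/${MONTH}/${DAY}"
                    + "#STATS=hdfs://nn.example.com:8020/projects/clicks/stats");

            System.out.println(storage.getType()); // FILESYSTEM
            System.out.println(storage.getUriTemplate(LocationType.DATA));
            // -> hdfs://nn.example.com:8020/projects/clicks/${YEAR}/${MONTH}/${DAY}
        }
    }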

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
new file mode 100644
index 0000000..0634969
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+/**
+ * A class to encapsulate the storage for a given feed which can either be
+ * expressed as a path on the file system or a table in a catalog.
+ */
+public interface Storage {
+
+    String DOLLAR_EXPR_START_REGEX = "\\$\\{";
+    String QUESTION_EXPR_START_REGEX = "\\?\\{";
+    String EXPR_CLOSE_REGEX = "\\}";
+
+    /**
+     * URI Friendly expression.
+     */
+    String DOLLAR_EXPR_START_NORMALIZED = "_D__START_";
+    String EXPR_CLOSE_NORMALIZED = "_CLOSE_";
+
+    /**
+     * Enumeration for the various storage types.
+     */
+    enum TYPE {FILESYSTEM, TABLE}
+
+    /**
+     * Return the type of storage.
+     *
+     * @return storage type
+     */
+    TYPE getType();
+
+    /**
+     * Return the uri template.
+     *
+     * @return uri template
+     */
+    String getUriTemplate();
+
+    /**
+     * Return the uri template for a given location type.
+     *
+     * @param locationType type of location, applies only to filesystem type
+     * @return uri template
+     */
+    String getUriTemplate(LocationType locationType);
+
+    /**
+     * Check if the storage exists - the filesystem location or the catalog table.
+     * Filesystem locations always report true since they are created on demand.
+     *
+     * @return true if table exists else false
+     * @throws FalconException an exception
+     */
+    boolean exists() throws FalconException;
+
+    /**
+     * Check for equality of this instance against the one in question.
+     *
+     * @param toCompareAgainst instance to compare
+     * @return true if identical else false
+     * @throws FalconException an exception
+     */
+    boolean isIdentical(Storage toCompareAgainst) throws FalconException;
+}
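
The normalization constants above exist because EL markers such as ${YEAR} are not legal inside java.net.URI, so implementations mask them before parsing and restore them afterwards. A minimal round-trip sketch:

    import java.net.URI;

    import org.apache.falcon.entity.Storage;

    public final class NormalizationSketch {
        public static void main(String[] args) throws Exception {
            String raw = "/data/${YEAR}/${MONTH}";

            // Mask '${' and '}' so the string parses as a URI...
            String masked = raw
                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.DOLLAR_EXPR_START_NORMALIZED)
                    .replaceAll("}", Storage.EXPR_CLOSE_NORMALIZED);
            URI uri = new URI(masked); // /data/_D__START_YEAR_CLOSE_/_D__START_MONTH_CLOSE_

            // ...then restore the markers on the extracted parts.
            String restored = uri.getPath()
                    .replaceAll(Storage.DOLLAR_EXPR_START_NORMALIZED, Storage.DOLLAR_EXPR_START_REGEX)
                    .replaceAll(Storage.EXPR_CLOSE_NORMALIZED, Storage.EXPR_CLOSE_REGEX);
            System.out.println(restored); // /data/${YEAR}/${MONTH}
        }
    }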

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index b1cf8f3..e633838 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -23,12 +23,14 @@ import java.io.IOException;
 import javax.jms.ConnectionFactory;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Interface;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,6 +58,10 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         validateScheme(cluster, Interfacetype.WRITE);
         validateScheme(cluster, Interfacetype.WORKFLOW);
         validateScheme(cluster, Interfacetype.MESSAGING);
+        if (CatalogServiceFactory.isEnabled()
+                && ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) {
+            validateScheme(cluster, Interfacetype.REGISTRY);
+        }
 
         if (!EntityUtil.responsibleFor(cluster.getColo())) {
             return;
@@ -66,8 +72,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         validateExecuteInterface(cluster);
         validateWorkflowInterface(cluster);
         validateMessagingInterface(cluster);
-
-        // Interfacetype.REGISTRY is not validated as its not used
+        validateRegistryInterface(cluster);
     }
 
     private void validateScheme(Cluster cluster, Interfacetype interfacetype)
@@ -150,4 +155,29 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
                     + " for: " + implementation, e);
         }
     }
+
+    private void validateRegistryInterface(Cluster cluster) throws ValidationException {
+        final boolean isCatalogRegistryEnabled = CatalogServiceFactory.isEnabled();
+        if (!isCatalogRegistryEnabled) {
+            return;  // ignore the registry interface for backwards compatibility
+        }
+
+        // continue validation only if a catalog service is provided
+        final Interface catalogInterface = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
+        if (catalogInterface == null) {
+            LOG.info("Catalog service is not enabled for cluster: " + cluster.getName());
+            return;
+        }
+
+        final String catalogUrl = catalogInterface.getEndpoint();
+        LOG.info("Validating catalog registry interface: " + catalogUrl);
+
+        try {
+            if (!CatalogServiceFactory.getCatalogService().isAlive(catalogUrl)) {
+                throw new ValidationException("Unable to reach Catalog server: " + catalogUrl);
+            }
+        } catch (FalconException e) {
+            throw new ValidationException("Invalid Catalog server or port: " + catalogUrl, e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 1c323fd..8d7903b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,8 +20,10 @@ package org.apache.falcon.entity.parser;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
@@ -71,6 +73,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             validateFeedCutOffPeriod(feed, cluster);
         }
 
+        validateFeedStorage(feed);
         validateFeedPartitionExpression(feed);
         validateFeedGroups(feed);
 
@@ -105,21 +108,20 @@ public class FeedEntityParser extends EntityParser<Feed> {
         return processes;
     }
 
-    private void validateFeedGroups(Feed feed) throws ValidationException {
+    private void validateFeedGroups(Feed feed) throws FalconException {
         String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
-        String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
-                .getPath();
+        final Storage storage = FeedHelper.createStorage(feed);
+        String defaultPath = storage.getUriTemplate(LocationType.DATA);
         for (Cluster cluster : feed.getClusters().getClusters()) {
-            if (!FeedGroup.getDatePattern(
-                    FeedHelper.getLocation(feed, LocationType.DATA,
-                            cluster.getName()).getPath()).equals(
+            final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
+            if (!FeedGroup.getDatePattern(uriTemplate).equals(
                     FeedGroup.getDatePattern(defaultPath))) {
                 throw new ValidationException("Feeds default path pattern: "
-                        + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+                        + storage.getUriTemplate(LocationType.DATA)
                         + ", does not match with cluster: "
                         + cluster.getName()
                         + " path pattern: "
-                        + FeedHelper.getLocation(feed, LocationType.DATA, cluster.getName()).getPath());
+                        + uriTemplate);
             }
         }
         for (String groupName : groupNames) {
@@ -127,7 +129,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             if (group != null && !group.canContainFeed(feed)) {
                 throw new ValidationException(
                         "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
-                                + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+                                + ", path pattern: " + storage.getUriTemplate(LocationType.DATA)
                                 + " does not match with group: " + group.getName() + "'s frequency: "
                                 + group.getFrequency()
                                 + ", date pattern: " + group.getDatePattern());
@@ -159,9 +161,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
                     CrossEntityValidations.validateInstanceRange(process, input, newFeed);
 
-                    if (input.getPartition() != null) {
-                        CrossEntityValidations.validateInputPartition(input, newFeed);
-                    }
+                    validateInputPartition(newFeed, input);
                 }
             }
 
@@ -179,6 +179,19 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
     }
 
+    private void validateInputPartition(Feed newFeed, Input input) throws FalconException {
+        if (input.getPartition() == null) {
+            return;
+        }
+
+        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(newFeed);
+        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+            CrossEntityValidations.validateInputPartition(input, newFeed);
+        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
+            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+        }
+    }
+
     private void validateClusterValidity(Date start, Date end, String clusterName) throws FalconException {
         try {
             if (start.after(end)) {
@@ -280,4 +293,74 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     "Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
         }
     }
+
+    /**
+     * Validates feed storage: for table storage, ensures a single source cluster,
+     * a uniform storage type across clusters, no feed-level partitions, and that
+     * the table is already defined in the catalog registry per cluster.
+     * FileSystem storage needs none of these checks.
+     */
+    private void validateFeedStorage(Feed feed) throws FalconException {
+        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
+        validateMultipleSourcesExist(feed, baseFeedStorageType);
+        validateUniformStorageType(feed, baseFeedStorageType);
+        validatePartitions(feed, baseFeedStorageType);
+        validateStorageExists(feed);
+    }
+
+    private void validateMultipleSourcesExist(Feed feed, Storage.TYPE baseFeedStorageType) throws FalconException {
+        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+            return;
+        }
+
+        // validate that there is only one source cluster
+        int numberOfSourceClusters = 0;
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            if (cluster.getType() == ClusterType.SOURCE) {
+                numberOfSourceClusters++;
+            }
+        }
+
+        if (numberOfSourceClusters > 1) {
+            throw new ValidationException("Multiple sources are not supported for feeds with table storage: "
+                    + feed.getName());
+        }
+    }
+
+    private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
+
+            if (feedStorageType != feedClusterStorageType) {
+                throw new ValidationException("The storage type is not uniform for cluster: " + cluster.getName());
+            }
+        }
+    }
+
+    private void validatePartitions(Feed feed, Storage.TYPE storageType) throws FalconException {
+        if (storageType == Storage.TYPE.TABLE && feed.getPartitions() != null) {
+            throw new ValidationException("Partitions are not supported for feeds with table storage. "
+                    + "They should be defined as part of the table URI. Feed: "
+                    + feed.getName());
+        }
+    }
+
+    private void validateStorageExists(Feed feed) throws FalconException {
+        StringBuilder buffer = new StringBuilder();
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            final Storage storage = FeedHelper.createStorage(cluster, feed);
+            if (!storage.exists()) {
+                // this is only true for table, filesystem always returns true
+                CatalogStorage catalogStorage = (CatalogStorage) storage;
+                buffer.append("Table [")
+                        .append(catalogStorage.getTable())
+                        .append("] does not exist for feed: ")
+                        .append(feed.getName())
+                        .append(", cluster: ")
+                        .append(cluster.getName())
+                        .append(". ");
+            }
+        }
+
+        if (buffer.length() > 0) {
+            throw new ValidationException(buffer.toString());
+        }
+    }
 }

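Taken together, the new checks pin down what a table-backed feed may look like: one source cluster, one storage type across all clusters, no feed-level partitions, and a table that already exists in each cluster's catalog. A compact restatement of the storage-type rules (an illustrative sketch, not a replacement for the methods above):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.Storage;
    import org.apache.falcon.entity.parser.ValidationException;
    import org.apache.falcon.entity.v0.feed.Cluster;
    import org.apache.falcon.entity.v0.feed.Feed;

    final class TableFeedRulesSketch {
        static void check(Feed feed) throws FalconException {
            final Storage.TYPE baseType = FeedHelper.getStorageType(feed);
            for (Cluster cluster : feed.getClusters().getClusters()) {
                // every cluster-level override must agree with the base storage type
                if (FeedHelper.getStorageType(feed, cluster) != baseType) {
                    throw new ValidationException("Storage type not uniform for cluster: " + cluster.getName());
                }
            }
            // table partitions belong in the table URI, not on the feed element
            if (baseType == Storage.TYPE.TABLE && feed.getPartitions() != null) {
                throw new ValidationException("Partitions not allowed with table storage. Feed: " + feed.getName());
            }
        }
    }
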
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index e4a9cf0..8647d43 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -29,6 +29,8 @@ import java.util.TimeZone;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -68,25 +70,24 @@ public class ProcessEntityParser extends EntityParser<Process> {
             }
             validateEntityExists(EntityType.CLUSTER, clusterName);
             validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
-            validateHDFSpaths(process, clusterName);
+            validateHDFSPaths(process, clusterName);
 
             if (process.getInputs() != null) {
                 for (Input input : process.getInputs().getInputs()) {
                     validateEntityExists(EntityType.FEED, input.getFeed());
-                    Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+                    Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
                     CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
                     CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
                     CrossEntityValidations.validateInstanceRange(process, input, feed);
-                    if (input.getPartition() != null) {
-                        CrossEntityValidations.validateInputPartition(input, feed);
-                    }
+                    validateInputPartition(input, feed);
+                    validateOptionalInputsForTableStorage(feed, input);
                 }
             }
 
             if (process.getOutputs() != null) {
                 for (Output output : process.getOutputs().getOutputs()) {
                     validateEntityExists(EntityType.FEED, output.getFeed());
-                    Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
+                    Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
                     CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
                     CrossEntityValidations.validateInstance(process, output, feed);
                 }
@@ -96,7 +97,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
         validateLateInputs(process);
     }
 
-    private void validateHDFSpaths(Process process, String clusterName) throws FalconException {
+    private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
         org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
                 clusterName);
 
@@ -130,8 +131,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
     }
 
     private String getNameNode(Cluster cluster, String clusterName) throws ValidationException {
-        // cluster should never be null as it is validated while submitting
-        // feeds.
+        // cluster should never be null as it is validated while submitting feeds.
         if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
             throw new ValidationException(
                     "Cannot get valid nameNode scheme from write interface of cluster: " + clusterName);
@@ -152,6 +152,19 @@ public class ProcessEntityParser extends EntityParser<Process> {
         }
     }
 
+    private void validateInputPartition(Input input, Feed feed) throws FalconException {
+        if (input.getPartition() == null) {
+            return;
+        }
+
+        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
+        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+            CrossEntityValidations.validateInputPartition(input, feed);
+        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
+            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+        }
+    }
+
     private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
         Set<String> datasetNames = new HashSet<String>();
         if (inputs != null) {
@@ -172,6 +185,10 @@ public class ProcessEntityParser extends EntityParser<Process> {
     }
 
     private void validateLateInputs(Process process) throws ValidationException {
+        if (process.getLateProcess() == null) {
+            return;
+        }
+
         Map<String, String> feeds = new HashMap<String, String>();
         if (process.getInputs() != null) {
             for (Input in : process.getInputs().getInputs()) {
@@ -179,21 +196,27 @@ public class ProcessEntityParser extends EntityParser<Process> {
             }
         }
 
-        if (process.getLateProcess() != null) {
-            for (LateInput lp : process.getLateProcess().getLateInputs()) {
-                if (!feeds.keySet().contains(lp.getInput())) {
-                    throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
-                }
-                try {
-                    Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
-                    if (feed.getLateArrival() == null) {
-                        throw new ValidationException(
-                                "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
-                    }
-                } catch (FalconException e) {
-                    throw new ValidationException(e);
+        for (LateInput lp : process.getLateProcess().getLateInputs()) {
+            if (!feeds.containsKey(lp.getInput())) {
+                throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
+            }
+
+            try {
+                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
+                if (feed.getLateArrival() == null) {
+                    throw new ValidationException(
+                            "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
                 }
+            } catch (FalconException e) {
+                throw new ValidationException(e);
             }
         }
     }
+
+    private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException {
+        if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+            throw new ValidationException("Optional inputs are not supported for feeds with table storage: "
+                    + input.getName());
+        }
+    }
 }

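The same storage-type gate is now applied to process inputs: partition expressions keep their existing filesystem validation, while table-backed feeds reject both partitioned and optional inputs outright (optional-input resolution is presumably filesystem based, hence the restriction). Condensed, as an illustrative sketch:

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.Storage;
    import org.apache.falcon.entity.parser.ValidationException;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.entity.v0.process.Input;

    final class TableInputRulesSketch {
        static void check(Input input, Feed feed) throws FalconException {
            if (FeedHelper.getStorageType(feed) != Storage.TYPE.TABLE) {
                return;  // filesystem feeds keep the existing partition checks
            }
            if (input.getPartition() != null) {
                throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
            }
            if (input.isOptional()) {
                throw new ValidationException("Optional inputs are not supported for table storage: " + input.getName());
            }
        }
    }
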
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index 5dca46f..d288925 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.group;
 
+import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.Frequency;
@@ -93,8 +94,9 @@ public class FeedGroup {
         return datePattern;
     }
 
-    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
+    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
         return this.frequency.equals(feed.getFrequency())
-                && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()));
+                && this.datePattern.equals(getDatePattern(
+                    FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA)));
     }
 }

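With this change, FeedGroup membership is decided off the storage URI template instead of the raw DATA location, so filesystem and table feeds are grouped by one rule. Roughly (illustrative sketch, assuming a FeedGroup instance):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.entity.v0.feed.LocationType;
    import org.apache.falcon.group.FeedGroup;

    final class GroupFitSketch {
        // Same frequency plus the same date pattern in the DATA URI template.
        static boolean fits(FeedGroup group, Feed feed) throws FalconException {
            String uriTemplate = FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA);
            return group.getFrequency().equals(feed.getFrequency())
                    && group.getDatePattern().equals(FeedGroup.getDatePattern(uriTemplate));
        }
    }
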
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index f0d2e0b..7fbb61a 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -114,8 +114,8 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
         return groupSet;
     }
 
-    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
+    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
         return getGroups(feed.getGroups(), feed.getFrequency(),
-                FeedHelper.getLocation(feed, LocationType.DATA).getPath());
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index a9d39de..fc69933 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -21,10 +21,10 @@ package org.apache.falcon.update;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Partitions;
 import org.apache.falcon.entity.v0.process.Cluster;
@@ -78,18 +78,15 @@ public final class UpdateHelper {
         }
     }
 
-    public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
-        if (!FeedHelper.getLocation(oldFeed.getLocations(), LocationType.DATA)
-            .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.DATA).getPath())
-                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.META)
-                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.META).getPath())
-                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.STATS)
-                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.STATS).getPath())
-                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.TMP)
-                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.TMP).getPath())) {
+    public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess)
+        throws FalconException {
+        Storage oldFeedStorage = FeedHelper.createStorage(oldFeed);
+        Storage newFeedStorage = FeedHelper.createStorage(newFeed);
+
+        if (!oldFeedStorage.isIdentical(newFeedStorage)) {
             return true;
         }
-        LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
+        LOG.debug(oldFeed.toShortString() + ": Storage identical. Ignoring...");
 
         if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
             return true;
@@ -128,17 +125,12 @@ public final class UpdateHelper {
         }
 
         for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
-            if (!FeedHelper
-                    .getCluster(oldFeed, cluster.getName()).getValidity().getStart()
+            oldFeedStorage = FeedHelper.createStorage(cluster.getName(), oldFeed);
+            newFeedStorage = FeedHelper.createStorage(cluster.getName(), newFeed);
+
+            if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
                     .equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.DATA, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.DATA, cluster.getName()).getPath())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.META, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.META, cluster.getName()).getPath())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.STATS, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.STATS, cluster.getName()).getPath())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.TMP, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.TMP, cluster.getName()).getPath())) {
+                    || !oldFeedStorage.isIdentical(newFeedStorage)) {
                 return true;
             }
             LOG.debug(oldFeed.toShortString() + ": Feed on cluster " + cluster.getName() + " identical. Ignoring...");

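The rewrite collapses the old fourfold per-LocationType path comparison into a single Storage.isIdentical call, which also covers table URIs. The resulting decision, condensed (illustrative):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.FeedHelper;
    import org.apache.falcon.entity.Storage;
    import org.apache.falcon.entity.v0.feed.Feed;

    final class StorageDiffSketch {
        // A feed update forces dependent processes to update when its storage
        // definition changed; per-cluster storages are compared the same way
        // via FeedHelper.createStorage(clusterName, feed).
        static boolean storageChanged(Feed oldFeed, Feed newFeed) throws FalconException {
            Storage oldStorage = FeedHelper.createStorage(oldFeed);
            Storage newStorage = FeedHelper.createStorage(newFeed);
            return !oldStorage.isIdentical(newStorage);
        }
    }
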
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 17695d2..b86a715 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -81,7 +81,7 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
 
-    public abstract String getWorkflowProperty(String cluster, String jobId, String property) throws FalconException;
+    public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException;
 
     public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException;
 }

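The engine API change is source-incompatible for existing callers: instead of asking for one property by name, they now receive the job's full Properties and look keys up themselves. A hypothetical caller after the change (the property key here is illustrative):

    import java.util.Properties;
    import org.apache.falcon.FalconException;
    import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;

    final class WorkflowPropsSketch {
        static String lookup(AbstractWorkflowEngine engine, String cluster, String jobId)
            throws FalconException {
            Properties jobProps = engine.getWorkflowProperties(cluster, jobId);
            return jobProps.getProperty("logDir");  // key name is illustrative
        }
    }
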
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
index 959e26c..734d17c 100644
--- a/common/src/main/resources/log4j.xml
+++ b/common/src/main/resources/log4j.xml
@@ -28,7 +28,7 @@
     </appender>
 
     <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/logs/application.log"/>
+        <param name="File" value="${user.dir}/target/logs/application.log"/>
         <param name="Append" value="true"/>
         <param name="Threshold" value="debug"/>
         <layout class="org.apache.log4j.PatternLayout">
@@ -37,7 +37,7 @@
     </appender>
 
     <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/logs/audit.log"/>
+        <param name="File" value="${user.dir}/target/logs/audit.log"/>
         <param name="Append" value="true"/>
         <param name="Threshold" value="debug"/>
         <layout class="org.apache.log4j.PatternLayout">
@@ -46,7 +46,7 @@
     </appender>
 
     <appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/logs/tranlog.log"/>
+        <param name="File" value="${user.dir}/target/logs/tranlog.log"/>
         <param name="Append" value="true"/>
         <param name="Threshold" value="debug"/>
         <layout class="org.apache.log4j.PatternLayout">
@@ -55,7 +55,7 @@
     </appender>
 
     <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
-        <param name="File" value="${user.dir}/logs/metric.log"/>
+        <param name="File" value="${user.dir}/target/logs/metric.log"/>
         <param name="Append" value="true"/>
         <param name="Threshold" value="debug"/>
         <layout class="org.apache.log4j.PatternLayout">

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index fea2a31..5473f5d 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -27,6 +27,8 @@
 *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
 *.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
 *.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
+*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
+
 *.application.services=org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.rerun.service.RetryService,\

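The new startup key wires the catalog implementation by class name, which suggests the factory resolves it reflectively and treats an absent key as catalog integration being disabled. A plausible sketch of that resolution (an assumption on both counts: the actual CatalogServiceFactory in this patch may differ, and StartupProperties is assumed to expose the merged startup.properties):

    import org.apache.falcon.catalog.AbstractCatalogService;
    import org.apache.falcon.util.StartupProperties;

    final class CatalogWiringSketch {
        static AbstractCatalogService resolve() throws Exception {
            String implClassName = StartupProperties.get().getProperty("catalog.service.impl");
            if (implClassName == null || implClassName.trim().isEmpty()) {
                return null;  // no catalog service configured
            }
            return (AbstractCatalogService) Class.forName(implClassName).newInstance();
        }
    }
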
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index fa21a90..10a9cc0 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -17,14 +17,17 @@
  */
 package org.apache.falcon.cleanup;
 
-import java.io.IOException;
-
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +36,9 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 /**
  * Test for log cleanup service.
  */
@@ -41,6 +47,11 @@ public class LogCleanupServiceTest extends AbstractTestBase {
     private FileSystem fs;
     private FileSystem tfs;
     private EmbeddedCluster targetDfsCluster;
+    private Path sourceStagingPath1;
+    private Path sourceStagingPath2;
+    private Path targetStagingPath1;
+    private Path targetStagingPath2;
+
     private final Path instanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/process/"
         + "sample" + "/logs/job-2010-01-01-01-00/000");
     private final Path instanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/process/"
@@ -107,7 +118,43 @@ public class LogCleanupServiceTest extends AbstractTestBase {
         fs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
         tfs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
 
+        // table feed staging dir setup
+        initializeStagingDirs();
+        createStageData(sourceStagingPath1, targetStagingPath1);
+
         Thread.sleep(61000);
+
+        createStageData(sourceStagingPath2, targetStagingPath2);
+    }
+
+    private void initializeStagingDirs() throws Exception {
+        final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
+        Feed tableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
+        getStore().publish(EntityType.FEED, tableFeed);
+
+        final Cluster srcCluster = dfsCluster.getCluster();
+        final CatalogStorage sourceStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
+        String sourceStagingDir = FeedHelper.getStagingDir(srcCluster, tableFeed, sourceStorage, Tag.REPLICATION);
+
+        sourceStagingPath1 = new Path(sourceStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
+        sourceStagingPath2 = new Path(sourceStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
+
+        final Cluster targetCluster = targetDfsCluster.getCluster();
+        final CatalogStorage targetStorage = (CatalogStorage) FeedHelper.createStorage(targetCluster, tableFeed);
+        String targetStagingDir = FeedHelper.getStagingDir(targetCluster, tableFeed, targetStorage, Tag.REPLICATION);
+
+        targetStagingPath1 = new Path(targetStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
+        targetStagingPath2 = new Path(targetStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
+    }
+
+    private void createStageData(Path sourcePath, Path targetPath) throws Exception {
+        fs.mkdirs(sourcePath);
+        fs.createNewFile(new Path(sourcePath, "_metadata.xml"));
+        fs.createNewFile(new Path(sourcePath, "data.txt"));
+
+        tfs.mkdirs(targetPath);
+        tfs.createNewFile(new Path(targetPath, "_metadata.xml"));
+        tfs.createNewFile(new Path(targetPath, "data.txt"));
     }
 
     @Test
@@ -120,7 +167,6 @@ public class LogCleanupServiceTest extends AbstractTestBase {
         Assert.assertFalse(fs.exists(instanceLogPath1));
         Assert.assertFalse(fs.exists(instanceLogPath2));
         Assert.assertTrue(fs.exists(instanceLogPath3));
-
     }
 
     @Test
@@ -134,5 +180,18 @@ public class LogCleanupServiceTest extends AbstractTestBase {
         Assert.assertTrue(fs.exists(feedInstanceLogPath1));
         Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
 
+        // source table replication staging dirs
+        Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "_metadata.xml")));
+        Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "data.txt")));
+
+        Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "_metadata.xml")));
+        Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "data.txt")));
+
+        // target table replication staging dirs
+        Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "_metadata.xml")));
+        Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "data.txt")));
+
+        Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "_metadata.xml")));
+        Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "data.txt")));
     }
 }