You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/09/01 19:15:43 UTC

git commit: FALCON-87 Hive table integration with feed entity. Contributed by Venkatesh Seetharam

Updated Branches:
  refs/heads/master edf03d4a8 -> 1e6ace432


FALCON-87 Hive table integration with feed entity. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/1e6ace43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/1e6ace43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/1e6ace43

Branch: refs/heads/master
Commit: 1e6ace4327910250f60e6d1e41ce5b1726bf4a4f
Parents: edf03d4
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Sun Sep 1 21:53:53 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Sun Sep 1 21:53:53 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 client/src/main/resources/feed-0.1.xsd          |  22 ++-
 .../falcon/catalog/AbstractCatalogService.java  |   4 +
 .../falcon/catalog/HiveCatalogService.java      |  32 ++-
 .../apache/falcon/entity/CatalogStorage.java    | 194 +++++++++++++++++++
 .../org/apache/falcon/entity/FeedHelper.java    | 101 ++++++++--
 .../apache/falcon/entity/FileSystemStorage.java | 124 ++++++++++++
 .../java/org/apache/falcon/entity/Storage.java  |  74 +++++++
 .../falcon/entity/parser/FeedEntityParser.java  |  45 ++++-
 .../java/org/apache/falcon/group/FeedGroup.java |   6 +-
 .../org/apache/falcon/group/FeedGroupMap.java   |   6 +-
 .../org/apache/falcon/update/UpdateHelper.java  |  34 ++--
 .../falcon/entity/CatalogStorageTest.java       | 129 ++++++++++++
 .../falcon/entity/FileSystemStorageTest.java    | 133 +++++++++++++
 .../entity/parser/FeedEntityParserTest.java     |  25 ++-
 .../apache/falcon/update/UpdateHelperTest.java  |  24 ++-
 .../resources/config/feed/hive-table-feed.xml   |  48 +++++
 .../test/resources/config/feed/invalid-feed.xml |  53 +++++
 .../falcon/converter/OozieFeedMapper.java       |  70 +++----
 .../falcon/converter/OozieFeedMapperTest.java   |  32 ++-
 .../falcon/converter/OozieProcessMapper.java    |   6 +-
 .../workflow/OozieProcessWorkflowBuilder.java   |   3 +-
 .../converter/OozieProcessMapperTest.java       |  10 +-
 .../org/apache/falcon/resource/TestContext.java |   8 +-
 24 files changed, 1063 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b2dc1d3..046fc27 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-87 Hive table integration with feed entity. (Venkatesh
+    Seetharam via Srikanth Sundarrajan)
+
     FALCON-86 Hive table integration with cluster entity. (Venaktesh 
     Seetharam via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index bf6fa81..00b5172 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -105,7 +105,10 @@
             </xs:element>
             <xs:element type="late-arrival" name="late-arrival" minOccurs="0"/>
             <xs:element type="clusters" name="clusters"/>
-            <xs:element type="locations" name="locations"/>
+            <xs:choice minOccurs="1" maxOccurs="1">
+                <xs:element type="locations" name="locations"/>
+                <xs:element type="catalog-table" name="table"/>
+            </xs:choice>
             <xs:element type="ACL" name="ACL"/>
             <xs:element type="schema" name="schema"/>
             <xs:element type="properties" name="properties" minOccurs="0"/>
@@ -139,7 +142,10 @@
         <xs:sequence>
             <xs:element type="validity" name="validity"/>
             <xs:element type="retention" name="retention"/>
-            <xs:element type="locations" name="locations" minOccurs="0"/>
+            <xs:choice minOccurs="0" maxOccurs="1">
+                <xs:element type="locations" name="locations" minOccurs="0"/>
+                <xs:element type="catalog-table" name="table"/>
+            </xs:choice>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="cluster-type" name="type" use="optional"/>
@@ -194,7 +200,7 @@
     <xs:complexType name="locations">
         <xs:annotation>
             <xs:documentation>
-                A list of locations.
+                A list of locations on the file system.
             </xs:documentation>
         </xs:annotation>
         <xs:choice maxOccurs="unbounded" minOccurs="0">
@@ -356,4 +362,14 @@
             <xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
         </xs:restriction>
     </xs:simpleType>
+    <xs:complexType name="catalog-table">
+        <xs:annotation>
+            <xs:documentation>
+                catalog specifies the uri of a Hive table along with the partition spec.
+                uri="catalog:$database:$table#(partition-key=partition-value);+"
+                Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY}
+            </xs:documentation>
+        </xs:annotation>
+        <xs:attribute type="xs:string" name="uri" use="required"/>
+    </xs:complexType>
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 4086611..a4b6cfd 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconException;
 /**
  * Interface definition for a catalog registry service
  * such as Hive or HCatalog.
+ * Catalog should minimally support the following operations.
  */
 public abstract class AbstractCatalogService {
 
@@ -34,4 +35,7 @@ public abstract class AbstractCatalogService {
      * @throws FalconException exception
      */
     public abstract boolean isAlive(String catalogBaseUrl) throws FalconException;
+
+    public abstract boolean tableExists(String catalogUrl, String database, String tableName)
+        throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index e6f7fe2..67d14af 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -22,6 +22,7 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.log4j.Logger;
 
 import javax.ws.rs.core.MediaType;
@@ -40,7 +41,36 @@ public class HiveCatalogService extends AbstractCatalogService {
 
         Client client = Client.create();
         WebResource service = client.resource(catalogBaseUrl);
-        ClientResponse response = service.path("status").accept(MediaType.APPLICATION_JSON).head();
+        ClientResponse response = service.path("status")
+                .accept(MediaType.APPLICATION_JSON)
+                .head();
+                // .get(ClientResponse.class);    // todo this isnt working
+
+        if (LOG.isDebugEnabled() && response.getStatus() != 200) {
+            LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
+        }
+
+        return response.getStatus() == 200;
+    }
+
+    @Override
+    public boolean tableExists(String catalogUrl, String database, String tableName)
+        throws FalconException {
+        LOG.info("Checking if the table exists: " + tableName);
+
+        Client client = Client.create();
+        WebResource service = client.resource(catalogUrl);
+
+        ClientResponse response = service.path("ddl/database/").path(database)
+                .path("/table").path(tableName)
+                .queryParam("user.name", CurrentUser.getUser())
+                .accept(MediaType.APPLICATION_JSON)
+                .get(ClientResponse.class);
+
+        if (LOG.isDebugEnabled() && response.getStatus() != 200) {
+            LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
+        }
+
         return response.getStatus() == 200;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
new file mode 100644
index 0000000..5c64f27
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A catalog registry implementation of a feed storage.
+ */
+public class CatalogStorage implements Storage {
+
+    public static final String PARTITION_SEPARATOR = ";";
+    public static final String PARTITION_KEYVAL_SEPARATOR = "=";
+    public static final String INPUT_PATH_SEPARATOR = ":";
+    public static final String OUTPUT_PATH_SEPARATOR = "/";
+
+    private final String catalogUrl;
+    private String database;
+    private String table;
+    private Map<String, String> partitions;
+
+    protected CatalogStorage(String catalogTable) throws URISyntaxException {
+        this("${hcatNode}", catalogTable);
+    }
+
+    protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
+        if (catalogUrl == null || catalogUrl.length() == 0) {
+            throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
+        }
+
+        this.catalogUrl = catalogUrl;
+
+        parse(tableUri);
+    }
+
+    /**
+     * Validate URI to conform to catalog:$database:$table#$partitions.
+     * scheme=catalog:database=$database:table=$table#$partitions
+     * partitions=key=value;key=value
+     *
+     * @param catalogTableUri table URI to parse and validate
+     * @throws URISyntaxException
+     */
+    private void parse(String catalogTableUri) throws URISyntaxException {
+
+        URI tableUri = new URI(catalogTableUri);
+
+        if (!"catalog".equals(tableUri.getScheme())) {
+            throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
+        }
+
+        final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
+        if (schemeSpecificPart == null) {
+            throw new URISyntaxException(tableUri.toString(), "Database and Table are missing");
+        }
+
+        String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR);
+
+        if (paths.length != 2) {
+            throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table");
+        }
+
+        database = paths[0];
+        table = paths[1];
+
+        if (database == null || database.length() == 0) {
+            throw new URISyntaxException(tableUri.toString(), "DB name is missing");
+        }
+        if (table == null || table.length() == 0) {
+            throw new URISyntaxException(tableUri.toString(), "Table name is missing");
+        }
+
+        String partRaw = tableUri.getFragment();
+        if (partRaw == null || partRaw.length() == 0) {
+            throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
+        }
+
+        partitions = new HashMap<String, String>();
+        String[] parts = partRaw.split(PARTITION_SEPARATOR);
+        for (String part : parts) {
+            if (part == null || part.length() == 0) {
+                continue;
+            }
+
+            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
+            if (keyVal.length != 2) {
+                throw new URISyntaxException(tableUri.toString(),
+                        "Partition key value pair is not specified properly in (" + part + ")");
+            }
+
+            partitions.put(keyVal[0], keyVal[1]);
+        }
+    }
+
+    public String getCatalogUrl() {
+        return catalogUrl;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public Map<String, String> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @param key partition key
+     * @return partition value
+     */
+    public String getPartitionValue(String key) {
+        return partitions.get(key);
+    }
+
+    /**
+     * @param key partition key
+     * @return if partitions map includes the key or not
+     */
+    public boolean hasPartition(String key) {
+        return partitions.containsKey(key);
+    }
+
+    @Override
+    public TYPE getType() {
+        return TYPE.TABLE;
+    }
+
+    @Override
+    public String getUriTemplate() {
+        return getUriTemplate(LocationType.DATA);
+    }
+
+    @Override
+    public String getUriTemplate(LocationType locationType) {
+        StringBuilder uriTemplate = new StringBuilder();
+        uriTemplate.append(catalogUrl);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        uriTemplate.append(database);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        uriTemplate.append(table);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        for (Map.Entry<String, String> entry : partitions.entrySet()) {
+            uriTemplate.append(entry.getKey());
+            uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
+            uriTemplate.append(entry.getValue());
+            uriTemplate.append(PARTITION_SEPARATOR);
+        }
+        uriTemplate.setLength(uriTemplate.length() - 1);
+
+        return uriTemplate.toString();
+    }
+
+    @Override
+    public boolean exists() throws FalconException {
+        return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
+    }
+
+    @Override
+    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
+
+        return !(getCatalogUrl() != null && !getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
+                && getDatabase().equals(catalogStorage.getDatabase())
+                && getTable().equals(catalogStorage.getTable())
+                && getPartitions().equals(catalogStorage.getPartitions());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index c96120d..d212a98 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,11 +20,19 @@ package org.apache.falcon.entity;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.expression.ExpressionHelper;
 
+import java.net.URISyntaxException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -44,32 +52,87 @@ public final class FeedHelper {
         return null;
     }
 
-    public static Location getLocation(Feed feed, LocationType type,
-                                       String clusterName) {
-        Cluster cluster = getCluster(feed, clusterName);
-        if (cluster != null && cluster.getLocations() != null
-                && cluster.getLocations().getLocations().size() != 0) {
-            return getLocation(cluster.getLocations(), type);
-        } else {
-            return getLocation(feed.getLocations(), type);
+    public static Storage createStorage(Feed feed) throws FalconException {
+
+        final List<Location> locations = feed.getLocations().getLocations();
+        if (locations != null) {
+            return new FileSystemStorage(locations);
+        }
+
+        try {
+            final CatalogTable table = feed.getTable();
+            if (table != null) {
+                return new CatalogStorage(table.getUri());
+            }
+        } catch (URISyntaxException e) {
+            throw new FalconException(e);
         }
 
+        throw new FalconException("Both catalog and locations are not defined.");
     }
 
-    public static Location getLocation(Feed feed, LocationType type) {
-        return getLocation(feed.getLocations(), type);
+    public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+                                        Feed feed) throws FalconException {
+        return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
+    }
+
+    public static Storage createStorage(String clusterName, Feed feed)
+        throws FalconException {
+
+        return createStorage(getCluster(feed, clusterName), feed);
+    }
+
+    public static Storage createStorage(Cluster cluster, Feed feed)
+        throws FalconException {
+
+        final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+
+        return createStorage(cluster, feed, clusterEntity);
     }
 
-    public static Location getLocation(Locations locations, LocationType type) {
-        for (Location loc : locations.getLocations()) {
-            if (loc.getType() == type) {
-                return loc;
+    public static Storage createStorage(Cluster cluster, Feed feed,
+                                        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
+        throws FalconException {
+
+        final List<Location> locations = getLocations(cluster, feed);
+        if (locations != null) {
+            return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
+        }
+
+        try {
+            final CatalogTable table = getTable(cluster, feed);
+            if (table != null) {
+                return new CatalogStorage(
+                        ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint(),
+                        table.getUri());
             }
+        } catch (URISyntaxException e) {
+            throw new FalconException(e);
         }
-        Location loc = new Location();
-        loc.setPath("/tmp");
-        loc.setType(type);
-        return loc;
+
+        throw new FalconException("Both catalog and locations are not defined.");
+    }
+
+    private static List<Location> getLocations(Cluster cluster, Feed feed) {
+        // check if locations are overridden in cluster
+        final Locations clusterLocations = cluster.getLocations();
+        if (clusterLocations != null
+                && clusterLocations.getLocations().size() != 0) {
+            return clusterLocations.getLocations();
+        }
+
+        final Locations feedLocations = feed.getLocations();
+        return feedLocations == null ? null : feedLocations.getLocations();
+    }
+
+    private static CatalogTable getTable(Cluster cluster, Feed feed) {
+        // check if table is overridden in cluster
+        if (cluster.getTable() != null) {
+            return cluster.getTable();
+        }
+
+        return feed.getTable();
     }
 
     public static String normalizePartitionExpression(String part1, String part2) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
new file mode 100644
index 0000000..95b18c5
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+import java.util.List;
+
+/**
+ * A file system implementation of a feed storage.
+ */
+public class FileSystemStorage implements Storage {
+
+    private final String storageUrl;
+    private final List<Location> locations;
+
+    protected FileSystemStorage(List<Location> locations) {
+        this("${nameNode}", locations);
+    }
+
+    protected FileSystemStorage(String storageUrl, List<Location> locations) {
+        if (storageUrl == null || storageUrl.length() == 0) {
+            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
+        }
+
+        if (locations == null || locations.size() == 0) {
+            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
+        }
+
+        this.storageUrl = storageUrl;
+        this.locations = locations;
+    }
+
+    @Override
+    public TYPE getType() {
+        return TYPE.FILESYSTEM;
+    }
+
+    public String getStorageUrl() {
+        return storageUrl;
+    }
+
+    public List<Location> getLocations() {
+        return locations;
+    }
+
+    @Override
+    public String getUriTemplate() {
+        return getUriTemplate(LocationType.DATA);
+    }
+
+    @Override
+    public String getUriTemplate(LocationType locationType) {
+        Location locationForType = null;
+        for (Location location : locations) {
+            if (location.getType() == locationType) {
+                locationForType = location;
+                break;
+            }
+        }
+
+        if (locationForType == null) {
+            return "/tmp";
+        }
+
+        StringBuilder uriTemplate = new StringBuilder();
+        uriTemplate.append(storageUrl);
+        uriTemplate.append(locationForType.getPath());
+        return uriTemplate.toString();
+    }
+
+    @Override
+    public boolean exists() throws FalconException {
+        // Directories on FS will be created if they don't exist.
+        return true;
+    }
+
+    @Override
+    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
+        final List<Location> fsStorageLocations = fsStorage.getLocations();
+
+        return getLocations().size() == fsStorageLocations.size()
+               && getLocation(getLocations(), LocationType.DATA).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.DATA).getPath())
+               && getLocation(getLocations(), LocationType.META).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.META).getPath())
+               && getLocation(getLocations(), LocationType.STATS).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.STATS).getPath())
+               && getLocation(getLocations(), LocationType.TMP).getPath().equals(
+                   getLocation(fsStorageLocations, LocationType.TMP).getPath());
+    }
+
+    private static Location getLocation(List<Location> locations, LocationType type) {
+        for (Location loc : locations) {
+            if (loc.getType() == type) {
+                return loc;
+            }
+        }
+
+        Location loc = new Location();
+        loc.setPath("/tmp");
+        loc.setType(type);
+        return loc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
new file mode 100644
index 0000000..18dad65
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+/**
+ * A class to encapsulate the storage for a given feed which can either be
+ * expressed as a path on the file system or a table in a catalog.
+ */
+public interface Storage {
+
+    /**
+     * Enumeration for the various storage types.
+     */
+    enum TYPE {FILESYSTEM, TABLE}
+
+    /**
+     * Return the type of storage.
+     *
+     * @return storage type
+     */
+    TYPE getType();
+
+    /**
+     * Return the uri template.
+     *
+     * @return uri template
+     */
+    String getUriTemplate();
+
+    /**
+     * Return the uri template for a given location type.
+     *
+     * @param locationType type of location, applies only to filesystem type
+     * @return uri template
+     */
+    String getUriTemplate(LocationType locationType);
+
+    /**
+     * Check if the storage, filesystem location or catalog table exists.
+     * Filesystem location always returns true.
+     *
+     * @return true if table exists else false
+     * @throws FalconException an exception
+     */
+    boolean exists() throws FalconException;
+
+    /**
+     * Check for equality of this instance against the one in question.
+     *
+     * @param toCompareAgainst instance to compare
+     * @return true if identical else false
+     * @throws FalconException an exception
+     */
+    boolean isIdentical(Storage toCompareAgainst) throws FalconException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 1c323fd..d0435fb 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,8 +20,10 @@ package org.apache.falcon.entity.parser;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
@@ -29,7 +31,6 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -71,6 +72,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             validateFeedCutOffPeriod(feed, cluster);
         }
 
+        validateFeedStorage(feed);
         validateFeedPartitionExpression(feed);
         validateFeedGroups(feed);
 
@@ -105,21 +107,19 @@ public class FeedEntityParser extends EntityParser<Feed> {
         return processes;
     }
 
-    private void validateFeedGroups(Feed feed) throws ValidationException {
+    private void validateFeedGroups(Feed feed) throws FalconException {
         String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
-        String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
-                .getPath();
+        String defaultPath = FeedHelper.createStorage(feed).getUriTemplate();
         for (Cluster cluster : feed.getClusters().getClusters()) {
-            if (!FeedGroup.getDatePattern(
-                    FeedHelper.getLocation(feed, LocationType.DATA,
-                            cluster.getName()).getPath()).equals(
+            final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate();
+            if (!FeedGroup.getDatePattern(uriTemplate).equals(
                     FeedGroup.getDatePattern(defaultPath))) {
                 throw new ValidationException("Feeds default path pattern: "
-                        + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+                        + FeedHelper.createStorage(feed).getUriTemplate()
                         + ", does not match with cluster: "
                         + cluster.getName()
                         + " path pattern: "
-                        + FeedHelper.getLocation(feed, LocationType.DATA, cluster.getName()).getPath());
+                        + uriTemplate);
             }
         }
         for (String groupName : groupNames) {
@@ -127,7 +127,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             if (group != null && !group.canContainFeed(feed)) {
                 throw new ValidationException(
                         "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
-                                + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+                                + ", path pattern: " + FeedHelper.createStorage(feed)
                                 + " does not match with group: " + group.getName() + "'s frequency: "
                                 + group.getFrequency()
                                 + ", date pattern: " + group.getDatePattern());
@@ -280,4 +280,29 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     "Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
         }
     }
+
+    /**
+     * Ensure table is already defined in the catalog registry.
+     * Does not matter for FileSystem storage.
+     */
+    private void validateFeedStorage(Feed feed) throws FalconException {
+        StringBuilder buffer = new StringBuilder();
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            final Storage storage = FeedHelper.createStorage(cluster, feed);
+            if (!storage.exists()) {
+                // this is only true for table, filesystem always returns true
+                CatalogStorage catalogStorage = (CatalogStorage) storage;
+                buffer.append("Table [")
+                        .append(catalogStorage.getTable())
+                        .append("] does not exist for feed: ")
+                        .append(feed.getName())
+                        .append(", cluster: ")
+                        .append(cluster.getName());
+            }
+        }
+
+        if (buffer.length() > 0) {
+            throw new ValidationException(buffer.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index 5dca46f..d517828 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -17,10 +17,10 @@
  */
 package org.apache.falcon.group;
 
+import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.LocationType;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -93,8 +93,8 @@ public class FeedGroup {
         return datePattern;
     }
 
-    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
+    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
         return this.frequency.equals(feed.getFrequency())
-                && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()));
+                && this.datePattern.equals(getDatePattern(FeedHelper.createStorage(feed).getUriTemplate()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index ed44b48..d054873 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -24,7 +24,6 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.service.ConfigurationChangeListener;
 
 import java.util.Collections;
@@ -114,8 +113,9 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
         return groupSet;
     }
 
-    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
+    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed)
+        throws FalconException {
         return getGroups(feed.getGroups(), feed.getFrequency(),
-                FeedHelper.getLocation(feed, LocationType.DATA).getPath());
+                FeedHelper.createStorage(feed).getUriTemplate());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index a9d39de..fc69933 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -21,10 +21,10 @@ package org.apache.falcon.update;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Partitions;
 import org.apache.falcon.entity.v0.process.Cluster;
@@ -78,18 +78,15 @@ public final class UpdateHelper {
         }
     }
 
-    public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
-        if (!FeedHelper.getLocation(oldFeed.getLocations(), LocationType.DATA)
-            .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.DATA).getPath())
-                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.META)
-                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.META).getPath())
-                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.STATS)
-                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.STATS).getPath())
-                || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.TMP)
-                    .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.TMP).getPath())) {
+    public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess)
+        throws FalconException {
+        Storage oldFeedStorage = FeedHelper.createStorage(oldFeed);
+        Storage newFeedStorage = FeedHelper.createStorage(newFeed);
+
+        if (!oldFeedStorage.isIdentical(newFeedStorage)) {
             return true;
         }
-        LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
+        LOG.debug(oldFeed.toShortString() + ": Storage identical. Ignoring...");
 
         if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
             return true;
@@ -128,17 +125,12 @@ public final class UpdateHelper {
         }
 
         for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
-            if (!FeedHelper
-                    .getCluster(oldFeed, cluster.getName()).getValidity().getStart()
+            oldFeedStorage = FeedHelper.createStorage(cluster.getName(), oldFeed);
+            newFeedStorage = FeedHelper.createStorage(cluster.getName(), newFeed);
+
+            if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
                     .equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.DATA, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.DATA, cluster.getName()).getPath())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.META, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.META, cluster.getName()).getPath())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.STATS, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.STATS, cluster.getName()).getPath())
-                    || !FeedHelper.getLocation(oldFeed, LocationType.TMP, cluster.getName()).getPath()
-                    .equals(FeedHelper.getLocation(newFeed, LocationType.TMP, cluster.getName()).getPath())) {
+                    || !oldFeedStorage.isIdentical(newFeedStorage)) {
                 return true;
             }
             LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
new file mode 100644
index 0000000..458111d
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.net.URISyntaxException;
+
+/**
+ * Test class for Catalog Table Storage.
+ * Exists will be covered in integration tests as it actually checks if the table exists.
+ */
+public class CatalogStorageTest {
+
+    @Test
+    public void testGetType() throws Exception {
+        String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
+        CatalogStorage storage = new CatalogStorage(table);
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+    }
+
+    @Test
+    public void testParseVaildURI() throws URISyntaxException {
+        String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
+        CatalogStorage storage = new CatalogStorage(table);
+        Assert.assertEquals("${hcatNode}", storage.getCatalogUrl());
+        Assert.assertEquals("clicksdb", storage.getDatabase());
+        Assert.assertEquals("clicks", storage.getTable());
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+        Assert.assertEquals(2, storage.getPartitions().size());
+        Assert.assertEquals("us", storage.getPartitionValue("region"));
+        Assert.assertTrue(storage.hasPartition("region"));
+        Assert.assertNull(storage.getPartitionValue("unknown"));
+        Assert.assertFalse(storage.hasPartition("unknown"));
+    }
+
+
+    @DataProvider(name = "invalidTableURIs")
+    public Object[][] createInvalidTableURIData() {
+        return new Object[][] {
+            {"catalog:default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
+            {"default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
+            {"catalog:default#ds=$YEAR-$MONTH-$DAY;region=us", ""},
+            {"catalog://default/clicks#ds=$YEAR-$MONTH-$DAY:region=us", ""},
+        };
+    }
+
+    @Test(dataProvider = "invalidTableURIs", expectedExceptions = URISyntaxException.class)
+    public void testParseInvalidURI(String tableUri, String ignore) throws URISyntaxException {
+        new CatalogStorage(tableUri);
+        Assert.fail("Exception must have been thrown");
+    }
+
+    @Test
+    public void testIsIdenticalPositive() throws Exception {
+        CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        Assert.assertTrue(table1.isIdentical(table2));
+
+        final String catalogUrl = "thrift://localhost:49083";
+        CatalogStorage table3 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        CatalogStorage table4 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        Assert.assertTrue(table3.isIdentical(table4));
+    }
+
+    @Test
+    public void testIsIdenticalNegative() throws Exception {
+        CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
+        Assert.assertFalse(table1.isIdentical(table2));
+
+        final String catalogUrl = "thrift://localhost:49083";
+        CatalogStorage table3 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        CatalogStorage table4 = new CatalogStorage(catalogUrl,
+                "catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
+        Assert.assertFalse(table3.isIdentical(table4));
+
+        CatalogStorage table5 = new CatalogStorage("thrift://localhost:49084",
+                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        CatalogStorage table6 = new CatalogStorage("thrift://localhost:49083",
+                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        Assert.assertFalse(table5.isIdentical(table6));
+    }
+
+    @Test
+    public void testGetUriTemplateWithCatalogUrl() throws Exception {
+        final String catalogUrl = "thrift://localhost:49083";
+        String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
+        String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+
+        CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
+
+        Assert.assertEquals(uriTemplate, table.getUriTemplate());
+        Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+    }
+
+    @Test
+    public void testGetUriTemplateWithOutCatalogUrl() throws Exception {
+        String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
+        String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+
+        CatalogStorage table = new CatalogStorage(tableUri);
+
+        Assert.assertEquals(uriTemplate, table.getUriTemplate());
+        Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
new file mode 100644
index 0000000..3e3b575
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test class for File System Storage.
+ */
+public class FileSystemStorageTest {
+
+    @Test
+    public void testGetType() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage(locations);
+        Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
+    }
+
+    @Test
+    public void testGetUriTemplate() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+        Assert.assertEquals(storage.getUriTemplate(), "hdfs://localhost:41020/foo/bar");
+    }
+
+    @Test
+    public void testGetUriTemplateWithOutStorageURL() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage(locations);
+        Assert.assertEquals(storage.getUriTemplate(), "${nameNode}/foo/bar");
+    }
+
+    @Test
+    public void testGetUriTemplateWithLocationType() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        final Location location = new Location();
+        location.setPath("/foo/bar");
+        location.setType(LocationType.DATA);
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(location);
+
+        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+        Assert.assertTrue(storage.exists());
+    }
+
+    @Test
+    public void testIsIdentical() throws Exception {
+        final String storageUrl = "hdfs://localhost:41020";
+        final Location location1 = new Location();
+        location1.setPath("/foo/bar");
+        location1.setType(LocationType.DATA);
+        List<Location> locations1 = new ArrayList<Location>();
+        locations1.add(location1);
+        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
+
+        final Location location2 = new Location();
+        location2.setPath("/foo/bar");
+        location2.setType(LocationType.DATA);
+        List<Location> locations2 = new ArrayList<Location>();
+        locations2.add(location2);
+        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
+
+        Assert.assertTrue(storage1.isIdentical(storage2));
+    }
+
+    @Test
+    public void testIsIdenticalNegative() throws Exception {
+        final String storageUrl = "hdfs://localhost:41020";
+        final Location location1 = new Location();
+        location1.setPath("/foo/baz");
+        location1.setType(LocationType.DATA);
+        List<Location> locations1 = new ArrayList<Location>();
+        locations1.add(location1);
+        FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
+
+        final Location location2 = new Location();
+        location2.setPath("/foo/bar");
+        location2.setType(LocationType.DATA);
+        List<Location> locations2 = new ArrayList<Location>();
+        locations2.add(location2);
+        FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
+
+        Assert.assertFalse(storage1.isIdentical(storage2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 4cc4a0e..8bcff78 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -116,12 +116,12 @@ public class FeedEntityParserTest extends AbstractTestBase {
         assertEquals(feed.getClusters().getClusters().get(1).getRetention()
                 .getLimit().toString(), "hours(6)");
 
-        assertEquals(FeedHelper.getLocation(feed, LocationType.DATA).getPath(),
-                "/projects/falcon/clicks");
-        assertEquals(FeedHelper.getLocation(feed, LocationType.META).getPath(),
-                "/projects/falcon/clicksMetaData");
-        assertEquals(FeedHelper.getLocation(feed, LocationType.STATS).getPath(),
-                "/projects/falcon/clicksStats");
+        assertEquals("${nameNode}/projects/falcon/clicks",
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
+        assertEquals("${nameNode}/projects/falcon/clicksMetaData",
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.META));
+        assertEquals("${nameNode}/projects/falcon/clicksStats",
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.STATS));
 
         assertEquals(feed.getACL().getGroup(), "group");
         assertEquals(feed.getACL().getOwner(), "testuser");
@@ -444,4 +444,17 @@ public class FeedEntityParserTest extends AbstractTestBase {
             Assert.assertEquals(org.xml.sax.SAXParseException.class, e.getCause().getCause().getClass());
         }
     }
+
+    @Test
+    public void testParseFeedWithTable() throws FalconException {
+        final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
+        Feed feedWithTable = parser.parse(inputStream);
+        Assert.assertEquals(feedWithTable.getTable().getUri(),
+                "catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR");
+    }
+
+    @Test (expectedExceptions = FalconException.class)
+    public void testParseInvalidFeedWithTable() throws FalconException {
+        parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/invalid-feed.xml"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index d4b5a2a..e8f80bd 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -28,7 +28,9 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.Partition;
 import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
@@ -117,11 +119,11 @@ public class UpdateHelperTest extends AbstractTestBase {
         newFeed.getLateArrival().setCutOff(oldFeed.getLateArrival().getCutOff());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
-        FeedHelper.getLocation(newFeed, LocationType.DATA).setPath("/test");
+        getLocation(newFeed, LocationType.DATA).setPath("/test");
         Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
-        FeedHelper.getLocation(newFeed, LocationType.DATA).setPath(
-                FeedHelper.getLocation(oldFeed, LocationType.DATA).getPath());
+        getLocation(newFeed, LocationType.DATA).setPath(
+                getLocation(oldFeed, LocationType.DATA).getPath());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
 
         newFeed.setFrequency(Frequency.fromString("months(1)"));
@@ -154,4 +156,20 @@ public class UpdateHelperTest extends AbstractTestBase {
                         process.getClusters().getClusters().get(0).getName()).getValidity().getStart());
         Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
     }
+
+    private static Location getLocation(Feed feed, LocationType type) {
+        return getLocation(feed.getLocations(), type);
+    }
+
+    private static Location getLocation(Locations locations, LocationType type) {
+        for (Location loc : locations.getLocations()) {
+            if (loc.getType() == type) {
+                return loc;
+            }
+        }
+        Location loc = new Location();
+        loc.setPath("/tmp");
+        loc.setType(type);
+        return loc;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
new file mode 100644
index 0000000..e84f90a
--- /dev/null
+++ b/common/src/test/resources/config/feed/hive-table-feed.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/resources/config/feed/invalid-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/invalid-feed.xml b/common/src/test/resources/config/feed/invalid-feed.xml
new file mode 100644
index 0000000..b7273a9
--- /dev/null
+++ b/common/src/test/resources/config/feed/invalid-feed.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
+            <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index adef3ec..00c261b 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -21,9 +21,9 @@ package org.apache.falcon.converter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -62,7 +62,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
     private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
     private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
 
-    private static final String FEED_PATH_SEP = "#";
+    public static final String FEED_PATH_SEP = "#";
     private static final String TIMEOUT = "timeout";
     private static final String PARALLEL = "parallel";
 
@@ -116,7 +116,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         WORKFLOW retentionWorkflow = new WORKFLOW();
         try {
             //
-            WORKFLOWAPP retWfApp = createRetentionWorkflow(cluster);
+            WORKFLOWAPP retWfApp = createRetentionWorkflow();
             retWfApp.setName(wfName);
             marshal(cluster, retWfApp, wfPath);
             retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
@@ -124,23 +124,9 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
 
             org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-            String feedPathMask = getLocationURI(cluster, feed, LocationType.DATA);
-            String metaPathMask = getLocationURI(cluster, feed, LocationType.META);
-            String statsPathMask = getLocationURI(cluster, feed, LocationType.STATS);
-            String tmpPathMask = getLocationURI(cluster, feed, LocationType.TMP);
-
-            StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
-            if (metaPathMask != null) {
-                feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
-            }
-            if (statsPathMask != null) {
-                feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
-            }
-            if (tmpPathMask != null) {
-                feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
-            }
+            String feedBasePaths = getFeedDataPath(cluster, feed);
 
-            props.put("feedDataPath", feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{"));
+            props.put("feedDataPath", feedBasePaths);
             props.put("timeZone", feed.getTimezone().getID());
             props.put("frequency", feed.getFrequency().getTimeUnit().name());
             props.put("limit", feedCluster.getRetention().getLimit().toString());
@@ -156,6 +142,29 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         }
     }
 
+    protected String getFeedDataPath(Cluster cluster, Feed feed) throws FalconException {
+        final Storage storage = FeedHelper.createStorage(cluster, feed);
+        String feedPathMask = storage.getUriTemplate(LocationType.DATA);
+        String metaPathMask = storage.getUriTemplate(LocationType.META);
+        String statsPathMask = storage.getUriTemplate(LocationType.STATS);
+        String tmpPathMask = storage.getUriTemplate(LocationType.TMP);
+
+        StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
+        if (metaPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
+        }
+
+        if (statsPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
+        }
+
+        if (tmpPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
+        }
+
+        return feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{");
+    }
+
     private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
         throws FalconException {
 
@@ -263,10 +272,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             SYNCDATASET inputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
             SYNCDATASET outputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
 
-            inputDataset.setUriTemplate(new Path(ClusterHelper.getStorageUrl(srcCluster),
-                FeedHelper.getLocation(feed, LocationType.DATA, srcCluster.getName()).getPath()).toString());
-            outputDataset.setUriTemplate(getStoragePath(
-                FeedHelper.getLocation(feed, LocationType.DATA, trgCluster.getName()).getPath()));
+            inputDataset.setUriTemplate(FeedHelper.createStorage(srcCluster, feed).getUriTemplate(LocationType.DATA));
+            outputDataset.setUriTemplate(FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
             setDatasetValues(inputDataset, feed, srcCluster);
             setDatasetValues(outputDataset, feed, srcCluster);
             if (feed.getAvailabilityFlag() == null) {
@@ -338,7 +345,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         marshal(cluster, repWFapp, wfPath);
     }
 
-    private WORKFLOWAPP createRetentionWorkflow(Cluster cluster) throws IOException, FalconException {
+    private WORKFLOWAPP createRetentionWorkflow() throws IOException, FalconException {
         return getWorkflowTemplate(RETENTION_WF_TEMPLATE);
     }
 
@@ -353,19 +360,4 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         }
         return props;
     }
-
-    private String getLocationURI(Cluster cluster, Feed feed, LocationType type) {
-        String path = FeedHelper.getLocation(feed, type, cluster.getName())
-                .getPath();
-
-        if (!path.equals("/tmp")) {
-            if (new Path(path).toUri().getScheme() == null) {
-                return new Path(ClusterHelper.getStorageUrl(cluster), path)
-                        .toString();
-            } else {
-                return path;
-            }
-        }
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 7fbe179..3b3132a 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -19,6 +19,7 @@ package org.apache.falcon.converter;
 
 import static org.testng.Assert.assertEquals;
 
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.List;
 
@@ -27,6 +28,7 @@ import javax.xml.bind.Unmarshaller;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -74,7 +76,6 @@ public class OozieFeedMapperTest {
         ClusterHelper.getInterface(trgCluster, Interfacetype.WRITE).setEndpoint(trgHdfsUrl);
 
         feed = (Feed) storeEntity(EntityType.FEED, FEED);
-
     }
 
     protected Entity storeEntity(EntityType type, String path) throws Exception {
@@ -148,6 +149,35 @@ public class OozieFeedMapperTest {
                 break;
             }
         }
+    }
+
+    @Test
+    public void testRetentionCoords() throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
+        List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
 
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+        String feedDataPath = null;
+        org.apache.falcon.oozie.coordinator.CONFIGURATION configuration =
+                coord.getAction().getWorkflow().getConfiguration();
+        for (Property property : configuration.getProperty()) {
+            if ("feedDataPath".equals(property.getName())) {
+                feedDataPath = property.getValue();
+                break;
+            }
+        }
+
+        if (feedDataPath != null) {
+            Assert.assertEquals(feedDataPath, feedMapper.getFeedDataPath(srcCluster, feed));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 8f75736..86ad937 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -295,10 +295,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
         SYNCDATASET syncdataset = new SYNCDATASET();
         syncdataset.setName(datasetName);
-        String locPath = FeedHelper.getLocation(feed, locationType,
-                cluster.getName()).getPath();
-        syncdataset.setUriTemplate(new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}"
-                + locPath);
+        String locPath = FeedHelper.createStorage(cluster, feed).getUriTemplate(locationType);
+        syncdataset.setUriTemplate(locPath);
         syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
 
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 3f70557..128a73d 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -31,7 +31,6 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
@@ -73,7 +72,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
         properties.put(inName + ".done-flag", "notused");
 
-        String locPath = FeedHelper.getLocation(feed, LocationType.DATA, clusterName).getPath().replace('$', '%');
+        String locPath = FeedHelper.createStorage(clusterName, feed).getUriTemplate().replace('$', '%');
         properties.put(inName + ".uri-template",
                 new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 7be41da..e3a42ca 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -47,7 +47,6 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -142,13 +141,14 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         ConfigurationStore store = ConfigurationStore.get();
         Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
         SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
-        assertEquals(SchemaHelper.formatDateUTC(feed.getClusters().getClusters().get(0).getValidity().getStart()),
-                ds.getInitialInstance());
+
+        final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+        assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
         assertEquals(feed.getTimezone().getID(), ds.getTimezone());
         assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
         assertEquals("", ds.getDoneFlag());
-        assertEquals(ds.getUriTemplate(), "${nameNode}" + FeedHelper.getLocation(feed, LocationType.DATA,
-                feed.getClusters().getClusters().get(0).getName()).getPath());
+        assertEquals(ds.getUriTemplate(), FeedHelper.createStorage(feedCluster, feed).getUriTemplate());
+
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             if (prop.getName().equals("mapred.job.priority")) {
                 assertEquals(prop.getValue(), "LOW");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index f762c4b..74bab07 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -295,9 +295,11 @@ public class TestContext {
 
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
 
-        return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User",
-                "testuser")
-                .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream);
+        return this.service.path("api/entities/submit/" + entityType.name().toLowerCase())
+                .header("Remote-User", REMOTE_USER)
+                .accept(MediaType.TEXT_XML)
+                .type(MediaType.TEXT_XML)
+                .post(ClientResponse.class, rawlogStream);
     }
 
     public void assertRequestId(ClientResponse clientRepsonse) {