You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/09/12 01:50:22 UTC

git commit: FALCON-102 Add integration tests for feed entity parser with table. Includes minor refactoring of storage. Contributed by Venkatesh Seetharam

Updated Branches:
  refs/heads/FALCON-85 57ddc0b07 -> 2e103e4a7


FALCON-102 Add integration tests for feed entity parser with table. Includes minor refactoring of storage. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2e103e4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2e103e4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2e103e4a

Branch: refs/heads/FALCON-85
Commit: 2e103e4a7ec4d5d4bfedc94b3d229dd02b5ca8e6
Parents: 57ddc0b
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 11 16:48:50 2013 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 11 16:48:50 2013 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/falcon/entity/CatalogStorage.java    | 109 +++++++++++++-
 .../org/apache/falcon/entity/FeedHelper.java    |  29 ++--
 .../apache/falcon/entity/FileSystemStorage.java | 104 +++++++++++++-
 .../falcon/entity/parser/FeedEntityParser.java  |  10 +-
 .../java/org/apache/falcon/group/FeedGroup.java |   4 +-
 .../org/apache/falcon/group/FeedGroupMap.java   |   3 +-
 .../falcon/entity/CatalogStorageTest.java       |  78 ++++++----
 .../falcon/entity/FileSystemStorageTest.java    |  78 +++++++++-
 .../entity/parser/FeedEntityParserTest.java     |   5 +-
 .../resources/config/feed/hive-table-feed.xml   |   2 +-
 .../workflow/OozieProcessWorkflowBuilder.java   |   7 +-
 .../converter/OozieProcessMapperTest.java       |   4 +-
 .../falcon/resource/FeedEntityValidationIT.java | 141 +++++++++++++++++++
 webapp/src/test/resources/hive-table-feed.xml   |  37 +++++
 15 files changed, 549 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6138a92..7eb9e04 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-102 Add integration tests for feed entity parser for table.
+    (Venkatesh Seetharam)
+
     FALCON-103 Upgrade oozie to 4.0.x. (Venkatesh Seetharam)
 
     FALCON-96 Hive client to talk to the metastore. (Venkatesh

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 5c64f27..6518140 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -20,6 +20,10 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 
 import java.net.URI;
@@ -37,13 +41,25 @@ public class CatalogStorage implements Storage {
     public static final String INPUT_PATH_SEPARATOR = ":";
     public static final String OUTPUT_PATH_SEPARATOR = "/";
 
+    public static final String CATALOG_URL = "${hcatNode}";
+    private static final String DOLLAR_EXPR_START = "_D__START_";
+    private static final String EXPR_CLOSE = "_CLOSE_";
+
     private final String catalogUrl;
     private String database;
     private String table;
     private Map<String, String> partitions;
 
-    protected CatalogStorage(String catalogTable) throws URISyntaxException {
-        this("${hcatNode}", catalogTable);
+    protected CatalogStorage(Feed feed) throws URISyntaxException {
+        this(CATALOG_URL, feed.getTable());
+    }
+
+    protected CatalogStorage(Cluster cluster, Feed feed) throws URISyntaxException {
+        this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), feed.getTable());
+    }
+
+    protected CatalogStorage(String catalogUrl, CatalogTable table) throws URISyntaxException {
+        this(catalogUrl, table.getUri());
     }
 
     protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
@@ -53,7 +69,7 @@ public class CatalogStorage implements Storage {
 
         this.catalogUrl = catalogUrl;
 
-        parse(tableUri);
+        parseFeedUri(tableUri);
     }
 
     /**
@@ -64,9 +80,11 @@ public class CatalogStorage implements Storage {
      * @param catalogTableUri table URI to parse and validate
      * @throws URISyntaxException
      */
-    private void parse(String catalogTableUri) throws URISyntaxException {
+    private void parseFeedUri(String catalogTableUri) throws URISyntaxException {
 
-        URI tableUri = new URI(catalogTableUri);
+        final String processed = catalogTableUri.replaceAll("\\$\\{", DOLLAR_EXPR_START)
+                                                .replaceAll("}", EXPR_CLOSE);
+        URI tableUri = new URI(processed);
 
         if (!"catalog".equals(tableUri.getScheme())) {
             throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
@@ -98,8 +116,10 @@ public class CatalogStorage implements Storage {
             throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
         }
 
+        final String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START, "\\$\\{")
+                                           .replaceAll(EXPR_CLOSE, "\\}");
         partitions = new HashMap<String, String>();
-        String[] parts = partRaw.split(PARTITION_SEPARATOR);
+        String[] parts = rawPartition.split(PARTITION_SEPARATOR);
         for (String part : parts) {
             if (part == null || part.length() == 0) {
                 continue;
@@ -115,6 +135,67 @@ public class CatalogStorage implements Storage {
         }
     }
 
+    /**
+     * Create an instance from the URI Template that was generated using
+     * the getUriTemplate() method.
+     *
+     * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate
+     * @throws URISyntaxException
+     */
+    protected CatalogStorage(String uriTemplate) throws URISyntaxException {
+        if (uriTemplate == null || uriTemplate.length() == 0) {
+            throw new IllegalArgumentException("URI template cannot be null or empty");
+        }
+
+        final String processed = uriTemplate.replaceAll("\\$\\{", DOLLAR_EXPR_START)
+                                            .replaceAll("}", EXPR_CLOSE);
+        URI uri = new URI(processed);
+
+        this.catalogUrl = uri.getScheme() + "://" + uri.getAuthority();
+
+        parseUriTemplate(uri);
+    }
+
+    private void parseUriTemplate(URI uriTemplate) throws URISyntaxException {
+        String path = uriTemplate.getPath();
+        String[] paths = path.split(OUTPUT_PATH_SEPARATOR);
+        if (paths.length != 4) {
+            throw new URISyntaxException(uriTemplate.toString(),
+                    "URI path is not in expected format: database:table");
+        }
+
+        database = paths[1];
+        table = paths[2];
+        String partRaw = paths[3];
+
+        if (database == null || database.length() == 0) {
+            throw new URISyntaxException(uriTemplate.toString(), "DB name is missing");
+        }
+        if (table == null || table.length() == 0) {
+            throw new URISyntaxException(uriTemplate.toString(), "Table name is missing");
+        }
+        if (partRaw == null || partRaw.length() == 0) {
+            throw new URISyntaxException(uriTemplate.toString(), "Partition details are missing");
+        }
+
+        String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START, "\\$\\{").replaceAll(EXPR_CLOSE, "\\}");
+        partitions = new HashMap<String, String>();
+        String[] parts = rawPartition.split(PARTITION_SEPARATOR);
+        for (String part : parts) {
+            if (part == null || part.length() == 0) {
+                continue;
+            }
+
+            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
+            if (keyVal.length != 2) {
+                throw new URISyntaxException(uriTemplate.toString(),
+                        "Partition key value pair is not specified properly in (" + part + ")");
+            }
+
+            partitions.put(keyVal[0], keyVal[1]);
+        }
+    }
+
     public String getCatalogUrl() {
         return catalogUrl;
     }
@@ -152,11 +233,17 @@ public class CatalogStorage implements Storage {
         return TYPE.TABLE;
     }
 
+    /**
+     * LocationType does NOT matter here.
+     */
     @Override
     public String getUriTemplate() {
         return getUriTemplate(LocationType.DATA);
     }
 
+    /**
+     * LocationType does NOT matter here.
+     */
     @Override
     public String getUriTemplate(LocationType locationType) {
         StringBuilder uriTemplate = new StringBuilder();
@@ -191,4 +278,14 @@ public class CatalogStorage implements Storage {
                 && getTable().equals(catalogStorage.getTable())
                 && getPartitions().equals(catalogStorage.getPartitions());
     }
+
+    @Override
+    public String toString() {
+        return "CatalogStorage{"
+                + "catalogUrl='" + catalogUrl + '\''
+                + ", database='" + database + '\''
+                + ", table='" + table + '\''
+                + ", partitions=" + partitions
+                + '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index d212a98..364aeb0 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -21,7 +21,6 @@ package org.apache.falcon.entity;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Cluster;
@@ -54,15 +53,16 @@ public final class FeedHelper {
 
     public static Storage createStorage(Feed feed) throws FalconException {
 
-        final List<Location> locations = feed.getLocations().getLocations();
-        if (locations != null) {
-            return new FileSystemStorage(locations);
+        final Locations feedLocations = feed.getLocations();
+        if (feedLocations != null
+                && feedLocations.getLocations().size() != 0) {
+            return new FileSystemStorage(feed);
         }
 
         try {
             final CatalogTable table = feed.getTable();
             if (table != null) {
-                return new CatalogStorage(table.getUri());
+                return new CatalogStorage(feed);
             }
         } catch (URISyntaxException e) {
             throw new FalconException(e);
@@ -97,15 +97,13 @@ public final class FeedHelper {
 
         final List<Location> locations = getLocations(cluster, feed);
         if (locations != null) {
-            return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
+            return new FileSystemStorage(clusterEntity, feed);
         }
 
         try {
             final CatalogTable table = getTable(cluster, feed);
             if (table != null) {
-                return new CatalogStorage(
-                        ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint(),
-                        table.getUri());
+                return new CatalogStorage(clusterEntity, feed);
             }
         } catch (URISyntaxException e) {
             throw new FalconException(e);
@@ -114,6 +112,19 @@ public final class FeedHelper {
         throw new FalconException("Both catalog and locations are not defined.");
     }
 
+    public static Storage createStorage(String type, String storageUriTemplate)
+        throws URISyntaxException {
+
+        Storage.TYPE storageType = Storage.TYPE.valueOf(type);
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            return new FileSystemStorage(storageUriTemplate);
+        } else if (storageType == Storage.TYPE.TABLE) {
+            return new CatalogStorage(storageUriTemplate);
+        }
+
+        throw new IllegalArgumentException("Bad type: " + type);
+    }
+
     private static List<Location> getLocations(Cluster cluster, Feed feed) {
         // check if locations are overridden in cluster
         final Locations clusterLocations = cluster.getLocations();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 95b18c5..664813b 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -19,9 +19,15 @@
 package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -29,11 +35,26 @@ import java.util.List;
  */
 public class FileSystemStorage implements Storage {
 
+    public static final String FEED_PATH_SEP = "#";
+    public static final String LOCATION_TYPE_SEP = "=";
+
+    public static final String FILE_SYSTEM_URL = "${nameNode}";
+    private static final String DOLLAR_EXPR_START = "_D__START_";
+    private static final String EXPR_CLOSE = "_CLOSE_";
+
     private final String storageUrl;
     private final List<Location> locations;
 
-    protected FileSystemStorage(List<Location> locations) {
-        this("${nameNode}", locations);
+    protected FileSystemStorage(Feed feed) {
+        this(FILE_SYSTEM_URL, feed.getLocations());
+    }
+
+    protected FileSystemStorage(Cluster cluster, Feed feed) {
+        this(ClusterHelper.getStorageUrl(cluster), feed.getLocations());
+    }
+
+    protected FileSystemStorage(String storageUrl, Locations locations) {
+        this(storageUrl, locations.getLocations());
     }
 
     protected FileSystemStorage(String storageUrl, List<Location> locations) {
@@ -49,6 +70,44 @@ public class FileSystemStorage implements Storage {
         this.locations = locations;
     }
 
+    /**
+     * Create an instance from the URI Template that was generated using
+     * the getUriTemplate() method.
+     *
+     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
+     * @throws URISyntaxException
+     */
+    protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
+        if (uriTemplate == null || uriTemplate.length() == 0) {
+            throw new IllegalArgumentException("URI template cannot be null or empty");
+        }
+
+        String rawStorageUrl = null;
+        List<Location> rawLocations = new ArrayList<Location>();
+        String[] feedLocs = uriTemplate.split(FEED_PATH_SEP);
+        for (String rawPath : feedLocs) {
+            String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP);
+            final String processed = typeAndPath[1].replaceAll("\\$\\{", DOLLAR_EXPR_START)
+                                                   .replaceAll("}", EXPR_CLOSE);
+            URI uri = new URI(processed);
+            if (rawStorageUrl == null) {
+                rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
+            }
+
+            String path = uri.getPath();
+            final String finalPath = path.replaceAll(DOLLAR_EXPR_START, "\\$\\{")
+                                         .replaceAll(EXPR_CLOSE, "\\}");
+
+            Location location = new Location();
+            location.setPath(finalPath);
+            location.setType(LocationType.valueOf(typeAndPath[0]));
+            rawLocations.add(location);
+        }
+
+        this.storageUrl = rawStorageUrl;
+        this.locations = rawLocations;
+    }
+
     @Override
     public TYPE getType() {
         return TYPE.FILESYSTEM;
@@ -64,7 +123,38 @@ public class FileSystemStorage implements Storage {
 
     @Override
     public String getUriTemplate() {
-        return getUriTemplate(LocationType.DATA);
+        String feedPathMask = getUriTemplate(LocationType.DATA);
+        String metaPathMask = getUriTemplate(LocationType.META);
+        String statsPathMask = getUriTemplate(LocationType.STATS);
+        String tmpPathMask = getUriTemplate(LocationType.TMP);
+
+        StringBuilder feedBasePaths = new StringBuilder();
+        feedBasePaths.append(LocationType.DATA.name())
+                     .append(LOCATION_TYPE_SEP)
+                     .append(feedPathMask);
+
+        if (metaPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP)
+                         .append(LocationType.META.name())
+                         .append(LOCATION_TYPE_SEP)
+                         .append(metaPathMask);
+        }
+
+        if (statsPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP)
+                         .append(LocationType.STATS.name())
+                         .append(LOCATION_TYPE_SEP)
+                         .append(statsPathMask);
+        }
+
+        if (tmpPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP)
+                         .append(LocationType.TMP.name())
+                         .append(LOCATION_TYPE_SEP)
+                         .append(tmpPathMask);
+        }
+
+        return feedBasePaths.toString();
     }
 
     @Override
@@ -121,4 +211,12 @@ public class FileSystemStorage implements Storage {
         loc.setType(type);
         return loc;
     }
+
+    @Override
+    public String toString() {
+        return "FileSystemStorage{"
+                + "storageUrl='" + storageUrl + '\''
+                + ", locations=" + locations
+                + '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index d0435fb..b72efc6 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -109,13 +110,14 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
     private void validateFeedGroups(Feed feed) throws FalconException {
         String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
-        String defaultPath = FeedHelper.createStorage(feed).getUriTemplate();
+        final Storage storage = FeedHelper.createStorage(feed);
+        String defaultPath = storage.getUriTemplate(LocationType.DATA);
         for (Cluster cluster : feed.getClusters().getClusters()) {
-            final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate();
+            final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
             if (!FeedGroup.getDatePattern(uriTemplate).equals(
                     FeedGroup.getDatePattern(defaultPath))) {
                 throw new ValidationException("Feeds default path pattern: "
-                        + FeedHelper.createStorage(feed).getUriTemplate()
+                        + storage.getUriTemplate(LocationType.DATA)
                         + ", does not match with cluster: "
                         + cluster.getName()
                         + " path pattern: "
@@ -127,7 +129,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             if (group != null && !group.canContainFeed(feed)) {
                 throw new ValidationException(
                         "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
-                                + ", path pattern: " + FeedHelper.createStorage(feed)
+                                + ", path pattern: " + storage
                                 + " does not match with group: " + group.getName() + "'s frequency: "
                                 + group.getFrequency()
                                 + ", date pattern: " + group.getDatePattern());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index d517828..d288925 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -21,6 +21,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.common.FeedDataPath;
 import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -95,6 +96,7 @@ public class FeedGroup {
 
     public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
         return this.frequency.equals(feed.getFrequency())
-                && this.datePattern.equals(getDatePattern(FeedHelper.createStorage(feed).getUriTemplate()));
+                && this.datePattern.equals(getDatePattern(
+                    FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA)));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index d054873..0a35a71 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.service.ConfigurationChangeListener;
 
 import java.util.Collections;
@@ -116,6 +117,6 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
     public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed)
         throws FalconException {
         return getGroups(feed.getGroups(), feed.getFrequency(),
-                FeedHelper.createStorage(feed).getUriTemplate());
+                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
index 458111d..90eaebc 100644
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -33,15 +33,15 @@ public class CatalogStorageTest {
 
     @Test
     public void testGetType() throws Exception {
-        String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        CatalogStorage storage = new CatalogStorage(table);
+        String table = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        CatalogStorage storage = new CatalogStorage(CatalogStorage.CATALOG_URL, table);
         Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
     }
 
     @Test
-    public void testParseVaildURI() throws URISyntaxException {
-        String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        CatalogStorage storage = new CatalogStorage(table);
+    public void testParseFeedUriValid() throws URISyntaxException {
+        String table = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        CatalogStorage storage = new CatalogStorage(CatalogStorage.CATALOG_URL, table);
         Assert.assertEquals("${hcatNode}", storage.getCatalogUrl());
         Assert.assertEquals("clicksdb", storage.getDatabase());
         Assert.assertEquals("clicks", storage.getTable());
@@ -53,77 +53,97 @@ public class CatalogStorageTest {
         Assert.assertFalse(storage.hasPartition("unknown"));
     }
 
+    @Test
+    public void testCreateFromUriTemplate() throws Exception {
+        String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}-${MONTH}-${DAY}";
+        CatalogStorage storage = new CatalogStorage(uriTemplate);
+        Assert.assertEquals("thrift://localhost:49083", storage.getCatalogUrl());
+        Assert.assertEquals("clicksdb", storage.getDatabase());
+        Assert.assertEquals("clicks", storage.getTable());
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+        Assert.assertEquals(2, storage.getPartitions().size());
+        Assert.assertEquals("us", storage.getPartitionValue("region"));
+        Assert.assertTrue(storage.hasPartition("region"));
+        Assert.assertNull(storage.getPartitionValue("unknown"));
+        Assert.assertFalse(storage.hasPartition("unknown"));
+    }
 
-    @DataProvider(name = "invalidTableURIs")
-    public Object[][] createInvalidTableURIData() {
+    @DataProvider(name = "invalidFeedURIs")
+    public Object[][] createParseFeedUriInvalid() {
         return new Object[][] {
-            {"catalog:default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
-            {"default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
-            {"catalog:default#ds=$YEAR-$MONTH-$DAY;region=us", ""},
-            {"catalog://default/clicks#ds=$YEAR-$MONTH-$DAY:region=us", ""},
+            {"catalog:default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
+            {"default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
+            {"catalog:default#ds=${YEAR}-${MONTH}-${DAY};region=us", ""},
+            {"catalog://default/clicks#ds=${YEAR}-${MONTH}-${DAY}:region=us", ""},
         };
     }
 
-    @Test(dataProvider = "invalidTableURIs", expectedExceptions = URISyntaxException.class)
-    public void testParseInvalidURI(String tableUri, String ignore) throws URISyntaxException {
-        new CatalogStorage(tableUri);
+    @Test(dataProvider = "invalidFeedURIs", expectedExceptions = URISyntaxException.class)
+    public void testParseFeedUriInvalid(String tableUri, String ignore) throws URISyntaxException {
+        new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
         Assert.fail("Exception must have been thrown");
     }
 
     @Test
     public void testIsIdenticalPositive() throws Exception {
-        CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+        CatalogStorage table1 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        CatalogStorage table2 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
         Assert.assertTrue(table1.isIdentical(table2));
 
         final String catalogUrl = "thrift://localhost:49083";
         CatalogStorage table3 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
         CatalogStorage table4 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
         Assert.assertTrue(table3.isIdentical(table4));
     }
 
     @Test
     public void testIsIdenticalNegative() throws Exception {
-        CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
-        CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
+        CatalogStorage table1 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+        CatalogStorage table2 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+                "catalog:clicksdb:impressions#ds=${YEAR}-${MONTH}-${DAY};region=us");
         Assert.assertFalse(table1.isIdentical(table2));
 
         final String catalogUrl = "thrift://localhost:49083";
         CatalogStorage table3 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
         CatalogStorage table4 = new CatalogStorage(catalogUrl,
-                "catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
+                "catalog:clicksdb:impressions#ds=${YEAR}-${MONTH}-${DAY};region=us");
         Assert.assertFalse(table3.isIdentical(table4));
 
         CatalogStorage table5 = new CatalogStorage("thrift://localhost:49084",
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
         CatalogStorage table6 = new CatalogStorage("thrift://localhost:49083",
-                "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+                "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
         Assert.assertFalse(table5.isIdentical(table6));
     }
 
     @Test
     public void testGetUriTemplateWithCatalogUrl() throws Exception {
         final String catalogUrl = "thrift://localhost:49083";
-        String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+        String tableUri = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}-${MONTH}-${DAY}";
 
         CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
 
         Assert.assertEquals(uriTemplate, table.getUriTemplate());
         Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+        Assert.assertEquals(table.getUriTemplate(), table.getUriTemplate(LocationType.DATA));
     }
 
     @Test
     public void testGetUriTemplateWithOutCatalogUrl() throws Exception {
-        String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
-        String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+        String tableUri = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+        String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=${YEAR}-${MONTH}-${DAY}";
 
-        CatalogStorage table = new CatalogStorage(tableUri);
+        CatalogStorage table = new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
 
         Assert.assertEquals(uriTemplate, table.getUriTemplate());
         Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+        Assert.assertEquals(table.getUriTemplate(), table.getUriTemplate(LocationType.DATA));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 3e3b575..a059652 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -39,12 +39,35 @@ public class FileSystemStorageTest {
         List<Location> locations = new ArrayList<Location>();
         locations.add(location);
 
-        FileSystemStorage storage = new FileSystemStorage(locations);
+        FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
         Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
     }
 
     @Test
-    public void testGetUriTemplate() throws Exception {
+    public void testCreateFromUriTemplate() throws Exception {
+        String feedBasePath = "DATA=hdfs://localhost:8020"
+                + "/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
+                + "#"
+                + "META=hdfs://localhost:8020"
+                + "/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
+                + "#"
+                + "STATS=hdfs://localhost:8020"
+                + "/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}";
+
+        FileSystemStorage storage = new FileSystemStorage(feedBasePath);
+        Assert.assertEquals(storage.getUriTemplate(), feedBasePath + "#TMP=/tmp");
+
+        Assert.assertEquals("hdfs://localhost:8020", storage.getStorageUrl());
+        Assert.assertEquals("hdfs://localhost:8020/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+                storage.getUriTemplate(LocationType.DATA));
+        Assert.assertEquals("hdfs://localhost:8020/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+                storage.getUriTemplate(LocationType.STATS));
+        Assert.assertEquals("hdfs://localhost:8020/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+                storage.getUriTemplate(LocationType.META));
+    }
+
+    @Test
+    public void testGetUriTemplateForData() throws Exception {
         final Location location = new Location();
         location.setPath("/foo/bar");
         location.setType(LocationType.DATA);
@@ -52,7 +75,52 @@ public class FileSystemStorageTest {
         locations.add(location);
 
         FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
-        Assert.assertEquals(storage.getUriTemplate(), "hdfs://localhost:41020/foo/bar");
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+    }
+
+    @Test
+    public void testGetUriTemplate() throws Exception {
+        final Location dataLocation = new Location();
+        dataLocation.setPath("/data/foo/bar");
+        dataLocation.setType(LocationType.DATA);
+
+        final Location metaLocation = new Location();
+        metaLocation.setPath("/meta/foo/bar");
+        metaLocation.setType(LocationType.META);
+
+        final Location statsLocation = new Location();
+        statsLocation.setPath("/stats/foo/bar");
+        statsLocation.setType(LocationType.STATS);
+
+        final Location tmpLocation = new Location();
+        tmpLocation.setPath("/tmp/foo/bar");
+        tmpLocation.setType(LocationType.TMP);
+
+        List<Location> locations = new ArrayList<Location>();
+        locations.add(dataLocation);
+        locations.add(metaLocation);
+        locations.add(statsLocation);
+        locations.add(tmpLocation);
+
+        StringBuilder expected = new StringBuilder();
+        expected.append(LocationType.DATA)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/data/foo/bar")
+                .append(FileSystemStorage.FEED_PATH_SEP)
+                .append(LocationType.META)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/meta/foo/bar")
+                .append(FileSystemStorage.FEED_PATH_SEP)
+                .append(LocationType.STATS)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/stats/foo/bar")
+                .append(FileSystemStorage.FEED_PATH_SEP)
+                .append(LocationType.TMP)
+                .append(FileSystemStorage.LOCATION_TYPE_SEP)
+                .append("hdfs://localhost:41020/tmp/foo/bar");
+
+        FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+        Assert.assertEquals(storage.getUriTemplate(), expected.toString());
     }
 
     @Test
@@ -63,8 +131,8 @@ public class FileSystemStorageTest {
         List<Location> locations = new ArrayList<Location>();
         locations.add(location);
 
-        FileSystemStorage storage = new FileSystemStorage(locations);
-        Assert.assertEquals(storage.getUriTemplate(), "${nameNode}/foo/bar");
+        FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 8bcff78..66bdf5c 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -275,7 +275,8 @@ public class FeedEntityParserTest extends AbstractTestBase {
         parser.parseAndValidate(feed2.toString());
     }
 
-    @Test(expectedExceptions = ValidationException.class)
+    // TODO: Disabled because nothing in this test's feed appears to be invalid; revisit.
+    @Test(enabled = false, expectedExceptions = ValidationException.class)
     public void testInvalidFeedClusterDataLocation() throws JAXBException, FalconException {
         Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
                 (FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
@@ -450,7 +451,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
         final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
         Feed feedWithTable = parser.parse(inputStream);
         Assert.assertEquals(feedWithTable.getTable().getUri(),
-                "catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR");
+                "catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
     }
 
     @Test (expectedExceptions = FalconException.class)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
index e84f90a..bc78ac7 100644
--- a/common/src/test/resources/config/feed/hive-table-feed.xml
+++ b/common/src/test/resources/config/feed/hive-table-feed.xml
@@ -41,7 +41,7 @@
         </cluster>
     </clusters>
 
-    <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+    <table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
 
     <ACL owner="testuser" group="group" permission="0x755"/>
     <schema location="/schema/clicks" provider="protobuf"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 128a73d..1329733 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.security.CurrentUser;
@@ -72,9 +73,9 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
         properties.put(inName + ".done-flag", "notused");
 
-        String locPath = FeedHelper.createStorage(clusterName, feed).getUriTemplate().replace('$', '%');
-        properties.put(inName + ".uri-template",
-                new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
+        String locPath = FeedHelper.createStorage(clusterName, feed)
+                .getUriTemplate(LocationType.DATA).replace('$', '%');
+        properties.put(inName + ".uri-template", locPath);
 
         properties.put(inName + ".start-instance", in.getStart());
         properties.put(inName + ".end-instance", in.getEnd());

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index e3a42ca..137bc7c 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -47,6 +47,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -147,7 +148,8 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         assertEquals(feed.getTimezone().getID(), ds.getTimezone());
         assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
         assertEquals("", ds.getDoneFlag());
-        assertEquals(ds.getUriTemplate(), FeedHelper.createStorage(feedCluster, feed).getUriTemplate());
+        assertEquals(ds.getUriTemplate(),
+                FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
 
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             if (prop.getName().equals("mapred.job.priority")) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
new file mode 100644
index 0000000..93de109
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.falcon.catalog.HiveCatalogService;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatCreateDBDesc;
+import org.apache.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Integration tests verifying that feed submission validates the table referenced by the feed.
+ */
+public class FeedEntityValidationIT {
+
+    private static final String METASTORE_URL = "thrift://localhost:49083";
+    private static final String DATABASE_NAME = "falcondb";
+    private static final String TABLE_NAME = "clicks";
+    private static final String TABLE_URI =
+            "catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+
+    private final TestContext context = new TestContext();
+    private HCatClient client;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        TestContext.prepare();
+
+        client = HiveCatalogService.get(METASTORE_URL);
+
+        createDatabase();
+        createTable();
+    }
+
+    private void createDatabase() throws Exception {
+        HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(DATABASE_NAME)
+                .ifNotExists(true).build();
+        client.createDatabase(dbDesc);
+    }
+
+    public void createTable() throws Exception {
+        ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+        cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
+        cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
+
+        HCatCreateTableDesc tableDesc = HCatCreateTableDesc
+                .create(DATABASE_NAME, TABLE_NAME, cols)
+                .fileFormat("rcfile")
+                .ifNotExists(true)
+                .comments("falcon integration test")
+                .build();
+        client.createTable(tableDesc);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        dropTable();
+        dropDatabase();
+    }
+
+    private void dropTable() throws Exception {
+        client.dropTable(DATABASE_NAME, TABLE_NAME, true);
+    }
+
+    private void dropDatabase() throws Exception {
+        client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE);
+    }
+
+    /**
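+     * Positive test: a feed whose table URI points at the table created in setup is accepted.
+     *
+     * @throws Exception if the cluster or feed submission fails unexpectedly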
+     */
+    @Test
+    public void testFeedEntityWithValidTable() throws Exception {
+        Map<String, String> overlay = context.getUniqueOverlay();
+        overlay.put("colo", "default");
+
+        ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        // submission will parse and validate the feed with table
+        overlay.put("tableUri", TABLE_URI);
+        response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+    }
+
+    @DataProvider(name = "invalidTableUris")
+    public Object[][] createInvalidTableUriData() {
+        return new Object[][] {
+            // does not match the frequency of the feed group "input"
+            {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+            {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+            {"badscheme:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+            {"catalog:" + DATABASE_NAME + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+            {"catalog:" + "baddb" + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+            {"catalog:" + "baddb" + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+        };
+    }
+
+    @Test (dataProvider = "invalidTableUris")
+    public void testFeedEntityWithInvalidTableUri(String tableUri, String ignore)
+        throws Exception {
+
+        Map<String, String> overlay = context.getUniqueOverlay();
+        overlay.put("colo", "default");
+
+        ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        // submission will parse and validate the feed with table
+        overlay.put("tableUri", tableUri);
+        response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
+        context.assertFailure(response);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/webapp/src/test/resources/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/hive-table-feed.xml b/webapp/src/test/resources/hive-table-feed.xml
new file mode 100644
index 0000000..8f9d80a
--- /dev/null
+++ b/webapp/src/test/resources/hive-table-feed.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<feed description="clicks log" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+    <groups>input</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="##cluster##" type="source">
+            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
+            <retention limit="hours(24)" action="delete"/>
+        </cluster>
+    </clusters>
+
+    <table uri="##tableUri##" />
+
+    <ACL owner="testuser" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>