You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/09/12 01:50:22 UTC
git commit: FALCON-102 Add integration tests for feed entity parser
with table. Includes minor refactoring of storage. Contributed by Venkatesh
Seetharam
Updated Branches:
refs/heads/FALCON-85 57ddc0b07 -> 2e103e4a7
FALCON-102 Add integration tests for feed entity parser with table. Includes minor refactoring of storage. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/2e103e4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/2e103e4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/2e103e4a
Branch: refs/heads/FALCON-85
Commit: 2e103e4a7ec4d5d4bfedc94b3d229dd02b5ca8e6
Parents: 57ddc0b
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 11 16:48:50 2013 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 11 16:48:50 2013 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/falcon/entity/CatalogStorage.java | 109 +++++++++++++-
.../org/apache/falcon/entity/FeedHelper.java | 29 ++--
.../apache/falcon/entity/FileSystemStorage.java | 104 +++++++++++++-
.../falcon/entity/parser/FeedEntityParser.java | 10 +-
.../java/org/apache/falcon/group/FeedGroup.java | 4 +-
.../org/apache/falcon/group/FeedGroupMap.java | 3 +-
.../falcon/entity/CatalogStorageTest.java | 78 ++++++----
.../falcon/entity/FileSystemStorageTest.java | 78 +++++++++-
.../entity/parser/FeedEntityParserTest.java | 5 +-
.../resources/config/feed/hive-table-feed.xml | 2 +-
.../workflow/OozieProcessWorkflowBuilder.java | 7 +-
.../converter/OozieProcessMapperTest.java | 4 +-
.../falcon/resource/FeedEntityValidationIT.java | 141 +++++++++++++++++++
webapp/src/test/resources/hive-table-feed.xml | 37 +++++
15 files changed, 549 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6138a92..7eb9e04 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-102 Add integration tests for feed entity parser for table.
+ (Venkatesh Seetharam)
+
FALCON-103 Upgrade oozie to 4.0.x. (Venkatesh Seetharam)
FALCON-96 Hive client to talk to the metastore. (Venkatesh
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 5c64f27..6518140 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -20,6 +20,10 @@ package org.apache.falcon.entity;
import org.apache.falcon.FalconException;
import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import java.net.URI;
@@ -37,13 +41,25 @@ public class CatalogStorage implements Storage {
public static final String INPUT_PATH_SEPARATOR = ":";
public static final String OUTPUT_PATH_SEPARATOR = "/";
+ public static final String CATALOG_URL = "${hcatNode}";
+ private static final String DOLLAR_EXPR_START = "_D__START_";
+ private static final String EXPR_CLOSE = "_CLOSE_";
+
private final String catalogUrl;
private String database;
private String table;
private Map<String, String> partitions;
- protected CatalogStorage(String catalogTable) throws URISyntaxException {
- this("${hcatNode}", catalogTable);
+ protected CatalogStorage(Feed feed) throws URISyntaxException {
+ this(CATALOG_URL, feed.getTable());
+ }
+
+ protected CatalogStorage(Cluster cluster, Feed feed) throws URISyntaxException {
+ this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), feed.getTable());
+ }
+
+ protected CatalogStorage(String catalogUrl, CatalogTable table) throws URISyntaxException {
+ this(catalogUrl, table.getUri());
}
protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
@@ -53,7 +69,7 @@ public class CatalogStorage implements Storage {
this.catalogUrl = catalogUrl;
- parse(tableUri);
+ parseFeedUri(tableUri);
}
/**
@@ -64,9 +80,11 @@ public class CatalogStorage implements Storage {
* @param catalogTableUri table URI to parse and validate
* @throws URISyntaxException
*/
- private void parse(String catalogTableUri) throws URISyntaxException {
+ private void parseFeedUri(String catalogTableUri) throws URISyntaxException {
- URI tableUri = new URI(catalogTableUri);
+ final String processed = catalogTableUri.replaceAll("\\$\\{", DOLLAR_EXPR_START)
+ .replaceAll("}", EXPR_CLOSE);
+ URI tableUri = new URI(processed);
if (!"catalog".equals(tableUri.getScheme())) {
throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
@@ -98,8 +116,10 @@ public class CatalogStorage implements Storage {
throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
}
+ final String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START, "\\$\\{")
+ .replaceAll(EXPR_CLOSE, "\\}");
partitions = new HashMap<String, String>();
- String[] parts = partRaw.split(PARTITION_SEPARATOR);
+ String[] parts = rawPartition.split(PARTITION_SEPARATOR);
for (String part : parts) {
if (part == null || part.length() == 0) {
continue;
@@ -115,6 +135,67 @@ public class CatalogStorage implements Storage {
}
}
+ /**
+ * Create an instance from the URI Template that was generated using
+ * the getUriTemplate() method.
+ *
+ * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate
+ * @throws URISyntaxException
+ */
+ protected CatalogStorage(String uriTemplate) throws URISyntaxException {
+ if (uriTemplate == null || uriTemplate.length() == 0) {
+ throw new IllegalArgumentException("URI template cannot be null or empty");
+ }
+
+ final String processed = uriTemplate.replaceAll("\\$\\{", DOLLAR_EXPR_START)
+ .replaceAll("}", EXPR_CLOSE);
+ URI uri = new URI(processed);
+
+ this.catalogUrl = uri.getScheme() + "://" + uri.getAuthority();
+
+ parseUriTemplate(uri);
+ }
+
+ private void parseUriTemplate(URI uriTemplate) throws URISyntaxException {
+ String path = uriTemplate.getPath();
+ String[] paths = path.split(OUTPUT_PATH_SEPARATOR);
+ if (paths.length != 4) {
+ throw new URISyntaxException(uriTemplate.toString(),
+ "URI path is not in expected format: database:table");
+ }
+
+ database = paths[1];
+ table = paths[2];
+ String partRaw = paths[3];
+
+ if (database == null || database.length() == 0) {
+ throw new URISyntaxException(uriTemplate.toString(), "DB name is missing");
+ }
+ if (table == null || table.length() == 0) {
+ throw new URISyntaxException(uriTemplate.toString(), "Table name is missing");
+ }
+ if (partRaw == null || partRaw.length() == 0) {
+ throw new URISyntaxException(uriTemplate.toString(), "Partition details are missing");
+ }
+
+ String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START, "\\$\\{").replaceAll(EXPR_CLOSE, "\\}");
+ partitions = new HashMap<String, String>();
+ String[] parts = rawPartition.split(PARTITION_SEPARATOR);
+ for (String part : parts) {
+ if (part == null || part.length() == 0) {
+ continue;
+ }
+
+ String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
+ if (keyVal.length != 2) {
+ throw new URISyntaxException(uriTemplate.toString(),
+ "Partition key value pair is not specified properly in (" + part + ")");
+ }
+
+ partitions.put(keyVal[0], keyVal[1]);
+ }
+ }
+
public String getCatalogUrl() {
return catalogUrl;
}
@@ -152,11 +233,17 @@ public class CatalogStorage implements Storage {
return TYPE.TABLE;
}
+ /**
+ * LocationType does NOT matter here.
+ */
@Override
public String getUriTemplate() {
return getUriTemplate(LocationType.DATA);
}
+ /**
+ * LocationType does NOT matter here.
+ */
@Override
public String getUriTemplate(LocationType locationType) {
StringBuilder uriTemplate = new StringBuilder();
@@ -191,4 +278,14 @@ public class CatalogStorage implements Storage {
&& getTable().equals(catalogStorage.getTable())
&& getPartitions().equals(catalogStorage.getPartitions());
}
+
+ @Override
+ public String toString() {
+ return "CatalogStorage{"
+ + "catalogUrl='" + catalogUrl + '\''
+ + ", database='" + database + '\''
+ + ", table='" + table + '\''
+ + ", partitions=" + partitions
+ + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index d212a98..364aeb0 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -21,7 +21,6 @@ package org.apache.falcon.entity;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Cluster;
@@ -54,15 +53,16 @@ public final class FeedHelper {
public static Storage createStorage(Feed feed) throws FalconException {
- final List<Location> locations = feed.getLocations().getLocations();
- if (locations != null) {
- return new FileSystemStorage(locations);
+ final Locations feedLocations = feed.getLocations();
+ if (feedLocations != null
+ && feedLocations.getLocations().size() != 0) {
+ return new FileSystemStorage(feed);
}
try {
final CatalogTable table = feed.getTable();
if (table != null) {
- return new CatalogStorage(table.getUri());
+ return new CatalogStorage(feed);
}
} catch (URISyntaxException e) {
throw new FalconException(e);
@@ -97,15 +97,13 @@ public final class FeedHelper {
final List<Location> locations = getLocations(cluster, feed);
if (locations != null) {
- return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
+ return new FileSystemStorage(clusterEntity, feed);
}
try {
final CatalogTable table = getTable(cluster, feed);
if (table != null) {
- return new CatalogStorage(
- ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint(),
- table.getUri());
+ return new CatalogStorage(clusterEntity, feed);
}
} catch (URISyntaxException e) {
throw new FalconException(e);
@@ -114,6 +112,19 @@ public final class FeedHelper {
throw new FalconException("Both catalog and locations are not defined.");
}
+ public static Storage createStorage(String type, String storageUriTemplate)
+ throws URISyntaxException {
+
+ Storage.TYPE storageType = Storage.TYPE.valueOf(type);
+ if (storageType == Storage.TYPE.FILESYSTEM) {
+ return new FileSystemStorage(storageUriTemplate);
+ } else if (storageType == Storage.TYPE.TABLE) {
+ return new CatalogStorage(storageUriTemplate);
+ }
+
+ throw new IllegalArgumentException("Bad type: " + type);
+ }
+
private static List<Location> getLocations(Cluster cluster, Feed feed) {
// check if locations are overridden in cluster
final Locations clusterLocations = cluster.getLocations();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 95b18c5..664813b 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -19,9 +19,15 @@
package org.apache.falcon.entity;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -29,11 +35,26 @@ import java.util.List;
*/
public class FileSystemStorage implements Storage {
+ public static final String FEED_PATH_SEP = "#";
+ public static final String LOCATION_TYPE_SEP = "=";
+
+ public static final String FILE_SYSTEM_URL = "${nameNode}";
+ private static final String DOLLAR_EXPR_START = "_D__START_";
+ private static final String EXPR_CLOSE = "_CLOSE_";
+
private final String storageUrl;
private final List<Location> locations;
- protected FileSystemStorage(List<Location> locations) {
- this("${nameNode}", locations);
+ protected FileSystemStorage(Feed feed) {
+ this(FILE_SYSTEM_URL, feed.getLocations());
+ }
+
+ protected FileSystemStorage(Cluster cluster, Feed feed) {
+ this(ClusterHelper.getStorageUrl(cluster), feed.getLocations());
+ }
+
+ protected FileSystemStorage(String storageUrl, Locations locations) {
+ this(storageUrl, locations.getLocations());
}
protected FileSystemStorage(String storageUrl, List<Location> locations) {
@@ -49,6 +70,44 @@ public class FileSystemStorage implements Storage {
this.locations = locations;
}
+ /**
+ * Create an instance from the URI Template that was generated using
+ * the getUriTemplate() method.
+ *
+ * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate
+ * @throws URISyntaxException
+ */
+ protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
+ if (uriTemplate == null || uriTemplate.length() == 0) {
+ throw new IllegalArgumentException("URI template cannot be null or empty");
+ }
+
+ String rawStorageUrl = null;
+ List<Location> rawLocations = new ArrayList<Location>();
+ String[] feedLocs = uriTemplate.split(FEED_PATH_SEP);
+ for (String rawPath : feedLocs) {
+ String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP);
+ final String processed = typeAndPath[1].replaceAll("\\$\\{", DOLLAR_EXPR_START)
+ .replaceAll("}", EXPR_CLOSE);
+ URI uri = new URI(processed);
+ if (rawStorageUrl == null) {
+ rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
+ }
+
+ String path = uri.getPath();
+ final String finalPath = path.replaceAll(DOLLAR_EXPR_START, "\\$\\{")
+ .replaceAll(EXPR_CLOSE, "\\}");
+
+ Location location = new Location();
+ location.setPath(finalPath);
+ location.setType(LocationType.valueOf(typeAndPath[0]));
+ rawLocations.add(location);
+ }
+
+ this.storageUrl = rawStorageUrl;
+ this.locations = rawLocations;
+ }
+
@Override
public TYPE getType() {
return TYPE.FILESYSTEM;
@@ -64,7 +123,38 @@ public class FileSystemStorage implements Storage {
@Override
public String getUriTemplate() {
- return getUriTemplate(LocationType.DATA);
+ String feedPathMask = getUriTemplate(LocationType.DATA);
+ String metaPathMask = getUriTemplate(LocationType.META);
+ String statsPathMask = getUriTemplate(LocationType.STATS);
+ String tmpPathMask = getUriTemplate(LocationType.TMP);
+
+ StringBuilder feedBasePaths = new StringBuilder();
+ feedBasePaths.append(LocationType.DATA.name())
+ .append(LOCATION_TYPE_SEP)
+ .append(feedPathMask);
+
+ if (metaPathMask != null) {
+ feedBasePaths.append(FEED_PATH_SEP)
+ .append(LocationType.META.name())
+ .append(LOCATION_TYPE_SEP)
+ .append(metaPathMask);
+ }
+
+ if (statsPathMask != null) {
+ feedBasePaths.append(FEED_PATH_SEP)
+ .append(LocationType.STATS.name())
+ .append(LOCATION_TYPE_SEP)
+ .append(statsPathMask);
+ }
+
+ if (tmpPathMask != null) {
+ feedBasePaths.append(FEED_PATH_SEP)
+ .append(LocationType.TMP.name())
+ .append(LOCATION_TYPE_SEP)
+ .append(tmpPathMask);
+ }
+
+ return feedBasePaths.toString();
}
@Override
@@ -121,4 +211,12 @@ public class FileSystemStorage implements Storage {
loc.setType(type);
return loc;
}
+
+ @Override
+ public String toString() {
+ return "FileSystemStorage{"
+ + "storageUrl='" + storageUrl + '\''
+ + ", locations=" + locations
+ + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index d0435fb..b72efc6 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -109,13 +110,14 @@ public class FeedEntityParser extends EntityParser<Feed> {
private void validateFeedGroups(Feed feed) throws FalconException {
String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
- String defaultPath = FeedHelper.createStorage(feed).getUriTemplate();
+ final Storage storage = FeedHelper.createStorage(feed);
+ String defaultPath = storage.getUriTemplate(LocationType.DATA);
for (Cluster cluster : feed.getClusters().getClusters()) {
- final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate();
+ final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
if (!FeedGroup.getDatePattern(uriTemplate).equals(
FeedGroup.getDatePattern(defaultPath))) {
throw new ValidationException("Feeds default path pattern: "
- + FeedHelper.createStorage(feed).getUriTemplate()
+ + storage.getUriTemplate(LocationType.DATA)
+ ", does not match with cluster: "
+ cluster.getName()
+ " path pattern: "
@@ -127,7 +129,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
if (group != null && !group.canContainFeed(feed)) {
throw new ValidationException(
"Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
- + ", path pattern: " + FeedHelper.createStorage(feed)
+ + ", path pattern: " + storage
+ " does not match with group: " + group.getName() + "'s frequency: "
+ group.getFrequency()
+ ", date pattern: " + group.getDatePattern());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index d517828..d288925 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -21,6 +21,7 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
import java.util.ArrayList;
import java.util.Collections;
@@ -95,6 +96,7 @@ public class FeedGroup {
public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
return this.frequency.equals(feed.getFrequency())
- && this.datePattern.equals(getDatePattern(FeedHelper.createStorage(feed).getUriTemplate()));
+ && this.datePattern.equals(getDatePattern(
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index d054873..0a35a71 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.service.ConfigurationChangeListener;
import java.util.Collections;
@@ -116,6 +117,6 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed)
throws FalconException {
return getGroups(feed.getGroups(), feed.getFrequency(),
- FeedHelper.createStorage(feed).getUriTemplate());
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
index 458111d..90eaebc 100644
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -33,15 +33,15 @@ public class CatalogStorageTest {
@Test
public void testGetType() throws Exception {
- String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- CatalogStorage storage = new CatalogStorage(table);
+ String table = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+ CatalogStorage storage = new CatalogStorage(CatalogStorage.CATALOG_URL, table);
Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
}
@Test
- public void testParseVaildURI() throws URISyntaxException {
- String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- CatalogStorage storage = new CatalogStorage(table);
+ public void testParseFeedUriValid() throws URISyntaxException {
+ String table = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+ CatalogStorage storage = new CatalogStorage(CatalogStorage.CATALOG_URL, table);
Assert.assertEquals("${hcatNode}", storage.getCatalogUrl());
Assert.assertEquals("clicksdb", storage.getDatabase());
Assert.assertEquals("clicks", storage.getTable());
@@ -53,77 +53,97 @@ public class CatalogStorageTest {
Assert.assertFalse(storage.hasPartition("unknown"));
}
+ @Test
+ public void testCreateFromUriTemplate() throws Exception {
+ String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}-${MONTH}-${DAY}";
+ CatalogStorage storage = new CatalogStorage(uriTemplate);
+ Assert.assertEquals("thrift://localhost:49083", storage.getCatalogUrl());
+ Assert.assertEquals("clicksdb", storage.getDatabase());
+ Assert.assertEquals("clicks", storage.getTable());
+ Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+ Assert.assertEquals(2, storage.getPartitions().size());
+ Assert.assertEquals("us", storage.getPartitionValue("region"));
+ Assert.assertTrue(storage.hasPartition("region"));
+ Assert.assertNull(storage.getPartitionValue("unknown"));
+ Assert.assertFalse(storage.hasPartition("unknown"));
+ }
- @DataProvider(name = "invalidTableURIs")
- public Object[][] createInvalidTableURIData() {
+ @DataProvider(name = "invalidFeedURIs")
+ public Object[][] createParseFeedUriInvalid() {
return new Object[][] {
- {"catalog:default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
- {"default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
- {"catalog:default#ds=$YEAR-$MONTH-$DAY;region=us", ""},
- {"catalog://default/clicks#ds=$YEAR-$MONTH-$DAY:region=us", ""},
+ {"catalog:default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
+ {"default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
+ {"catalog:default#ds=${YEAR}-${MONTH}-${DAY};region=us", ""},
+ {"catalog://default/clicks#ds=${YEAR}-${MONTH}-${DAY}:region=us", ""},
};
}
- @Test(dataProvider = "invalidTableURIs", expectedExceptions = URISyntaxException.class)
- public void testParseInvalidURI(String tableUri, String ignore) throws URISyntaxException {
- new CatalogStorage(tableUri);
+ @Test(dataProvider = "invalidFeedURIs", expectedExceptions = URISyntaxException.class)
+ public void testParseFeedUriInvalid(String tableUri, String ignore) throws URISyntaxException {
+ new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
Assert.fail("Exception must have been thrown");
}
@Test
public void testIsIdenticalPositive() throws Exception {
- CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+ CatalogStorage table1 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+ CatalogStorage table2 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
Assert.assertTrue(table1.isIdentical(table2));
final String catalogUrl = "thrift://localhost:49083";
CatalogStorage table3 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
CatalogStorage table4 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
Assert.assertTrue(table3.isIdentical(table4));
}
@Test
public void testIsIdenticalNegative() throws Exception {
- CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
+ CatalogStorage table1 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
+ CatalogStorage table2 = new CatalogStorage(CatalogStorage.CATALOG_URL,
+ "catalog:clicksdb:impressions#ds=${YEAR}-${MONTH}-${DAY};region=us");
Assert.assertFalse(table1.isIdentical(table2));
final String catalogUrl = "thrift://localhost:49083";
CatalogStorage table3 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
CatalogStorage table4 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
+ "catalog:clicksdb:impressions#ds=${YEAR}-${MONTH}-${DAY};region=us");
Assert.assertFalse(table3.isIdentical(table4));
CatalogStorage table5 = new CatalogStorage("thrift://localhost:49084",
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
CatalogStorage table6 = new CatalogStorage("thrift://localhost:49083",
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
+ "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us");
Assert.assertFalse(table5.isIdentical(table6));
}
@Test
public void testGetUriTemplateWithCatalogUrl() throws Exception {
final String catalogUrl = "thrift://localhost:49083";
- String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+ String tableUri = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+ String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}-${MONTH}-${DAY}";
CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
Assert.assertEquals(uriTemplate, table.getUriTemplate());
Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+ Assert.assertEquals(table.getUriTemplate(), table.getUriTemplate(LocationType.DATA));
}
@Test
public void testGetUriTemplateWithOutCatalogUrl() throws Exception {
- String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+ String tableUri = "catalog:clicksdb:clicks#ds=${YEAR}-${MONTH}-${DAY};region=us";
+ String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=${YEAR}-${MONTH}-${DAY}";
- CatalogStorage table = new CatalogStorage(tableUri);
+ CatalogStorage table = new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
Assert.assertEquals(uriTemplate, table.getUriTemplate());
Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
+ Assert.assertEquals(table.getUriTemplate(), table.getUriTemplate(LocationType.DATA));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 3e3b575..a059652 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -39,12 +39,35 @@ public class FileSystemStorageTest {
List<Location> locations = new ArrayList<Location>();
locations.add(location);
- FileSystemStorage storage = new FileSystemStorage(locations);
+ FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
}
@Test
- public void testGetUriTemplate() throws Exception {
+ public void testCreateFromUriTemplate() throws Exception {
+ String feedBasePath = "DATA=hdfs://localhost:8020"
+ + "/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
+ + "#"
+ + "META=hdfs://localhost:8020"
+ + "/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"
+ + "#"
+ + "STATS=hdfs://localhost:8020"
+ + "/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}";
+
+ FileSystemStorage storage = new FileSystemStorage(feedBasePath);
+ Assert.assertEquals(storage.getUriTemplate(), feedBasePath + "#TMP=/tmp");
+
+ Assert.assertEquals("hdfs://localhost:8020", storage.getStorageUrl());
+ Assert.assertEquals("hdfs://localhost:8020/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+ storage.getUriTemplate(LocationType.DATA));
+ Assert.assertEquals("hdfs://localhost:8020/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+ storage.getUriTemplate(LocationType.STATS));
+ Assert.assertEquals("hdfs://localhost:8020/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}",
+ storage.getUriTemplate(LocationType.META));
+ }
+
+ @Test
+ public void testGetUriTemplateForData() throws Exception {
final Location location = new Location();
location.setPath("/foo/bar");
location.setType(LocationType.DATA);
@@ -52,7 +75,52 @@ public class FileSystemStorageTest {
locations.add(location);
FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
- Assert.assertEquals(storage.getUriTemplate(), "hdfs://localhost:41020/foo/bar");
+ Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+ }
+
+ @Test
+ public void testGetUriTemplate() throws Exception {
+ final Location dataLocation = new Location();
+ dataLocation.setPath("/data/foo/bar");
+ dataLocation.setType(LocationType.DATA);
+
+ final Location metaLocation = new Location();
+ metaLocation.setPath("/meta/foo/bar");
+ metaLocation.setType(LocationType.META);
+
+ final Location statsLocation = new Location();
+ statsLocation.setPath("/stats/foo/bar");
+ statsLocation.setType(LocationType.STATS);
+
+ final Location tmpLocation = new Location();
+ tmpLocation.setPath("/tmp/foo/bar");
+ tmpLocation.setType(LocationType.TMP);
+
+ List<Location> locations = new ArrayList<Location>();
+ locations.add(dataLocation);
+ locations.add(metaLocation);
+ locations.add(statsLocation);
+ locations.add(tmpLocation);
+
+ StringBuilder expected = new StringBuilder();
+ expected.append(LocationType.DATA)
+ .append(FileSystemStorage.LOCATION_TYPE_SEP)
+ .append("hdfs://localhost:41020/data/foo/bar")
+ .append(FileSystemStorage.FEED_PATH_SEP)
+ .append(LocationType.META)
+ .append(FileSystemStorage.LOCATION_TYPE_SEP)
+ .append("hdfs://localhost:41020/meta/foo/bar")
+ .append(FileSystemStorage.FEED_PATH_SEP)
+ .append(LocationType.STATS)
+ .append(FileSystemStorage.LOCATION_TYPE_SEP)
+ .append("hdfs://localhost:41020/stats/foo/bar")
+ .append(FileSystemStorage.FEED_PATH_SEP)
+ .append(LocationType.TMP)
+ .append(FileSystemStorage.LOCATION_TYPE_SEP)
+ .append("hdfs://localhost:41020/tmp/foo/bar");
+
+ FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+ Assert.assertEquals(storage.getUriTemplate(), expected.toString());
}
@Test
@@ -63,8 +131,8 @@ public class FileSystemStorageTest {
List<Location> locations = new ArrayList<Location>();
locations.add(location);
- FileSystemStorage storage = new FileSystemStorage(locations);
- Assert.assertEquals(storage.getUriTemplate(), "${nameNode}/foo/bar");
+ FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations);
+ Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar");
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 8bcff78..66bdf5c 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -275,7 +275,8 @@ public class FeedEntityParserTest extends AbstractTestBase {
parser.parseAndValidate(feed2.toString());
}
- @Test(expectedExceptions = ValidationException.class)
+ // TODO Disabled the test since I do not see anything invalid in here.
+ @Test(enabled = false, expectedExceptions = ValidationException.class)
public void testInvalidFeedClusterDataLocation() throws JAXBException, FalconException {
Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
(FeedEntityParserTest.class.getResourceAsStream(FEED_XML)));
@@ -450,7 +451,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
Feed feedWithTable = parser.parse(inputStream);
Assert.assertEquals(feedWithTable.getTable().getUri(),
- "catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR");
+ "catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}");
}
@Test (expectedExceptions = FalconException.class)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
index e84f90a..bc78ac7 100644
--- a/common/src/test/resources/config/feed/hive-table-feed.xml
+++ b/common/src/test/resources/config/feed/hive-table-feed.xml
@@ -41,7 +41,7 @@
</cluster>
</clusters>
- <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+ <table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
<ACL owner="testuser" group="group" permission="0x755"/>
<schema location="/schema/clicks" provider="protobuf"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 128a73d..1329733 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
@@ -72,9 +73,9 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
properties.put(inName + ".done-flag", "notused");
- String locPath = FeedHelper.createStorage(clusterName, feed).getUriTemplate().replace('$', '%');
- properties.put(inName + ".uri-template",
- new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
+ String locPath = FeedHelper.createStorage(clusterName, feed)
+ .getUriTemplate(LocationType.DATA).replace('$', '%');
+ properties.put(inName + ".uri-template", locPath);
properties.put(inName + ".start-instance", in.getStart());
properties.put(inName + ".end-instance", in.getEnd());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index e3a42ca..137bc7c 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -47,6 +47,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -147,7 +148,8 @@ public class OozieProcessMapperTest extends AbstractTestBase {
assertEquals(feed.getTimezone().getID(), ds.getTimezone());
assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
assertEquals("", ds.getDoneFlag());
- assertEquals(ds.getUriTemplate(), FeedHelper.createStorage(feedCluster, feed).getUriTemplate());
+ assertEquals(ds.getUriTemplate(),
+ FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
if (prop.getName().equals("mapred.job.priority")) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
new file mode 100644
index 0000000..93de109
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.falcon.catalog.HiveCatalogService;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatCreateDBDesc;
+import org.apache.hcatalog.api.HCatCreateTableDesc;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * Tests feed entity validation to verify if the table specified is valid.
+ */
+public class FeedEntityValidationIT {
+
+ private static final String METASTORE_URL = "thrift://localhost:49083";
+ private static final String DATABASE_NAME = "falcondb";
+ private static final String TABLE_NAME = "clicks";
+ private static final String TABLE_URI =
+ "catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+
+ private final TestContext context = new TestContext();
+ private HCatClient client;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ TestContext.prepare();
+
+ client = HiveCatalogService.get(METASTORE_URL);
+
+ createDatabase();
+ createTable();
+ }
+
+ private void createDatabase() throws Exception {
+ HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(DATABASE_NAME)
+ .ifNotExists(true).build();
+ client.createDatabase(dbDesc);
+ }
+
+ public void createTable() throws Exception {
+ ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
+ cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
+ cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
+
+ HCatCreateTableDesc tableDesc = HCatCreateTableDesc
+ .create(DATABASE_NAME, TABLE_NAME, cols)
+ .fileFormat("rcfile")
+ .ifNotExists(true)
+ .comments("falcon integration test")
+ .build();
+ client.createTable(tableDesc);
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ dropTable();
+ dropDatabase();
+ }
+
+ private void dropTable() throws Exception {
+ client.dropTable(DATABASE_NAME, TABLE_NAME, true);
+ }
+
+ private void dropDatabase() throws Exception {
+ client.dropDatabase(DATABASE_NAME, true, HCatClient.DropDBMode.CASCADE);
+ }
+
+ /**
+ * Positive test.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testFeedEntityWithValidTable() throws Exception {
+ Map<String, String> overlay = context.getUniqueOverlay();
+ overlay.put("colo", "default");
+
+ ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+ context.assertSuccessful(response);
+
+ // submission will parse and validate the feed with table
+ overlay.put("tableUri", TABLE_URI);
+ response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
+ context.assertSuccessful(response);
+ }
+
+ @DataProvider(name = "invalidTableUris")
+ public Object[][] createInvalidTableUriData() {
+ return new Object[][] {
+ // does not match with group input's frequency
+ {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+ {"catalog:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+ {"badscheme:" + DATABASE_NAME + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+ {"catalog:" + DATABASE_NAME + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+ {"catalog:" + "baddb" + ":" + TABLE_NAME + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+ {"catalog:" + "baddb" + ":" + "badtable" + "#ds=ds=${YEAR}-${MONTH}-${DAY}", ""},
+ };
+ }
+
+ @Test (dataProvider = "invalidTableUris")
+ public void testFeedEntityWithInvalidTableUri(String tableUri, String ignore)
+ throws Exception {
+
+ Map<String, String> overlay = context.getUniqueOverlay();
+ overlay.put("colo", "default");
+
+ ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+ context.assertSuccessful(response);
+
+ // submission will parse and validate the feed with table
+ overlay.put("tableUri", tableUri);
+ response = context.submitToFalcon("/hive-table-feed.xml", overlay, EntityType.FEED);
+ context.assertFailure(response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/2e103e4a/webapp/src/test/resources/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/hive-table-feed.xml b/webapp/src/test/resources/hive-table-feed.xml
new file mode 100644
index 0000000..8f9d80a
--- /dev/null
+++ b/webapp/src/test/resources/hive-table-feed.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--~
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<feed description="clicks log" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+ <groups>input</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="##cluster##" type="source">
+ <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
+ <retention limit="hours(24)" action="delete"/>
+ </cluster>
+ </clusters>
+
+ <table uri="##tableUri##" />
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>