You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/09/01 19:15:43 UTC
git commit: FALCON-87 Hive table integration with feed entity.
Contributed by Venkatesh Seetharam
Updated Branches:
refs/heads/master edf03d4a8 -> 1e6ace432
FALCON-87 Hive table integration with feed entity. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/1e6ace43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/1e6ace43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/1e6ace43
Branch: refs/heads/master
Commit: 1e6ace4327910250f60e6d1e41ce5b1726bf4a4f
Parents: edf03d4
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Sun Sep 1 21:53:53 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Sun Sep 1 21:53:53 2013 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
client/src/main/resources/feed-0.1.xsd | 22 ++-
.../falcon/catalog/AbstractCatalogService.java | 4 +
.../falcon/catalog/HiveCatalogService.java | 32 ++-
.../apache/falcon/entity/CatalogStorage.java | 194 +++++++++++++++++++
.../org/apache/falcon/entity/FeedHelper.java | 101 ++++++++--
.../apache/falcon/entity/FileSystemStorage.java | 124 ++++++++++++
.../java/org/apache/falcon/entity/Storage.java | 74 +++++++
.../falcon/entity/parser/FeedEntityParser.java | 45 ++++-
.../java/org/apache/falcon/group/FeedGroup.java | 6 +-
.../org/apache/falcon/group/FeedGroupMap.java | 6 +-
.../org/apache/falcon/update/UpdateHelper.java | 34 ++--
.../falcon/entity/CatalogStorageTest.java | 129 ++++++++++++
.../falcon/entity/FileSystemStorageTest.java | 133 +++++++++++++
.../entity/parser/FeedEntityParserTest.java | 25 ++-
.../apache/falcon/update/UpdateHelperTest.java | 24 ++-
.../resources/config/feed/hive-table-feed.xml | 48 +++++
.../test/resources/config/feed/invalid-feed.xml | 53 +++++
.../falcon/converter/OozieFeedMapper.java | 70 +++----
.../falcon/converter/OozieFeedMapperTest.java | 32 ++-
.../falcon/converter/OozieProcessMapper.java | 6 +-
.../workflow/OozieProcessWorkflowBuilder.java | 3 +-
.../converter/OozieProcessMapperTest.java | 10 +-
.../org/apache/falcon/resource/TestContext.java | 8 +-
24 files changed, 1063 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b2dc1d3..046fc27 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-87 Hive table integration with feed entity. (Venkatesh
+ Seetharam via Srikanth Sundarrajan)
+
FALCON-86 Hive table integration with cluster entity. (Venaktesh
Seetharam via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index bf6fa81..00b5172 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -105,7 +105,10 @@
</xs:element>
<xs:element type="late-arrival" name="late-arrival" minOccurs="0"/>
<xs:element type="clusters" name="clusters"/>
- <xs:element type="locations" name="locations"/>
+ <xs:choice minOccurs="1" maxOccurs="1">
+ <xs:element type="locations" name="locations"/>
+ <xs:element type="catalog-table" name="table"/>
+ </xs:choice>
<xs:element type="ACL" name="ACL"/>
<xs:element type="schema" name="schema"/>
<xs:element type="properties" name="properties" minOccurs="0"/>
@@ -139,7 +142,10 @@
<xs:sequence>
<xs:element type="validity" name="validity"/>
<xs:element type="retention" name="retention"/>
- <xs:element type="locations" name="locations" minOccurs="0"/>
+ <xs:choice minOccurs="0" maxOccurs="1">
+ <xs:element type="locations" name="locations" minOccurs="0"/>
+ <xs:element type="catalog-table" name="table"/>
+ </xs:choice>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="cluster-type" name="type" use="optional"/>
@@ -194,7 +200,7 @@
<xs:complexType name="locations">
<xs:annotation>
<xs:documentation>
- A list of locations.
+ A list of locations on the file system.
</xs:documentation>
</xs:annotation>
<xs:choice maxOccurs="unbounded" minOccurs="0">
@@ -356,4 +362,14 @@
<xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
</xs:restriction>
</xs:simpleType>
+ <xs:complexType name="catalog-table">
+ <xs:annotation>
+ <xs:documentation>
+ catalog specifies the uri of a Hive table along with the partition spec.
+ uri="catalog:$database:$table#(partition-key=partition-value);+"
+ Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY}
+ </xs:documentation>
+ </xs:annotation>
+ <xs:attribute type="xs:string" name="uri" use="required"/>
+ </xs:complexType>
</xs:schema>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 4086611..a4b6cfd 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -23,6 +23,7 @@ import org.apache.falcon.FalconException;
/**
* Interface definition for a catalog registry service
* such as Hive or HCatalog.
+ * Catalog should minimally support the following operations.
*/
public abstract class AbstractCatalogService {
@@ -34,4 +35,7 @@ public abstract class AbstractCatalogService {
* @throws FalconException exception
*/
public abstract boolean isAlive(String catalogBaseUrl) throws FalconException;
+
+ /**
+ * Checks whether the given table is registered in the given database of the catalog.
+ *
+ * @param catalogUrl base URL of the catalog registry service
+ * @param database name of the database holding the table
+ * @param tableName name of the table to look up
+ * @return true if the table exists, false otherwise
+ * @throws FalconException exception
+ */
+ public abstract boolean tableExists(String catalogUrl, String database, String tableName)
+ throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index e6f7fe2..67d14af 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -22,6 +22,7 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.falcon.FalconException;
+import org.apache.falcon.security.CurrentUser;
import org.apache.log4j.Logger;
import javax.ws.rs.core.MediaType;
@@ -40,7 +41,36 @@ public class HiveCatalogService extends AbstractCatalogService {
Client client = Client.create();
WebResource service = client.resource(catalogBaseUrl);
- ClientResponse response = service.path("status").accept(MediaType.APPLICATION_JSON).head();
+ ClientResponse response = service.path("status")
+ .accept(MediaType.APPLICATION_JSON)
+ .head();
+ // .get(ClientResponse.class); // todo this isnt working
+
+ if (LOG.isDebugEnabled() && response.getStatus() != 200) {
+ LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
+ }
+
+ return response.getStatus() == 200;
+ }
+
+ /**
+ * Checks whether a table exists by issuing a GET request built from
+ * ddl/database/&lt;database&gt;/table/&lt;tableName&gt; against the catalog URL.
+ *
+ * @param catalogUrl base URL of the catalog service
+ * @param database database name
+ * @param tableName table name
+ * @return true if the service answers with HTTP 200, false otherwise
+ * @throws FalconException exception
+ */
+ @Override
+ public boolean tableExists(String catalogUrl, String database, String tableName)
+ throws FalconException {
+ LOG.info("Checking if the table exists: " + tableName);
+
+ Client client = Client.create();
+ try {
+ WebResource service = client.resource(catalogUrl);
+
+ ClientResponse response = service.path("ddl/database/").path(database)
+ .path("/table").path(tableName)
+ .queryParam("user.name", CurrentUser.getUser())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(ClientResponse.class);
+
+ try {
+ final int status = response.getStatus();
+ if (LOG.isDebugEnabled() && status != 200) {
+ LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
+ }
+ return status == 200;
+ } finally {
+ // release the connection even when the entity was never read (the 200 path)
+ response.close();
+ }
+ } finally {
+ // the client is created per call, so its resources are disposed of per call as well
+ client.destroy();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
new file mode 100644
index 0000000..5c64f27
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A catalog registry implementation of a feed storage.
+ */
+public class CatalogStorage implements Storage {
+
+ public static final String PARTITION_SEPARATOR = ";";
+ public static final String PARTITION_KEYVAL_SEPARATOR = "=";
+ public static final String INPUT_PATH_SEPARATOR = ":";
+ public static final String OUTPUT_PATH_SEPARATOR = "/";
+
+ private final String catalogUrl;
+ private String database;
+ private String table;
+ private Map<String, String> partitions;
+
+ /**
+ * Creates a catalog storage whose registry URL is the ${hcatNode} placeholder.
+ *
+ * @param catalogTable table URI of the form catalog:$database:$table#$partitions
+ * @throws URISyntaxException if the table URI is not in the expected format
+ */
+ protected CatalogStorage(String catalogTable) throws URISyntaxException {
+ this("${hcatNode}", catalogTable);
+ }
+
+ /**
+ * Creates a catalog storage for the given registry URL and table URI.
+ *
+ * @param catalogUrl catalog registry URL, must not be null or empty
+ * @param tableUri table URI of the form catalog:$database:$table#$partitions
+ * @throws URISyntaxException if the table URI is not in the expected format
+ * @throws IllegalArgumentException if catalogUrl is null or empty
+ */
+ protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
+ if (catalogUrl == null || catalogUrl.length() == 0) {
+ throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
+ }
+
+ this.catalogUrl = catalogUrl;
+
+ parse(tableUri);
+ }
+
+ /**
+ * Validate URI to conform to catalog:$database:$table#$partitions.
+ * scheme=catalog:database=$database:table=$table#$partitions
+ * partitions=key=value;key=value
+ *
+ * Partition values are usually templates such as ds=${YEAR}-${MONTH}-${DAY}.
+ * java.net.URI rejects curly braces, so they are percent-encoded before parsing;
+ * getSchemeSpecificPart() and getFragment() return the decoded text again.
+ *
+ * @param catalogTableUri table URI to parse and validate
+ * @throws URISyntaxException if the URI is malformed or a component is missing
+ */
+ private void parse(String catalogTableUri) throws URISyntaxException {
+
+ URI tableUri = new URI(catalogTableUri.replace("{", "%7B").replace("}", "%7D"));
+
+ if (!"catalog".equals(tableUri.getScheme())) {
+ throw new URISyntaxException(catalogTableUri, "catalog scheme is missing");
+ }
+
+ final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
+ if (schemeSpecificPart == null) {
+ throw new URISyntaxException(catalogTableUri, "Database and Table are missing");
+ }
+
+ // limit -1 keeps trailing empty segments, so "db:table:" is rejected instead of accepted
+ String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR, -1);
+
+ if (paths.length != 2) {
+ throw new URISyntaxException(catalogTableUri, "URI path is not in expected format: database:table");
+ }
+
+ database = paths[0];
+ table = paths[1];
+
+ if (database.length() == 0) {
+ throw new URISyntaxException(catalogTableUri, "DB name is missing");
+ }
+ if (table.length() == 0) {
+ throw new URISyntaxException(catalogTableUri, "Table name is missing");
+ }
+
+ String partRaw = tableUri.getFragment();
+ if (partRaw == null || partRaw.length() == 0) {
+ throw new URISyntaxException(catalogTableUri, "Partition details are missing");
+ }
+
+ partitions = new HashMap<String, String>();
+ for (String part : partRaw.split(PARTITION_SEPARATOR)) {
+ if (part.length() == 0) {
+ continue; // tolerate empty segments such as ";;"
+ }
+
+ // limit -1 so that "key=" or "key=value=" are reported rather than silently truncated
+ String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR, -1);
+ if (keyVal.length != 2 || keyVal[0].length() == 0 || keyVal[1].length() == 0) {
+ throw new URISyntaxException(catalogTableUri,
+ "Partition key value pair is not specified properly in (" + part + ")");
+ }
+
+ partitions.put(keyVal[0], keyVal[1]);
+ }
+
+ // a fragment made only of separators (e.g. "#;") yields no partitions at all
+ if (partitions.isEmpty()) {
+ throw new URISyntaxException(catalogTableUri, "Partition details are missing");
+ }
+ }
+
+ public String getCatalogUrl() {
+ return catalogUrl;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ /**
+ * @return the parsed partition key/value pairs (the internal map itself, not a copy)
+ */
+ public Map<String, String> getPartitions() {
+ return partitions;
+ }
+
+ /**
+ * @param key partition key
+ * @return partition value, or null if the key is not a partition of this table URI
+ */
+ public String getPartitionValue(String key) {
+ return partitions.get(key);
+ }
+
+ /**
+ * @param key partition key
+ * @return if partitions map includes the key or not
+ */
+ public boolean hasPartition(String key) {
+ return partitions.containsKey(key);
+ }
+
+ @Override
+ public TYPE getType() {
+ return TYPE.TABLE;
+ }
+
+ @Override
+ public String getUriTemplate() {
+ return getUriTemplate(LocationType.DATA);
+ }
+
+ /**
+ * Renders catalogUrl/database/table/key=value;key=value.
+ * The location type is ignored for catalog storage.
+ *
+ * @param locationType ignored
+ * @return uri template of the table partition
+ */
+ @Override
+ public String getUriTemplate(LocationType locationType) {
+ StringBuilder uriTemplate = new StringBuilder();
+ uriTemplate.append(catalogUrl)
+ .append(OUTPUT_PATH_SEPARATOR).append(database)
+ .append(OUTPUT_PATH_SEPARATOR).append(table)
+ .append(OUTPUT_PATH_SEPARATOR);
+
+ // separator-first joining avoids trimming the last character afterwards
+ String separator = "";
+ for (Map.Entry<String, String> entry : partitions.entrySet()) {
+ uriTemplate.append(separator)
+ .append(entry.getKey())
+ .append(PARTITION_KEYVAL_SEPARATOR)
+ .append(entry.getValue());
+ separator = PARTITION_SEPARATOR;
+ }
+
+ return uriTemplate.toString();
+ }
+
+ @Override
+ public boolean exists() throws FalconException {
+ return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
+ }
+
+ /**
+ * Two catalog storages are identical when registry URL, database, table and
+ * partitions all match. A storage of any other kind is never identical.
+ */
+ @Override
+ public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+ if (!(toCompareAgainst instanceof CatalogStorage)) {
+ return false;
+ }
+
+ CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
+
+ return (getCatalogUrl() == null || getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
+ && getDatabase().equals(catalogStorage.getDatabase())
+ && getTable().equals(catalogStorage.getTable())
+ && getPartitions().equals(catalogStorage.getPartitions());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index c96120d..d212a98 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,11 +20,19 @@ package org.apache.falcon.entity;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.expression.ExpressionHelper;
+import java.net.URISyntaxException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -44,32 +52,87 @@ public final class FeedHelper {
return null;
}
- public static Location getLocation(Feed feed, LocationType type,
- String clusterName) {
- Cluster cluster = getCluster(feed, clusterName);
- if (cluster != null && cluster.getLocations() != null
- && cluster.getLocations().getLocations().size() != 0) {
- return getLocation(cluster.getLocations(), type);
- } else {
- return getLocation(feed.getLocations(), type);
+ /**
+ * Creates the storage of a feed from its feed-level definition: file system
+ * locations when any are defined, otherwise the catalog table. The storage
+ * uses the ${nameNode} / ${hcatNode} placeholders instead of real cluster URLs.
+ *
+ * @param feed feed entity
+ * @return storage backing the feed
+ * @throws FalconException if the table URI is malformed or neither locations nor a table is defined
+ */
+ public static Storage createStorage(Feed feed) throws FalconException {
+
+ // a table-backed feed has no <locations> element, so it must not be dereferenced blindly
+ final Locations feedLocations = feed.getLocations();
+ final List<Location> locations = feedLocations == null ? null : feedLocations.getLocations();
+ if (locations != null && !locations.isEmpty()) {
+ return new FileSystemStorage(locations);
+ }
+
+ try {
+ final CatalogTable table = feed.getTable();
+ if (table != null) {
+ return new CatalogStorage(table.getUri());
+ }
+ } catch (URISyntaxException e) {
+ throw new FalconException(e);
}
+ throw new FalconException("Both catalog and locations are not defined.");
}
- public static Location getLocation(Feed feed, LocationType type) {
- return getLocation(feed.getLocations(), type);
+ /**
+ * Creates the storage of a feed on the given cluster entity.
+ *
+ * @param clusterEntity cluster entity the feed is expected to be defined on
+ * @param feed feed entity
+ * @return storage of the feed on that cluster
+ * @throws FalconException if the feed is not defined on the cluster or its storage cannot be resolved
+ */
+ public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+ Feed feed) throws FalconException {
+ return createStorage(getDefinedCluster(feed, clusterEntity.getName()), feed, clusterEntity);
+ }
+
+ /**
+ * Creates the storage of a feed on the named cluster.
+ *
+ * @param clusterName name of the cluster the feed is expected to be defined on
+ * @param feed feed entity
+ * @return storage of the feed on that cluster
+ * @throws FalconException if the feed is not defined on the cluster or its storage cannot be resolved
+ */
+ public static Storage createStorage(String clusterName, Feed feed)
+ throws FalconException {
+
+ return createStorage(getDefinedCluster(feed, clusterName), feed);
+ }
+
+ /**
+ * Creates the storage of a feed on the given feed cluster, looking up the
+ * cluster entity of the same name.
+ *
+ * @param cluster feed cluster definition
+ * @param feed feed entity
+ * @return storage of the feed on that cluster
+ * @throws FalconException if the cluster entity or the feed storage cannot be resolved
+ */
+ public static Storage createStorage(Cluster cluster, Feed feed)
+ throws FalconException {
+
+ final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+ EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+
+ return createStorage(cluster, feed, clusterEntity);
+ }
+
+ /**
+ * Looks up the feed's definition for a cluster, failing with a descriptive
+ * error rather than a later NullPointerException when the feed is not on it.
+ */
+ private static Cluster getDefinedCluster(Feed feed, String clusterName) throws FalconException {
+ final Cluster cluster = getCluster(feed, clusterName);
+ if (cluster == null) {
+ throw new FalconException("Feed " + feed.getName() + " is not defined on cluster: " + clusterName);
+ }
+ return cluster;
}
- public static Location getLocation(Locations locations, LocationType type) {
- for (Location loc : locations.getLocations()) {
- if (loc.getType() == type) {
- return loc;
+ /**
+ * Creates the storage of a feed on a cluster. Cluster-level locations or table
+ * override the feed-level ones. File system storage is rooted at the cluster's
+ * storage URL; catalog storage uses the cluster's REGISTRY interface endpoint.
+ *
+ * @param cluster feed cluster definition
+ * @param feed feed entity
+ * @param clusterEntity cluster entity providing the storage and registry URLs
+ * @return storage of the feed on that cluster
+ * @throws FalconException if the table URI is malformed, the cluster has no registry
+ * interface for a table-backed feed, or neither locations nor a table is defined
+ */
+ public static Storage createStorage(Cluster cluster, Feed feed,
+ org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
+ throws FalconException {
+
+ final List<Location> locations = getLocations(cluster, feed);
+ // an empty <locations/> element must fall through instead of tripping FileSystemStorage's argument check
+ if (locations != null && !locations.isEmpty()) {
+ return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
+ }
+
+ try {
+ final CatalogTable table = getTable(cluster, feed);
+ if (table != null) {
+ if (ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY) == null) {
+ throw new FalconException("Cluster " + clusterEntity.getName()
+ + " does not define a registry interface required by feed " + feed.getName());
+ }
+ return new CatalogStorage(
+ ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint(),
+ table.getUri());
}
+ } catch (URISyntaxException e) {
+ throw new FalconException(e);
}
- Location loc = new Location();
- loc.setPath("/tmp");
- loc.setType(type);
- return loc;
+
+ throw new FalconException("Both catalog and locations are not defined.");
+ }
+
+ private static List<Location> getLocations(Cluster cluster, Feed feed) {
+ // non-empty locations declared on the feed's cluster take precedence over the feed-level ones
+ final Locations overridden = cluster.getLocations();
+ final boolean hasOverride = overridden != null && !overridden.getLocations().isEmpty();
+ if (hasOverride) {
+ return overridden.getLocations();
+ }
+
+ final Locations defaults = feed.getLocations();
+ if (defaults == null) {
+ return null;
+ }
+ return defaults.getLocations();
+ }
+
+ private static CatalogTable getTable(Cluster cluster, Feed feed) {
+ // check if table is overridden in cluster
+ if (cluster.getTable() != null) {
+ return cluster.getTable();
+ }
+
+ return feed.getTable();
}
public static String normalizePartitionExpression(String part1, String part2) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
new file mode 100644
index 0000000..95b18c5
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+import java.util.List;
+
+/**
+ * A file system implementation of a feed storage.
+ */
+public class FileSystemStorage implements Storage {
+
+ private final String storageUrl;
+ private final List<Location> locations;
+
+ /**
+ * Creates a file system storage whose URL is the ${nameNode} placeholder.
+ *
+ * @param locations feed locations, must not be null or empty
+ */
+ protected FileSystemStorage(List<Location> locations) {
+ this("${nameNode}", locations);
+ }
+
+ /**
+ * @param storageUrl file system URL prepended to every location path
+ * @param locations feed locations, must not be null or empty
+ * @throws IllegalArgumentException if either argument is null or empty
+ */
+ protected FileSystemStorage(String storageUrl, List<Location> locations) {
+ if (storageUrl == null || storageUrl.length() == 0) {
+ throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
+ }
+
+ if (locations == null || locations.size() == 0) {
+ throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
+ }
+
+ this.storageUrl = storageUrl;
+ this.locations = locations;
+ }
+
+ @Override
+ public TYPE getType() {
+ return TYPE.FILESYSTEM;
+ }
+
+ public String getStorageUrl() {
+ return storageUrl;
+ }
+
+ public List<Location> getLocations() {
+ return locations;
+ }
+
+ @Override
+ public String getUriTemplate() {
+ return getUriTemplate(LocationType.DATA);
+ }
+
+ /**
+ * @param locationType type of location to render
+ * @return storage URL followed by the path of the first location of that type,
+ * or the bare "/tmp" (without the storage URL) when no such location exists
+ */
+ @Override
+ public String getUriTemplate(LocationType locationType) {
+ Location locationForType = null;
+ for (Location location : locations) {
+ if (location.getType() == locationType) {
+ locationForType = location;
+ break;
+ }
+ }
+
+ if (locationForType == null) {
+ return "/tmp";
+ }
+
+ StringBuilder uriTemplate = new StringBuilder();
+ uriTemplate.append(storageUrl);
+ uriTemplate.append(locationForType.getPath());
+ return uriTemplate.toString();
+ }
+
+ @Override
+ public boolean exists() throws FalconException {
+ // Directories on FS will be created if they don't exist.
+ return true;
+ }
+
+ /**
+ * Two file system storages are identical when they have the same number of locations
+ * and the same DATA, META, STATS and TMP paths. Storage URLs are not compared.
+ * A storage of any other kind (e.g. a catalog table) is never identical.
+ */
+ @Override
+ public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+ if (!(toCompareAgainst instanceof FileSystemStorage)) {
+ return false;
+ }
+
+ FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
+ final List<Location> fsStorageLocations = fsStorage.getLocations();
+
+ return getLocations().size() == fsStorageLocations.size()
+ && hasSamePath(getLocations(), fsStorageLocations, LocationType.DATA)
+ && hasSamePath(getLocations(), fsStorageLocations, LocationType.META)
+ && hasSamePath(getLocations(), fsStorageLocations, LocationType.STATS)
+ && hasSamePath(getLocations(), fsStorageLocations, LocationType.TMP);
+ }
+
+ /** Compares the paths of the given location type, missing types defaulting to /tmp. */
+ private static boolean hasSamePath(List<Location> first, List<Location> second, LocationType type) {
+ return getLocation(first, type).getPath().equals(getLocation(second, type).getPath());
+ }
+
+ private static Location getLocation(List<Location> locations, LocationType type) {
+ for (Location loc : locations) {
+ if (loc.getType() == type) {
+ return loc;
+ }
+ }
+
+ Location loc = new Location();
+ loc.setPath("/tmp");
+ loc.setType(type);
+ return loc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
new file mode 100644
index 0000000..18dad65
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+/**
+ * A class to encapsulate the storage for a given feed which can either be
+ * expressed as a path on the file system or a table in a catalog.
+ */
+public interface Storage {
+
+ /**
+ * Enumeration for the various storage types: a path on a file system or a table in a catalog.
+ */
+ enum TYPE {FILESYSTEM, TABLE}
+
+ /**
+ * Return the type of storage.
+ *
+ * @return storage type
+ */
+ TYPE getType();
+
+ /**
+ * Return the uri template of the storage's default location.
+ * NOTE(review): both implementations in this change delegate to
+ * getUriTemplate(LocationType.DATA); keep new implementations consistent.
+ *
+ * @return uri template
+ */
+ String getUriTemplate();
+
+ /**
+ * Return the uri template for a given location type.
+ *
+ * @param locationType type of location, applies only to filesystem type
+ * @return uri template
+ */
+ String getUriTemplate(LocationType locationType);
+
+ /**
+ * Check if the storage, filesystem location or catalog table exists.
+ * Filesystem location always returns true.
+ *
+ * @return true if the storage exists (always true for filesystem storage), else false
+ * @throws FalconException an exception
+ */
+ boolean exists() throws FalconException;
+
+ /**
+ * Check for equality of this instance against the one in question.
+ * NOTE(review): implementations are expected to compare against a storage of the
+ * same type; confirm how each handles an instance of another storage type.
+ *
+ * @param toCompareAgainst instance to compare
+ * @return true if identical else false
+ * @throws FalconException an exception
+ */
+ boolean isIdentical(Storage toCompareAgainst) throws FalconException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 1c323fd..d0435fb 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,8 +20,10 @@ package org.apache.falcon.entity.parser;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityGraph;
@@ -29,7 +31,6 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -71,6 +72,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
validateFeedCutOffPeriod(feed, cluster);
}
+ validateFeedStorage(feed);
validateFeedPartitionExpression(feed);
validateFeedGroups(feed);
@@ -105,21 +107,19 @@ public class FeedEntityParser extends EntityParser<Feed> {
return processes;
}
- private void validateFeedGroups(Feed feed) throws ValidationException {
+ private void validateFeedGroups(Feed feed) throws FalconException {
String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
- String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
- .getPath();
+ String defaultPath = FeedHelper.createStorage(feed).getUriTemplate();
for (Cluster cluster : feed.getClusters().getClusters()) {
- if (!FeedGroup.getDatePattern(
- FeedHelper.getLocation(feed, LocationType.DATA,
- cluster.getName()).getPath()).equals(
+ final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate();
+ if (!FeedGroup.getDatePattern(uriTemplate).equals(
FeedGroup.getDatePattern(defaultPath))) {
throw new ValidationException("Feeds default path pattern: "
- + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+ + FeedHelper.createStorage(feed).getUriTemplate()
+ ", does not match with cluster: "
+ cluster.getName()
+ " path pattern: "
- + FeedHelper.getLocation(feed, LocationType.DATA, cluster.getName()).getPath());
+ + uriTemplate);
}
}
for (String groupName : groupNames) {
@@ -127,7 +127,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
if (group != null && !group.canContainFeed(feed)) {
throw new ValidationException(
"Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
- + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+ + ", path pattern: " + FeedHelper.createStorage(feed)
+ " does not match with group: " + group.getName() + "'s frequency: "
+ group.getFrequency()
+ ", date pattern: " + group.getDatePattern());
@@ -280,4 +280,29 @@ public class FeedEntityParser extends EntityParser<Feed> {
"Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
}
}
+
+ /**
+ * Ensure table is already defined in the catalog registry.
+ * Does not matter for FileSystem storage.
+ *
+ * @param feed feed whose storage is validated on every cluster it is defined on
+ * @throws FalconException if a storage cannot be resolved; a ValidationException
+ * listing every cluster whose table does not exist
+ */
+ private void validateFeedStorage(Feed feed) throws FalconException {
+ StringBuilder buffer = new StringBuilder();
+ for (Cluster cluster : feed.getClusters().getClusters()) {
+ final Storage storage = FeedHelper.createStorage(cluster, feed);
+ // only catalog tables must pre-exist; the cast below is guarded by the type check
+ if (storage.getType() == Storage.TYPE.TABLE && !storage.exists()) {
+ CatalogStorage catalogStorage = (CatalogStorage) storage;
+ if (buffer.length() > 0) {
+ buffer.append("; "); // keep one message per cluster readable
+ }
+ buffer.append("Table [")
+ .append(catalogStorage.getTable())
+ .append("] does not exist for feed: ")
+ .append(feed.getName())
+ .append(", cluster: ")
+ .append(cluster.getName());
+ }
+ }
+
+ if (buffer.length() > 0) {
+ throw new ValidationException(buffer.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index 5dca46f..d517828 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -17,10 +17,10 @@
*/
package org.apache.falcon.group;
+import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.LocationType;
import java.util.ArrayList;
import java.util.Collections;
@@ -93,8 +93,8 @@ public class FeedGroup {
return datePattern;
}
- public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
+ /**
+ * Checks whether a feed can belong to this group: the feed's frequency must equal the
+ * group's frequency, and the date pattern extracted from the feed's storage URI template
+ * must equal the group's date pattern.
+ *
+ * @throws FalconException may be thrown while resolving the feed's storage
+ */
+ public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
return this.frequency.equals(feed.getFrequency())
- && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()));
+ && this.datePattern.equals(getDatePattern(FeedHelper.createStorage(feed).getUriTemplate()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index ed44b48..d054873 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -24,7 +24,6 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.service.ConfigurationChangeListener;
import java.util.Collections;
@@ -114,8 +113,9 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
return groupSet;
}
- public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
+ /**
+ * Resolves the groups for a feed by delegating to the three-argument overload with the
+ * feed's declared groups, its frequency and its storage URI template.
+ *
+ * @throws FalconException may be thrown while resolving the feed's storage
+ */
+ public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed)
+ throws FalconException {
return getGroups(feed.getGroups(), feed.getFrequency(),
- FeedHelper.getLocation(feed, LocationType.DATA).getPath());
+ FeedHelper.createStorage(feed).getUriTemplate());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index a9d39de..fc69933 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -21,10 +21,10 @@ package org.apache.falcon.update;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Partition;
import org.apache.falcon.entity.v0.feed.Partitions;
import org.apache.falcon.entity.v0.process.Cluster;
@@ -78,18 +78,15 @@ public final class UpdateHelper {
}
}
- public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
- if (!FeedHelper.getLocation(oldFeed.getLocations(), LocationType.DATA)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.DATA).getPath())
- || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.META)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.META).getPath())
- || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.STATS)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.STATS).getPath())
- || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.TMP)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.TMP).getPath())) {
+ public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess)
+ throws FalconException {
+ Storage oldFeedStorage = FeedHelper.createStorage(oldFeed);
+ Storage newFeedStorage = FeedHelper.createStorage(newFeed);
+
+ if (!oldFeedStorage.isIdentical(newFeedStorage)) {
return true;
}
- LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
+ LOG.debug(oldFeed.toShortString() + ": Storage identical. Ignoring...");
if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
return true;
@@ -128,17 +125,12 @@ public final class UpdateHelper {
}
for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
- if (!FeedHelper
- .getCluster(oldFeed, cluster.getName()).getValidity().getStart()
+ oldFeedStorage = FeedHelper.createStorage(cluster.getName(), oldFeed);
+ newFeedStorage = FeedHelper.createStorage(cluster.getName(), newFeed);
+
+ if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
.equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart())
- || !FeedHelper.getLocation(oldFeed, LocationType.DATA, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.DATA, cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.META, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.META, cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.STATS, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.STATS, cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.TMP, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.TMP, cluster.getName()).getPath())) {
+ || !oldFeedStorage.isIdentical(newFeedStorage)) {
return true;
}
LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
new file mode 100644
index 0000000..458111d
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.net.URISyntaxException;
+
+/**
+ * Test class for Catalog Table Storage.
+ * Exists will be covered in integration tests as it actually checks if the table exists.
+ *
+ * Note: org.testng.Assert#assertEquals takes (actual, expected), so the value under test
+ * is always passed first to keep failure messages accurate.
+ */
+public class CatalogStorageTest {
+
+ private static final String CATALOG_URL = "thrift://localhost:49083";
+ private static final String CLICKS_TABLE_URI = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
+ private static final String IMPRESSIONS_TABLE_URI =
+ "catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us";
+
+ @Test
+ public void testGetType() throws Exception {
+ CatalogStorage storage = new CatalogStorage(CLICKS_TABLE_URI);
+ Assert.assertEquals(storage.getType(), Storage.TYPE.TABLE);
+ }
+
+ @Test
+ public void testParseValidURI() throws URISyntaxException {
+ CatalogStorage storage = new CatalogStorage(CLICKS_TABLE_URI);
+ // no explicit catalog URL was given, so the ${hcatNode} placeholder is expected
+ Assert.assertEquals(storage.getCatalogUrl(), "${hcatNode}");
+ Assert.assertEquals(storage.getDatabase(), "clicksdb");
+ Assert.assertEquals(storage.getTable(), "clicks");
+ Assert.assertEquals(storage.getType(), Storage.TYPE.TABLE);
+ Assert.assertEquals(storage.getPartitions().size(), 2);
+ Assert.assertEquals(storage.getPartitionValue("region"), "us");
+ Assert.assertTrue(storage.hasPartition("region"));
+ Assert.assertNull(storage.getPartitionValue("unknown"));
+ Assert.assertFalse(storage.hasPartition("unknown"));
+ }
+
+ /**
+ * Table URIs that deviate from the valid form "catalog:db:table#k=v;k=v".
+ */
+ @DataProvider(name = "invalidTableURIs")
+ public Object[][] createInvalidTableURIData() {
+ return new Object[][] {
+ // partition spec introduced with ':' instead of '#'
+ {"catalog:default:clicks:ds=$YEAR-$MONTH-$DAY#region=us"},
+ // missing the "catalog" prefix
+ {"default:clicks:ds=$YEAR-$MONTH-$DAY#region=us"},
+ // missing the table name
+ {"catalog:default#ds=$YEAR-$MONTH-$DAY;region=us"},
+ // hierarchical "//db/table" form and ':' as the partition separator
+ {"catalog://default/clicks#ds=$YEAR-$MONTH-$DAY:region=us"},
+ };
+ }
+
+ @Test(dataProvider = "invalidTableURIs", expectedExceptions = URISyntaxException.class)
+ public void testParseInvalidURI(String tableUri) throws URISyntaxException {
+ new CatalogStorage(tableUri);
+ Assert.fail("Exception must have been thrown");
+ }
+
+ @Test
+ public void testIsIdenticalPositive() throws Exception {
+ CatalogStorage table1 = new CatalogStorage(CLICKS_TABLE_URI);
+ CatalogStorage table2 = new CatalogStorage(CLICKS_TABLE_URI);
+ Assert.assertTrue(table1.isIdentical(table2));
+
+ CatalogStorage table3 = new CatalogStorage(CATALOG_URL, CLICKS_TABLE_URI);
+ CatalogStorage table4 = new CatalogStorage(CATALOG_URL, CLICKS_TABLE_URI);
+ Assert.assertTrue(table3.isIdentical(table4));
+ }
+
+ @Test
+ public void testIsIdenticalNegative() throws Exception {
+ // different tables, no catalog URL
+ CatalogStorage table1 = new CatalogStorage(CLICKS_TABLE_URI);
+ CatalogStorage table2 = new CatalogStorage(IMPRESSIONS_TABLE_URI);
+ Assert.assertFalse(table1.isIdentical(table2));
+
+ // different tables, same catalog URL
+ CatalogStorage table3 = new CatalogStorage(CATALOG_URL, CLICKS_TABLE_URI);
+ CatalogStorage table4 = new CatalogStorage(CATALOG_URL, IMPRESSIONS_TABLE_URI);
+ Assert.assertFalse(table3.isIdentical(table4));
+
+ // same table, different catalog URLs
+ CatalogStorage table5 = new CatalogStorage("thrift://localhost:49084", CLICKS_TABLE_URI);
+ CatalogStorage table6 = new CatalogStorage(CATALOG_URL, CLICKS_TABLE_URI);
+ Assert.assertFalse(table5.isIdentical(table6));
+ }
+
+ @Test
+ public void testGetUriTemplateWithCatalogUrl() throws Exception {
+ String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+
+ CatalogStorage table = new CatalogStorage(CATALOG_URL, CLICKS_TABLE_URI);
+
+ Assert.assertEquals(table.getUriTemplate(), uriTemplate);
+ Assert.assertEquals(table.getUriTemplate(LocationType.DATA), uriTemplate);
+ }
+
+ @Test
+ public void testGetUriTemplateWithOutCatalogUrl() throws Exception {
+ String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
+
+ CatalogStorage table = new CatalogStorage(CLICKS_TABLE_URI);
+
+ Assert.assertEquals(table.getUriTemplate(), uriTemplate);
+ Assert.assertEquals(table.getUriTemplate(LocationType.DATA), uriTemplate);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
new file mode 100644
index 0000000..3e3b575
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Unit tests for {@link FileSystemStorage}: storage type, URI template resolution
+ * (with and without an explicit storage URL), existence and identity checks.
+ */
+public class FileSystemStorageTest {
+
+ private static final String STORAGE_URL = "hdfs://localhost:41020";
+
+ /**
+ * Creates a fresh single-entry list holding one DATA location with the given path.
+ */
+ private static List<Location> dataLocations(String path) {
+ final Location location = new Location();
+ location.setPath(path);
+ location.setType(LocationType.DATA);
+ List<Location> locations = new ArrayList<Location>();
+ locations.add(location);
+ return locations;
+ }
+
+ @Test
+ public void testGetType() throws Exception {
+ FileSystemStorage storage = new FileSystemStorage(dataLocations("/foo/bar"));
+ Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
+ }
+
+ @Test
+ public void testGetUriTemplate() throws Exception {
+ FileSystemStorage storage = new FileSystemStorage(STORAGE_URL, dataLocations("/foo/bar"));
+ Assert.assertEquals(storage.getUriTemplate(), "hdfs://localhost:41020/foo/bar");
+ }
+
+ @Test
+ public void testGetUriTemplateWithOutStorageURL() throws Exception {
+ // without a storage URL the ${nameNode} placeholder is expected as prefix
+ FileSystemStorage storage = new FileSystemStorage(dataLocations("/foo/bar"));
+ Assert.assertEquals(storage.getUriTemplate(), "${nameNode}/foo/bar");
+ }
+
+ @Test
+ public void testGetUriTemplateWithLocationType() throws Exception {
+ FileSystemStorage storage = new FileSystemStorage(STORAGE_URL, dataLocations("/foo/bar"));
+ Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+ }
+
+ @Test
+ public void testExists() throws Exception {
+ FileSystemStorage storage = new FileSystemStorage(STORAGE_URL, dataLocations("/foo/bar"));
+ Assert.assertTrue(storage.exists());
+ }
+
+ @Test
+ public void testIsIdentical() throws Exception {
+ // two independent location lists with equal content
+ FileSystemStorage first = new FileSystemStorage(STORAGE_URL, dataLocations("/foo/bar"));
+ FileSystemStorage second = new FileSystemStorage(STORAGE_URL, dataLocations("/foo/bar"));
+ Assert.assertTrue(first.isIdentical(second));
+ }
+
+ @Test
+ public void testIsIdenticalNegative() throws Exception {
+ FileSystemStorage first = new FileSystemStorage(STORAGE_URL, dataLocations("/foo/baz"));
+ FileSystemStorage second = new FileSystemStorage(STORAGE_URL, dataLocations("/foo/bar"));
+ Assert.assertFalse(first.isIdentical(second));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 4cc4a0e..8bcff78 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -116,12 +116,12 @@ public class FeedEntityParserTest extends AbstractTestBase {
assertEquals(feed.getClusters().getClusters().get(1).getRetention()
.getLimit().toString(), "hours(6)");
- assertEquals(FeedHelper.getLocation(feed, LocationType.DATA).getPath(),
- "/projects/falcon/clicks");
- assertEquals(FeedHelper.getLocation(feed, LocationType.META).getPath(),
- "/projects/falcon/clicksMetaData");
- assertEquals(FeedHelper.getLocation(feed, LocationType.STATS).getPath(),
- "/projects/falcon/clicksStats");
+ assertEquals("${nameNode}/projects/falcon/clicks",
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
+ assertEquals("${nameNode}/projects/falcon/clicksMetaData",
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.META));
+ assertEquals("${nameNode}/projects/falcon/clicksStats",
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.STATS));
assertEquals(feed.getACL().getGroup(), "group");
assertEquals(feed.getACL().getOwner(), "testuser");
@@ -444,4 +444,17 @@ public class FeedEntityParserTest extends AbstractTestBase {
Assert.assertEquals(org.xml.sax.SAXParseException.class, e.getCause().getCause().getClass());
}
}
+
+ /**
+ * Parses a feed backed by a catalog table and checks the table URI is carried through.
+ * The resource stream is closed even if parsing or the assertion fails.
+ */
+ @Test
+ public void testParseFeedWithTable() throws Exception {
+ final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
+ Assert.assertNotNull(inputStream, "Missing test resource /config/feed/hive-table-feed.xml");
+ try {
+ Feed feedWithTable = parser.parse(inputStream);
+ Assert.assertEquals(feedWithTable.getTable().getUri(),
+ "catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR");
+ } finally {
+ inputStream.close();
+ }
+ }
+
+ /**
+ * invalid-feed.xml declares both a locations block and a table; parsing it must fail
+ * with a FalconException.
+ */
+ @Test (expectedExceptions = FalconException.class)
+ public void testParseInvalidFeedWithTable() throws FalconException {
+ final String resource = "/config/feed/invalid-feed.xml";
+ parser.parse(FeedEntityParserTest.class.getResourceAsStream(resource));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index d4b5a2a..e8f80bd 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -28,7 +28,9 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.Partition;
import org.apache.falcon.entity.v0.feed.Properties;
import org.apache.falcon.entity.v0.feed.Property;
@@ -117,11 +119,11 @@ public class UpdateHelperTest extends AbstractTestBase {
newFeed.getLateArrival().setCutOff(oldFeed.getLateArrival().getCutOff());
Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
- FeedHelper.getLocation(newFeed, LocationType.DATA).setPath("/test");
+ getLocation(newFeed, LocationType.DATA).setPath("/test");
Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
- FeedHelper.getLocation(newFeed, LocationType.DATA).setPath(
- FeedHelper.getLocation(oldFeed, LocationType.DATA).getPath());
+ getLocation(newFeed, LocationType.DATA).setPath(
+ getLocation(oldFeed, LocationType.DATA).getPath());
Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
newFeed.setFrequency(Frequency.fromString("months(1)"));
@@ -154,4 +156,20 @@ public class UpdateHelperTest extends AbstractTestBase {
process.getClusters().getClusters().get(0).getName()).getValidity().getStart());
Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
}
+
+ /**
+ * Returns the feed-level location of the given type, looked up in the feed's locations.
+ */
+ private static Location getLocation(Feed feed, LocationType type) {
+ Locations feedLocations = feed.getLocations();
+ return getLocation(feedLocations, type);
+ }
+
+ /**
+ * Returns the first location whose type matches. When none matches, a new detached
+ * location with path "/tmp" and the requested type is returned; mutating that
+ * placeholder does not change the feed.
+ */
+ private static Location getLocation(Locations locations, LocationType type) {
+ Location match = null;
+ for (Location candidate : locations.getLocations()) {
+ if (candidate.getType() == type) {
+ match = candidate;
+ break;
+ }
+ }
+ if (match == null) {
+ match = new Location();
+ match.setPath("/tmp");
+ match.setType(type);
+ }
+ return match;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
new file mode 100644
index 0000000..e84f90a
--- /dev/null
+++ b/common/src/test/resources/config/feed/hive-table-feed.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+ <partitions>
+ <partition name="fraud"/>
+ <partition name="good"/>
+ </partitions>
+
+ <groups>online,bi</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
+ <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+ <retention limit="hours(48)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ <cluster name="backupCluster" type="target">
+ <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+ <retention limit="hours(6)" action="archive"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ </clusters>
+
+ <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/common/src/test/resources/config/feed/invalid-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/invalid-feed.xml b/common/src/test/resources/config/feed/invalid-feed.xml
new file mode 100644
index 0000000..b7273a9
--- /dev/null
+++ b/common/src/test/resources/config/feed/invalid-feed.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
+ <partitions>
+ <partition name="fraud"/>
+ <partition name="good"/>
+ </partitions>
+
+ <groups>online,bi</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
+ <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+ <retention limit="hours(48)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ <cluster name="backupCluster" type="target">
+ <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+ <retention limit="hours(6)" action="archive"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ </clusters>
+
+ <locations>
+ <location type="data" path="/projects/falcon/clicks"/>
+ <location type="stats" path="/projects/falcon/clicksStats"/>
+ <location type="meta" path="/projects/falcon/clicksMetaData"/>
+ </locations>
+ <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index adef3ec..00c261b 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -21,9 +21,9 @@ package org.apache.falcon.converter;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
@@ -62,7 +62,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
- private static final String FEED_PATH_SEP = "#";
+ public static final String FEED_PATH_SEP = "#";
private static final String TIMEOUT = "timeout";
private static final String PARALLEL = "parallel";
@@ -116,7 +116,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
WORKFLOW retentionWorkflow = new WORKFLOW();
try {
//
- WORKFLOWAPP retWfApp = createRetentionWorkflow(cluster);
+ WORKFLOWAPP retWfApp = createRetentionWorkflow();
retWfApp.setName(wfName);
marshal(cluster, retWfApp, wfPath);
retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
@@ -124,23 +124,9 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
- String feedPathMask = getLocationURI(cluster, feed, LocationType.DATA);
- String metaPathMask = getLocationURI(cluster, feed, LocationType.META);
- String statsPathMask = getLocationURI(cluster, feed, LocationType.STATS);
- String tmpPathMask = getLocationURI(cluster, feed, LocationType.TMP);
-
- StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
- if (metaPathMask != null) {
- feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
- }
- if (statsPathMask != null) {
- feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
- }
- if (tmpPathMask != null) {
- feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
- }
+ String feedBasePaths = getFeedDataPath(cluster, feed);
- props.put("feedDataPath", feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{"));
+ props.put("feedDataPath", feedBasePaths);
props.put("timeZone", feed.getTimezone().getID());
props.put("frequency", feed.getFrequency().getTimeUnit().name());
props.put("limit", feedCluster.getRetention().getLimit().toString());
@@ -156,6 +142,29 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
}
}
+ /**
+ * Builds the FEED_PATH_SEP ("#") separated list of storage URI templates for the feed on
+ * the given cluster: the DATA template first, then META, STATS and TMP when their
+ * template is non-null.
+ *
+ * @param cluster cluster whose storage settings are used
+ * @param feed feed being mapped
+ * @return the joined templates with every "${" rewritten to "?{"
+ * @throws FalconException propagated from FeedHelper.createStorage
+ */
+ protected String getFeedDataPath(Cluster cluster, Feed feed) throws FalconException {
+ final Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ // DATA seeds the buffer unconditionally; only the remaining types are optional
+ StringBuilder feedBasePaths = new StringBuilder(storage.getUriTemplate(LocationType.DATA));
+ final LocationType[] optionalTypes = {LocationType.META, LocationType.STATS, LocationType.TMP};
+ for (LocationType type : optionalTypes) {
+ String pathMask = storage.getUriTemplate(type);
+ if (pathMask != null) {
+ feedBasePaths.append(FEED_PATH_SEP).append(pathMask);
+ }
+ }
+
+ // NOTE(review): "${" -> "?{" presumably keeps these expressions from being resolved
+ // when the coordinator is generated; confirm against the retention workflow.
+ String joined = feedBasePaths.toString();
+ return joined.replaceAll("\\$\\{", "\\?\\{");
+ }
+
private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
throws FalconException {
@@ -263,10 +272,8 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
SYNCDATASET inputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
SYNCDATASET outputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
- inputDataset.setUriTemplate(new Path(ClusterHelper.getStorageUrl(srcCluster),
- FeedHelper.getLocation(feed, LocationType.DATA, srcCluster.getName()).getPath()).toString());
- outputDataset.setUriTemplate(getStoragePath(
- FeedHelper.getLocation(feed, LocationType.DATA, trgCluster.getName()).getPath()));
+ inputDataset.setUriTemplate(FeedHelper.createStorage(srcCluster, feed).getUriTemplate(LocationType.DATA));
+ outputDataset.setUriTemplate(FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
setDatasetValues(inputDataset, feed, srcCluster);
setDatasetValues(outputDataset, feed, srcCluster);
if (feed.getAvailabilityFlag() == null) {
@@ -338,7 +345,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
marshal(cluster, repWFapp, wfPath);
}
- private WORKFLOWAPP createRetentionWorkflow(Cluster cluster) throws IOException, FalconException {
+ /** Returns the WORKFLOWAPP obtained from {@code getWorkflowTemplate(RETENTION_WF_TEMPLATE)}. */
+ private WORKFLOWAPP createRetentionWorkflow() throws IOException, FalconException {
return getWorkflowTemplate(RETENTION_WF_TEMPLATE);
}
@@ -353,19 +360,4 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
}
return props;
}
-
- private String getLocationURI(Cluster cluster, Feed feed, LocationType type) {
- String path = FeedHelper.getLocation(feed, type, cluster.getName())
- .getPath();
-
- if (!path.equals("/tmp")) {
- if (new Path(path).toUri().getScheme() == null) {
- return new Path(ClusterHelper.getStorageUrl(cluster), path)
- .toString();
- } else {
- return path;
- }
- }
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 7fbe179..3b3132a 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -19,6 +19,7 @@ package org.apache.falcon.converter;
import static org.testng.Assert.assertEquals;
+import java.util.Calendar;
import java.util.Collection;
import java.util.List;
@@ -27,6 +28,7 @@ import javax.xml.bind.Unmarshaller;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -74,7 +76,6 @@ public class OozieFeedMapperTest {
ClusterHelper.getInterface(trgCluster, Interfacetype.WRITE).setEndpoint(trgHdfsUrl);
feed = (Feed) storeEntity(EntityType.FEED, FEED);
-
}
protected Entity storeEntity(EntityType type, String path) throws Exception {
@@ -148,6 +149,35 @@ public class OozieFeedMapperTest {
break;
}
}
+ }
+
+    @Test
+    public void testRetentionCoords() throws FalconException {
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
+        List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+        // The retention workflow must be handed exactly the path list getFeedDataPath computes.
+        String feedDataPath = null;
+        org.apache.falcon.oozie.coordinator.CONFIGURATION configuration =
+                coord.getAction().getWorkflow().getConfiguration();
+        for (Property property : configuration.getProperty()) {
+            if ("feedDataPath".equals(property.getName())) {
+                feedDataPath = property.getValue();
+                break;
+            }
+        }
+
+        // Fail loudly when the property is missing instead of silently skipping the comparison.
+        Assert.assertNotNull(feedDataPath, "retention workflow configuration has no feedDataPath property");
+        Assert.assertEquals(feedDataPath, feedMapper.getFeedDataPath(srcCluster, feed));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 8f75736..86ad937 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -295,10 +295,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
SYNCDATASET syncdataset = new SYNCDATASET();
syncdataset.setName(datasetName);
- String locPath = FeedHelper.getLocation(feed, locationType,
- cluster.getName()).getPath();
- syncdataset.setUriTemplate(new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}"
- + locPath);
+ String locPath = FeedHelper.createStorage(cluster, feed).getUriTemplate(locationType);
+ syncdataset.setUriTemplate(locPath);
syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 3f70557..128a73d 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -31,7 +31,6 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
@@ -73,7 +72,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
properties.put(inName + ".done-flag", "notused");
- String locPath = FeedHelper.getLocation(feed, LocationType.DATA, clusterName).getPath().replace('$', '%');
+ String locPath = FeedHelper.createStorage(clusterName, feed).getUriTemplate().replace('$', '%');
properties.put(inName + ".uri-template",
new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 7be41da..e3a42ca 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -47,7 +47,6 @@ import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -142,13 +141,14 @@ public class OozieProcessMapperTest extends AbstractTestBase {
ConfigurationStore store = ConfigurationStore.get();
Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
- assertEquals(SchemaHelper.formatDateUTC(feed.getClusters().getClusters().get(0).getValidity().getStart()),
- ds.getInitialInstance());
+
+ final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+ assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
assertEquals(feed.getTimezone().getID(), ds.getTimezone());
assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
assertEquals("", ds.getDoneFlag());
- assertEquals(ds.getUriTemplate(), "${nameNode}" + FeedHelper.getLocation(feed, LocationType.DATA,
- feed.getClusters().getClusters().get(0).getName()).getPath());
+ assertEquals(ds.getUriTemplate(), FeedHelper.createStorage(feedCluster, feed).getUriTemplate());
+
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
if (prop.getName().equals("mapred.job.priority")) {
assertEquals(prop.getValue(), "LOW");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1e6ace43/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index f762c4b..74bab07 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -295,9 +295,11 @@ public class TestContext {
ServletInputStream rawlogStream = getServletInputStream(tmpFile);
- return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User",
- "testuser")
- .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream);
+ return this.service.path("api/entities/submit/" + entityType.name().toLowerCase())
+ .header("Remote-User", REMOTE_USER)
+ .accept(MediaType.TEXT_XML)
+ .type(MediaType.TEXT_XML)
+ .post(ClientResponse.class, rawlogStream);
}
public void assertRequestId(ClientResponse clientRepsonse) {