You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/09/24 00:06:08 UTC
[1/2] git commit: Revert "FALCON-87 Hive table integration with feed
entity. Contributed by Venkatesh Seetharam"
Updated Branches:
refs/heads/FALCON-85 33b5f64cb -> f278ec82f
Revert "FALCON-87 Hive table integration with feed entity. Contributed by Venkatesh Seetharam"
This reverts commit 1e6ace4327910250f60e6d1e41ce5b1726bf4a4f.
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c82b663f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c82b663f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c82b663f
Branch: refs/heads/FALCON-85
Commit: c82b663fcb74fe00b6ff7572e8c2735d9f878b7c
Parents: 1e6ace4
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Mon Sep 2 07:55:54 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Mon Sep 2 07:55:54 2013 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 -
client/src/main/resources/feed-0.1.xsd | 22 +--
.../falcon/catalog/AbstractCatalogService.java | 4 -
.../falcon/catalog/HiveCatalogService.java | 32 +--
.../apache/falcon/entity/CatalogStorage.java | 194 -------------------
.../org/apache/falcon/entity/FeedHelper.java | 101 ++--------
.../apache/falcon/entity/FileSystemStorage.java | 124 ------------
.../java/org/apache/falcon/entity/Storage.java | 74 -------
.../falcon/entity/parser/FeedEntityParser.java | 45 +----
.../java/org/apache/falcon/group/FeedGroup.java | 6 +-
.../org/apache/falcon/group/FeedGroupMap.java | 6 +-
.../org/apache/falcon/update/UpdateHelper.java | 34 ++--
.../falcon/entity/CatalogStorageTest.java | 129 ------------
.../falcon/entity/FileSystemStorageTest.java | 133 -------------
.../entity/parser/FeedEntityParserTest.java | 25 +--
.../apache/falcon/update/UpdateHelperTest.java | 24 +--
.../resources/config/feed/hive-table-feed.xml | 48 -----
.../test/resources/config/feed/invalid-feed.xml | 53 -----
.../falcon/converter/OozieFeedMapper.java | 70 ++++---
.../falcon/converter/OozieFeedMapperTest.java | 32 +--
.../falcon/converter/OozieProcessMapper.java | 6 +-
.../workflow/OozieProcessWorkflowBuilder.java | 3 +-
.../converter/OozieProcessMapperTest.java | 10 +-
.../org/apache/falcon/resource/TestContext.java | 8 +-
24 files changed, 123 insertions(+), 1063 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 046fc27..b2dc1d3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,9 +5,6 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
- FALCON-87 Hive table integration with feed entity. (Venkatesh
- Seetharam via Srikanth Sundarrajan)
-
FALCON-86 Hive table integration with cluster entity. (Venaktesh
Seetharam via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 00b5172..bf6fa81 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -105,10 +105,7 @@
</xs:element>
<xs:element type="late-arrival" name="late-arrival" minOccurs="0"/>
<xs:element type="clusters" name="clusters"/>
- <xs:choice minOccurs="1" maxOccurs="1">
- <xs:element type="locations" name="locations"/>
- <xs:element type="catalog-table" name="table"/>
- </xs:choice>
+ <xs:element type="locations" name="locations"/>
<xs:element type="ACL" name="ACL"/>
<xs:element type="schema" name="schema"/>
<xs:element type="properties" name="properties" minOccurs="0"/>
@@ -142,10 +139,7 @@
<xs:sequence>
<xs:element type="validity" name="validity"/>
<xs:element type="retention" name="retention"/>
- <xs:choice minOccurs="0" maxOccurs="1">
- <xs:element type="locations" name="locations" minOccurs="0"/>
- <xs:element type="catalog-table" name="table"/>
- </xs:choice>
+ <xs:element type="locations" name="locations" minOccurs="0"/>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="cluster-type" name="type" use="optional"/>
@@ -200,7 +194,7 @@
<xs:complexType name="locations">
<xs:annotation>
<xs:documentation>
- A list of locations on the file system.
+ A list of locations.
</xs:documentation>
</xs:annotation>
<xs:choice maxOccurs="unbounded" minOccurs="0">
@@ -362,14 +356,4 @@
<xs:pattern value="(\w+=[^,]+)?([,]?[ ]*[\w]+=[^,]+)*"/>
</xs:restriction>
</xs:simpleType>
- <xs:complexType name="catalog-table">
- <xs:annotation>
- <xs:documentation>
- catalog specifies the uri of a Hive table along with the partition spec.
- uri="catalog:$database:$table#(partition-key=partition-value);+"
- Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY}
- </xs:documentation>
- </xs:annotation>
- <xs:attribute type="xs:string" name="uri" use="required"/>
- </xs:complexType>
</xs:schema>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index a4b6cfd..4086611 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -23,7 +23,6 @@ import org.apache.falcon.FalconException;
/**
* Interface definition for a catalog registry service
* such as Hive or HCatalog.
- * Catalog should minimally support the following operations.
*/
public abstract class AbstractCatalogService {
@@ -35,7 +34,4 @@ public abstract class AbstractCatalogService {
* @throws FalconException exception
*/
public abstract boolean isAlive(String catalogBaseUrl) throws FalconException;
-
- public abstract boolean tableExists(String catalogUrl, String database, String tableName)
- throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 67d14af..e6f7fe2 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -22,7 +22,6 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.falcon.FalconException;
-import org.apache.falcon.security.CurrentUser;
import org.apache.log4j.Logger;
import javax.ws.rs.core.MediaType;
@@ -41,36 +40,7 @@ public class HiveCatalogService extends AbstractCatalogService {
Client client = Client.create();
WebResource service = client.resource(catalogBaseUrl);
- ClientResponse response = service.path("status")
- .accept(MediaType.APPLICATION_JSON)
- .head();
- // .get(ClientResponse.class); // todo this isnt working
-
- if (LOG.isDebugEnabled() && response.getStatus() != 200) {
- LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
- }
-
- return response.getStatus() == 200;
- }
-
- @Override
- public boolean tableExists(String catalogUrl, String database, String tableName)
- throws FalconException {
- LOG.info("Checking if the table exists: " + tableName);
-
- Client client = Client.create();
- WebResource service = client.resource(catalogUrl);
-
- ClientResponse response = service.path("ddl/database/").path(database)
- .path("/table").path(tableName)
- .queryParam("user.name", CurrentUser.getUser())
- .accept(MediaType.APPLICATION_JSON)
- .get(ClientResponse.class);
-
- if (LOG.isDebugEnabled() && response.getStatus() != 200) {
- LOG.debug("Output from Server .... \n" + response.getEntity(String.class));
- }
-
+ ClientResponse response = service.path("status").accept(MediaType.APPLICATION_JSON).head();
return response.getStatus() == 200;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
deleted file mode 100644
index 5c64f27..0000000
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A catalog registry implementation of a feed storage.
- */
-public class CatalogStorage implements Storage {
-
- public static final String PARTITION_SEPARATOR = ";";
- public static final String PARTITION_KEYVAL_SEPARATOR = "=";
- public static final String INPUT_PATH_SEPARATOR = ":";
- public static final String OUTPUT_PATH_SEPARATOR = "/";
-
- private final String catalogUrl;
- private String database;
- private String table;
- private Map<String, String> partitions;
-
- protected CatalogStorage(String catalogTable) throws URISyntaxException {
- this("${hcatNode}", catalogTable);
- }
-
- protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
- if (catalogUrl == null || catalogUrl.length() == 0) {
- throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
- }
-
- this.catalogUrl = catalogUrl;
-
- parse(tableUri);
- }
-
- /**
- * Validate URI to conform to catalog:$database:$table#$partitions.
- * scheme=catalog:database=$database:table=$table#$partitions
- * partitions=key=value;key=value
- *
- * @param catalogTableUri table URI to parse and validate
- * @throws URISyntaxException
- */
- private void parse(String catalogTableUri) throws URISyntaxException {
-
- URI tableUri = new URI(catalogTableUri);
-
- if (!"catalog".equals(tableUri.getScheme())) {
- throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
- }
-
- final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
- if (schemeSpecificPart == null) {
- throw new URISyntaxException(tableUri.toString(), "Database and Table are missing");
- }
-
- String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR);
-
- if (paths.length != 2) {
- throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table");
- }
-
- database = paths[0];
- table = paths[1];
-
- if (database == null || database.length() == 0) {
- throw new URISyntaxException(tableUri.toString(), "DB name is missing");
- }
- if (table == null || table.length() == 0) {
- throw new URISyntaxException(tableUri.toString(), "Table name is missing");
- }
-
- String partRaw = tableUri.getFragment();
- if (partRaw == null || partRaw.length() == 0) {
- throw new URISyntaxException(tableUri.toString(), "Partition details are missing");
- }
-
- partitions = new HashMap<String, String>();
- String[] parts = partRaw.split(PARTITION_SEPARATOR);
- for (String part : parts) {
- if (part == null || part.length() == 0) {
- continue;
- }
-
- String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
- if (keyVal.length != 2) {
- throw new URISyntaxException(tableUri.toString(),
- "Partition key value pair is not specified properly in (" + part + ")");
- }
-
- partitions.put(keyVal[0], keyVal[1]);
- }
- }
-
- public String getCatalogUrl() {
- return catalogUrl;
- }
-
- public String getDatabase() {
- return database;
- }
-
- public String getTable() {
- return table;
- }
-
- public Map<String, String> getPartitions() {
- return partitions;
- }
-
- /**
- * @param key partition key
- * @return partition value
- */
- public String getPartitionValue(String key) {
- return partitions.get(key);
- }
-
- /**
- * @param key partition key
- * @return if partitions map includes the key or not
- */
- public boolean hasPartition(String key) {
- return partitions.containsKey(key);
- }
-
- @Override
- public TYPE getType() {
- return TYPE.TABLE;
- }
-
- @Override
- public String getUriTemplate() {
- return getUriTemplate(LocationType.DATA);
- }
-
- @Override
- public String getUriTemplate(LocationType locationType) {
- StringBuilder uriTemplate = new StringBuilder();
- uriTemplate.append(catalogUrl);
- uriTemplate.append(OUTPUT_PATH_SEPARATOR);
- uriTemplate.append(database);
- uriTemplate.append(OUTPUT_PATH_SEPARATOR);
- uriTemplate.append(table);
- uriTemplate.append(OUTPUT_PATH_SEPARATOR);
- for (Map.Entry<String, String> entry : partitions.entrySet()) {
- uriTemplate.append(entry.getKey());
- uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
- uriTemplate.append(entry.getValue());
- uriTemplate.append(PARTITION_SEPARATOR);
- }
- uriTemplate.setLength(uriTemplate.length() - 1);
-
- return uriTemplate.toString();
- }
-
- @Override
- public boolean exists() throws FalconException {
- return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
- }
-
- @Override
- public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
- CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
-
- return !(getCatalogUrl() != null && !getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
- && getDatabase().equals(catalogStorage.getDatabase())
- && getTable().equals(catalogStorage.getTable())
- && getPartitions().equals(catalogStorage.getPartitions());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index d212a98..c96120d 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,19 +20,11 @@ package org.apache.falcon.entity;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.*;
import org.apache.falcon.expression.ExpressionHelper;
-import java.net.URISyntaxException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -52,87 +44,32 @@ public final class FeedHelper {
return null;
}
- public static Storage createStorage(Feed feed) throws FalconException {
-
- final List<Location> locations = feed.getLocations().getLocations();
- if (locations != null) {
- return new FileSystemStorage(locations);
- }
-
- try {
- final CatalogTable table = feed.getTable();
- if (table != null) {
- return new CatalogStorage(table.getUri());
- }
- } catch (URISyntaxException e) {
- throw new FalconException(e);
+ public static Location getLocation(Feed feed, LocationType type,
+ String clusterName) {
+ Cluster cluster = getCluster(feed, clusterName);
+ if (cluster != null && cluster.getLocations() != null
+ && cluster.getLocations().getLocations().size() != 0) {
+ return getLocation(cluster.getLocations(), type);
+ } else {
+ return getLocation(feed.getLocations(), type);
}
- throw new FalconException("Both catalog and locations are not defined.");
}
- public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
- Feed feed) throws FalconException {
- return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
- }
-
- public static Storage createStorage(String clusterName, Feed feed)
- throws FalconException {
-
- return createStorage(getCluster(feed, clusterName), feed);
- }
-
- public static Storage createStorage(Cluster cluster, Feed feed)
- throws FalconException {
-
- final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
- EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
-
- return createStorage(cluster, feed, clusterEntity);
+ public static Location getLocation(Feed feed, LocationType type) {
+ return getLocation(feed.getLocations(), type);
}
- public static Storage createStorage(Cluster cluster, Feed feed,
- org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
- throws FalconException {
-
- final List<Location> locations = getLocations(cluster, feed);
- if (locations != null) {
- return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
- }
-
- try {
- final CatalogTable table = getTable(cluster, feed);
- if (table != null) {
- return new CatalogStorage(
- ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint(),
- table.getUri());
+ public static Location getLocation(Locations locations, LocationType type) {
+ for (Location loc : locations.getLocations()) {
+ if (loc.getType() == type) {
+ return loc;
}
- } catch (URISyntaxException e) {
- throw new FalconException(e);
}
-
- throw new FalconException("Both catalog and locations are not defined.");
- }
-
- private static List<Location> getLocations(Cluster cluster, Feed feed) {
- // check if locations are overridden in cluster
- final Locations clusterLocations = cluster.getLocations();
- if (clusterLocations != null
- && clusterLocations.getLocations().size() != 0) {
- return clusterLocations.getLocations();
- }
-
- final Locations feedLocations = feed.getLocations();
- return feedLocations == null ? null : feedLocations.getLocations();
- }
-
- private static CatalogTable getTable(Cluster cluster, Feed feed) {
- // check if table is overridden in cluster
- if (cluster.getTable() != null) {
- return cluster.getTable();
- }
-
- return feed.getTable();
+ Location loc = new Location();
+ loc.setPath("/tmp");
+ loc.setType(type);
+ return loc;
}
public static String normalizePartitionExpression(String part1, String part2) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
deleted file mode 100644
index 95b18c5..0000000
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-import java.util.List;
-
-/**
- * A file system implementation of a feed storage.
- */
-public class FileSystemStorage implements Storage {
-
- private final String storageUrl;
- private final List<Location> locations;
-
- protected FileSystemStorage(List<Location> locations) {
- this("${nameNode}", locations);
- }
-
- protected FileSystemStorage(String storageUrl, List<Location> locations) {
- if (storageUrl == null || storageUrl.length() == 0) {
- throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
- }
-
- if (locations == null || locations.size() == 0) {
- throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
- }
-
- this.storageUrl = storageUrl;
- this.locations = locations;
- }
-
- @Override
- public TYPE getType() {
- return TYPE.FILESYSTEM;
- }
-
- public String getStorageUrl() {
- return storageUrl;
- }
-
- public List<Location> getLocations() {
- return locations;
- }
-
- @Override
- public String getUriTemplate() {
- return getUriTemplate(LocationType.DATA);
- }
-
- @Override
- public String getUriTemplate(LocationType locationType) {
- Location locationForType = null;
- for (Location location : locations) {
- if (location.getType() == locationType) {
- locationForType = location;
- break;
- }
- }
-
- if (locationForType == null) {
- return "/tmp";
- }
-
- StringBuilder uriTemplate = new StringBuilder();
- uriTemplate.append(storageUrl);
- uriTemplate.append(locationForType.getPath());
- return uriTemplate.toString();
- }
-
- @Override
- public boolean exists() throws FalconException {
- // Directories on FS will be created if they don't exist.
- return true;
- }
-
- @Override
- public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
- FileSystemStorage fsStorage = (FileSystemStorage) toCompareAgainst;
- final List<Location> fsStorageLocations = fsStorage.getLocations();
-
- return getLocations().size() == fsStorageLocations.size()
- && getLocation(getLocations(), LocationType.DATA).getPath().equals(
- getLocation(fsStorageLocations, LocationType.DATA).getPath())
- && getLocation(getLocations(), LocationType.META).getPath().equals(
- getLocation(fsStorageLocations, LocationType.META).getPath())
- && getLocation(getLocations(), LocationType.STATS).getPath().equals(
- getLocation(fsStorageLocations, LocationType.STATS).getPath())
- && getLocation(getLocations(), LocationType.TMP).getPath().equals(
- getLocation(fsStorageLocations, LocationType.TMP).getPath());
- }
-
- private static Location getLocation(List<Location> locations, LocationType type) {
- for (Location loc : locations) {
- if (loc.getType() == type) {
- return loc;
- }
- }
-
- Location loc = new Location();
- loc.setPath("/tmp");
- loc.setType(type);
- return loc;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
deleted file mode 100644
index 18dad65..0000000
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-/**
- * A class to encapsulate the storage for a given feed which can either be
- * expressed as a path on the file system or a table in a catalog.
- */
-public interface Storage {
-
- /**
- * Enumeration for the various storage types.
- */
- enum TYPE {FILESYSTEM, TABLE}
-
- /**
- * Return the type of storage.
- *
- * @return storage type
- */
- TYPE getType();
-
- /**
- * Return the uri template.
- *
- * @return uri template
- */
- String getUriTemplate();
-
- /**
- * Return the uri template for a given location type.
- *
- * @param locationType type of location, applies only to filesystem type
- * @return uri template
- */
- String getUriTemplate(LocationType locationType);
-
- /**
- * Check if the storage, filesystem location or catalog table exists.
- * Filesystem location always returns true.
- *
- * @return true if table exists else false
- * @throws FalconException an exception
- */
- boolean exists() throws FalconException;
-
- /**
- * Check for equality of this instance against the one in question.
- *
- * @param toCompareAgainst instance to compare
- * @return true if identical else false
- * @throws FalconException an exception
- */
- boolean isIdentical(Storage toCompareAgainst) throws FalconException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index d0435fb..1c323fd 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,10 +20,8 @@ package org.apache.falcon.entity.parser;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityGraph;
@@ -31,6 +29,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
@@ -72,7 +71,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
validateFeedCutOffPeriod(feed, cluster);
}
- validateFeedStorage(feed);
validateFeedPartitionExpression(feed);
validateFeedGroups(feed);
@@ -107,19 +105,21 @@ public class FeedEntityParser extends EntityParser<Feed> {
return processes;
}
- private void validateFeedGroups(Feed feed) throws FalconException {
+ private void validateFeedGroups(Feed feed) throws ValidationException {
String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
- String defaultPath = FeedHelper.createStorage(feed).getUriTemplate();
+ String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
+ .getPath();
for (Cluster cluster : feed.getClusters().getClusters()) {
- final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate();
- if (!FeedGroup.getDatePattern(uriTemplate).equals(
+ if (!FeedGroup.getDatePattern(
+ FeedHelper.getLocation(feed, LocationType.DATA,
+ cluster.getName()).getPath()).equals(
FeedGroup.getDatePattern(defaultPath))) {
throw new ValidationException("Feeds default path pattern: "
- + FeedHelper.createStorage(feed).getUriTemplate()
+ + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+ ", does not match with cluster: "
+ cluster.getName()
+ " path pattern: "
- + uriTemplate);
+ + FeedHelper.getLocation(feed, LocationType.DATA, cluster.getName()).getPath());
}
}
for (String groupName : groupNames) {
@@ -127,7 +127,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
if (group != null && !group.canContainFeed(feed)) {
throw new ValidationException(
"Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
- + ", path pattern: " + FeedHelper.createStorage(feed)
+ + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+ " does not match with group: " + group.getName() + "'s frequency: "
+ group.getFrequency()
+ ", date pattern: " + group.getDatePattern());
@@ -280,29 +280,4 @@ public class FeedEntityParser extends EntityParser<Feed> {
"Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
}
}
-
- /**
- * Ensure table is already defined in the catalog registry.
- * Does not matter for FileSystem storage.
- */
- private void validateFeedStorage(Feed feed) throws FalconException {
- StringBuilder buffer = new StringBuilder();
- for (Cluster cluster : feed.getClusters().getClusters()) {
- final Storage storage = FeedHelper.createStorage(cluster, feed);
- if (!storage.exists()) {
- // this is only true for table, filesystem always returns true
- CatalogStorage catalogStorage = (CatalogStorage) storage;
- buffer.append("Table [")
- .append(catalogStorage.getTable())
- .append("] does not exist for feed: ")
- .append(feed.getName())
- .append(", cluster: ")
- .append(cluster.getName());
- }
- }
-
- if (buffer.length() > 0) {
- throw new ValidationException(buffer.toString());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index d517828..5dca46f 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -17,10 +17,10 @@
*/
package org.apache.falcon.group;
-import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
import java.util.ArrayList;
import java.util.Collections;
@@ -93,8 +93,8 @@ public class FeedGroup {
return datePattern;
}
- public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
+ public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
return this.frequency.equals(feed.getFrequency())
- && this.datePattern.equals(getDatePattern(FeedHelper.createStorage(feed).getUriTemplate()));
+ && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index d054873..ed44b48 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.service.ConfigurationChangeListener;
import java.util.Collections;
@@ -113,9 +114,8 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
return groupSet;
}
- public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed)
- throws FalconException {
+ public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
return getGroups(feed.getGroups(), feed.getFrequency(),
- FeedHelper.createStorage(feed).getUriTemplate());
+ FeedHelper.getLocation(feed, LocationType.DATA).getPath());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index fc69933..a9d39de 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -21,10 +21,10 @@ package org.apache.falcon.update;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Partition;
import org.apache.falcon.entity.v0.feed.Partitions;
import org.apache.falcon.entity.v0.process.Cluster;
@@ -78,15 +78,18 @@ public final class UpdateHelper {
}
}
- public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess)
- throws FalconException {
- Storage oldFeedStorage = FeedHelper.createStorage(oldFeed);
- Storage newFeedStorage = FeedHelper.createStorage(newFeed);
-
- if (!oldFeedStorage.isIdentical(newFeedStorage)) {
+ public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
+ if (!FeedHelper.getLocation(oldFeed.getLocations(), LocationType.DATA)
+ .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.DATA).getPath())
+ || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.META)
+ .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.META).getPath())
+ || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.STATS)
+ .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.STATS).getPath())
+ || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.TMP)
+ .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.TMP).getPath())) {
return true;
}
- LOG.debug(oldFeed.toShortString() + ": Storage identical. Ignoring...");
+ LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
return true;
@@ -125,12 +128,17 @@ public final class UpdateHelper {
}
for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
- oldFeedStorage = FeedHelper.createStorage(cluster.getName(), oldFeed);
- newFeedStorage = FeedHelper.createStorage(cluster.getName(), newFeed);
-
- if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
+ if (!FeedHelper
+ .getCluster(oldFeed, cluster.getName()).getValidity().getStart()
.equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart())
- || !oldFeedStorage.isIdentical(newFeedStorage)) {
+ || !FeedHelper.getLocation(oldFeed, LocationType.DATA, cluster.getName()).getPath()
+ .equals(FeedHelper.getLocation(newFeed, LocationType.DATA, cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.META, cluster.getName()).getPath()
+ .equals(FeedHelper.getLocation(newFeed, LocationType.META, cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.STATS, cluster.getName()).getPath()
+ .equals(FeedHelper.getLocation(newFeed, LocationType.STATS, cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.TMP, cluster.getName()).getPath()
+ .equals(FeedHelper.getLocation(newFeed, LocationType.TMP, cluster.getName()).getPath())) {
return true;
}
LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
deleted file mode 100644
index 458111d..0000000
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.net.URISyntaxException;
-
-/**
- * Test class for Catalog Table Storage.
- * Exists will be covered in integration tests as it actually checks if the table exists.
- */
-public class CatalogStorageTest {
-
- @Test
- public void testGetType() throws Exception {
- String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- CatalogStorage storage = new CatalogStorage(table);
- Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
- }
-
- @Test
- public void testParseVaildURI() throws URISyntaxException {
- String table = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- CatalogStorage storage = new CatalogStorage(table);
- Assert.assertEquals("${hcatNode}", storage.getCatalogUrl());
- Assert.assertEquals("clicksdb", storage.getDatabase());
- Assert.assertEquals("clicks", storage.getTable());
- Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
- Assert.assertEquals(2, storage.getPartitions().size());
- Assert.assertEquals("us", storage.getPartitionValue("region"));
- Assert.assertTrue(storage.hasPartition("region"));
- Assert.assertNull(storage.getPartitionValue("unknown"));
- Assert.assertFalse(storage.hasPartition("unknown"));
- }
-
-
- @DataProvider(name = "invalidTableURIs")
- public Object[][] createInvalidTableURIData() {
- return new Object[][] {
- {"catalog:default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
- {"default:clicks:ds=$YEAR-$MONTH-$DAY#region=us", ""},
- {"catalog:default#ds=$YEAR-$MONTH-$DAY;region=us", ""},
- {"catalog://default/clicks#ds=$YEAR-$MONTH-$DAY:region=us", ""},
- };
- }
-
- @Test(dataProvider = "invalidTableURIs", expectedExceptions = URISyntaxException.class)
- public void testParseInvalidURI(String tableUri, String ignore) throws URISyntaxException {
- new CatalogStorage(tableUri);
- Assert.fail("Exception must have been thrown");
- }
-
- @Test
- public void testIsIdenticalPositive() throws Exception {
- CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- Assert.assertTrue(table1.isIdentical(table2));
-
- final String catalogUrl = "thrift://localhost:49083";
- CatalogStorage table3 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- CatalogStorage table4 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- Assert.assertTrue(table3.isIdentical(table4));
- }
-
- @Test
- public void testIsIdenticalNegative() throws Exception {
- CatalogStorage table1 = new CatalogStorage("catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- CatalogStorage table2 = new CatalogStorage("catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
- Assert.assertFalse(table1.isIdentical(table2));
-
- final String catalogUrl = "thrift://localhost:49083";
- CatalogStorage table3 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- CatalogStorage table4 = new CatalogStorage(catalogUrl,
- "catalog:clicksdb:impressions#ds=$YEAR-$MONTH-$DAY;region=us");
- Assert.assertFalse(table3.isIdentical(table4));
-
- CatalogStorage table5 = new CatalogStorage("thrift://localhost:49084",
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- CatalogStorage table6 = new CatalogStorage("thrift://localhost:49083",
- "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us");
- Assert.assertFalse(table5.isIdentical(table6));
- }
-
- @Test
- public void testGetUriTemplateWithCatalogUrl() throws Exception {
- final String catalogUrl = "thrift://localhost:49083";
- String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- String uriTemplate = "thrift://localhost:49083/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
-
- CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
-
- Assert.assertEquals(uriTemplate, table.getUriTemplate());
- Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
- }
-
- @Test
- public void testGetUriTemplateWithOutCatalogUrl() throws Exception {
- String tableUri = "catalog:clicksdb:clicks#ds=$YEAR-$MONTH-$DAY;region=us";
- String uriTemplate = "${hcatNode}/clicksdb/clicks/region=us;ds=$YEAR-$MONTH-$DAY";
-
- CatalogStorage table = new CatalogStorage(tableUri);
-
- Assert.assertEquals(uriTemplate, table.getUriTemplate());
- Assert.assertEquals(uriTemplate, table.getUriTemplate(LocationType.DATA));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
deleted file mode 100644
index 3e3b575..0000000
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Test class for File System Storage.
- */
-public class FileSystemStorageTest {
-
- @Test
- public void testGetType() throws Exception {
- final Location location = new Location();
- location.setPath("/foo/bar");
- location.setType(LocationType.DATA);
- List<Location> locations = new ArrayList<Location>();
- locations.add(location);
-
- FileSystemStorage storage = new FileSystemStorage(locations);
- Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM);
- }
-
- @Test
- public void testGetUriTemplate() throws Exception {
- final Location location = new Location();
- location.setPath("/foo/bar");
- location.setType(LocationType.DATA);
- List<Location> locations = new ArrayList<Location>();
- locations.add(location);
-
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
- Assert.assertEquals(storage.getUriTemplate(), "hdfs://localhost:41020/foo/bar");
- }
-
- @Test
- public void testGetUriTemplateWithOutStorageURL() throws Exception {
- final Location location = new Location();
- location.setPath("/foo/bar");
- location.setType(LocationType.DATA);
- List<Location> locations = new ArrayList<Location>();
- locations.add(location);
-
- FileSystemStorage storage = new FileSystemStorage(locations);
- Assert.assertEquals(storage.getUriTemplate(), "${nameNode}/foo/bar");
- }
-
- @Test
- public void testGetUriTemplateWithLocationType() throws Exception {
- final Location location = new Location();
- location.setPath("/foo/bar");
- location.setType(LocationType.DATA);
- List<Location> locations = new ArrayList<Location>();
- locations.add(location);
-
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
- Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
- }
-
- @Test
- public void testExists() throws Exception {
- final Location location = new Location();
- location.setPath("/foo/bar");
- location.setType(LocationType.DATA);
- List<Location> locations = new ArrayList<Location>();
- locations.add(location);
-
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
- Assert.assertTrue(storage.exists());
- }
-
- @Test
- public void testIsIdentical() throws Exception {
- final String storageUrl = "hdfs://localhost:41020";
- final Location location1 = new Location();
- location1.setPath("/foo/bar");
- location1.setType(LocationType.DATA);
- List<Location> locations1 = new ArrayList<Location>();
- locations1.add(location1);
- FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
-
- final Location location2 = new Location();
- location2.setPath("/foo/bar");
- location2.setType(LocationType.DATA);
- List<Location> locations2 = new ArrayList<Location>();
- locations2.add(location2);
- FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
-
- Assert.assertTrue(storage1.isIdentical(storage2));
- }
-
- @Test
- public void testIsIdenticalNegative() throws Exception {
- final String storageUrl = "hdfs://localhost:41020";
- final Location location1 = new Location();
- location1.setPath("/foo/baz");
- location1.setType(LocationType.DATA);
- List<Location> locations1 = new ArrayList<Location>();
- locations1.add(location1);
- FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1);
-
- final Location location2 = new Location();
- location2.setPath("/foo/bar");
- location2.setType(LocationType.DATA);
- List<Location> locations2 = new ArrayList<Location>();
- locations2.add(location2);
- FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2);
-
- Assert.assertFalse(storage1.isIdentical(storage2));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 8bcff78..4cc4a0e 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -116,12 +116,12 @@ public class FeedEntityParserTest extends AbstractTestBase {
assertEquals(feed.getClusters().getClusters().get(1).getRetention()
.getLimit().toString(), "hours(6)");
- assertEquals("${nameNode}/projects/falcon/clicks",
- FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
- assertEquals("${nameNode}/projects/falcon/clicksMetaData",
- FeedHelper.createStorage(feed).getUriTemplate(LocationType.META));
- assertEquals("${nameNode}/projects/falcon/clicksStats",
- FeedHelper.createStorage(feed).getUriTemplate(LocationType.STATS));
+ assertEquals(FeedHelper.getLocation(feed, LocationType.DATA).getPath(),
+ "/projects/falcon/clicks");
+ assertEquals(FeedHelper.getLocation(feed, LocationType.META).getPath(),
+ "/projects/falcon/clicksMetaData");
+ assertEquals(FeedHelper.getLocation(feed, LocationType.STATS).getPath(),
+ "/projects/falcon/clicksStats");
assertEquals(feed.getACL().getGroup(), "group");
assertEquals(feed.getACL().getOwner(), "testuser");
@@ -444,17 +444,4 @@ public class FeedEntityParserTest extends AbstractTestBase {
Assert.assertEquals(org.xml.sax.SAXParseException.class, e.getCause().getCause().getClass());
}
}
-
- @Test
- public void testParseFeedWithTable() throws FalconException {
- final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
- Feed feedWithTable = parser.parse(inputStream);
- Assert.assertEquals(feedWithTable.getTable().getUri(),
- "catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR");
- }
-
- @Test (expectedExceptions = FalconException.class)
- public void testParseInvalidFeedWithTable() throws FalconException {
- parser.parse(FeedEntityParserTest.class.getResourceAsStream("/config/feed/invalid-feed.xml"));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index e8f80bd..d4b5a2a 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -28,9 +28,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.Partition;
import org.apache.falcon.entity.v0.feed.Properties;
import org.apache.falcon.entity.v0.feed.Property;
@@ -119,11 +117,11 @@ public class UpdateHelperTest extends AbstractTestBase {
newFeed.getLateArrival().setCutOff(oldFeed.getLateArrival().getCutOff());
Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
- getLocation(newFeed, LocationType.DATA).setPath("/test");
+ FeedHelper.getLocation(newFeed, LocationType.DATA).setPath("/test");
Assert.assertTrue(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
- getLocation(newFeed, LocationType.DATA).setPath(
- getLocation(oldFeed, LocationType.DATA).getPath());
+ FeedHelper.getLocation(newFeed, LocationType.DATA).setPath(
+ FeedHelper.getLocation(oldFeed, LocationType.DATA).getPath());
Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
newFeed.setFrequency(Frequency.fromString("months(1)"));
@@ -156,20 +154,4 @@ public class UpdateHelperTest extends AbstractTestBase {
process.getClusters().getClusters().get(0).getName()).getValidity().getStart());
Assert.assertFalse(UpdateHelper.shouldUpdate(oldFeed, newFeed, process));
}
-
- private static Location getLocation(Feed feed, LocationType type) {
- return getLocation(feed.getLocations(), type);
- }
-
- private static Location getLocation(Locations locations, LocationType type) {
- for (Location loc : locations.getLocations()) {
- if (loc.getType() == type) {
- return loc;
- }
- }
- Location loc = new Location();
- loc.setPath("/tmp");
- loc.setType(type);
- return loc;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
deleted file mode 100644
index e84f90a..0000000
--- a/common/src/test/resources/config/feed/hive-table-feed.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
- <partitions>
- <partition name="fraud"/>
- <partition name="good"/>
- </partitions>
-
- <groups>online,bi</groups>
-
- <frequency>hours(1)</frequency>
- <timezone>UTC</timezone>
- <late-arrival cut-off="hours(6)"/>
-
- <clusters>
- <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
- <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
- <retention limit="hours(48)" action="delete"/>
- <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
- </cluster>
- <cluster name="backupCluster" type="target">
- <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
- <retention limit="hours(6)" action="archive"/>
- <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
- </cluster>
- </clusters>
-
- <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
-
- <ACL owner="testuser" group="group" permission="0x755"/>
- <schema location="/schema/clicks" provider="protobuf"/>
-</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/common/src/test/resources/config/feed/invalid-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/invalid-feed.xml b/common/src/test/resources/config/feed/invalid-feed.xml
deleted file mode 100644
index b7273a9..0000000
--- a/common/src/test/resources/config/feed/invalid-feed.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
- <partitions>
- <partition name="fraud"/>
- <partition name="good"/>
- </partitions>
-
- <groups>online,bi</groups>
-
- <frequency>hours(1)</frequency>
- <timezone>UTC</timezone>
- <late-arrival cut-off="hours(6)"/>
-
- <clusters>
- <cluster name="testCluster" type="source" partition="*/${cluster.colo}">
- <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
- <retention limit="hours(48)" action="delete"/>
- <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
- </cluster>
- <cluster name="backupCluster" type="target">
- <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
- <retention limit="hours(6)" action="archive"/>
- <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
- </cluster>
- </clusters>
-
- <locations>
- <location type="data" path="/projects/falcon/clicks"/>
- <location type="stats" path="/projects/falcon/clicksStats"/>
- <location type="meta" path="/projects/falcon/clicksMetaData"/>
- </locations>
- <table uri="catalog:default:clicks#ds=$YEAR-$MONTH-$DAY-$HOUR" />
-
- <ACL owner="testuser" group="group" permission="0x755"/>
- <schema location="/schema/clicks" provider="protobuf"/>
-</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index 00c261b..adef3ec 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -21,9 +21,9 @@ package org.apache.falcon.converter;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
@@ -62,7 +62,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
- public static final String FEED_PATH_SEP = "#";
+ private static final String FEED_PATH_SEP = "#";
private static final String TIMEOUT = "timeout";
private static final String PARALLEL = "parallel";
@@ -116,7 +116,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
WORKFLOW retentionWorkflow = new WORKFLOW();
try {
//
- WORKFLOWAPP retWfApp = createRetentionWorkflow();
+ WORKFLOWAPP retWfApp = createRetentionWorkflow(cluster);
retWfApp.setName(wfName);
marshal(cluster, retWfApp, wfPath);
retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
@@ -124,9 +124,23 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
- String feedBasePaths = getFeedDataPath(cluster, feed);
+ String feedPathMask = getLocationURI(cluster, feed, LocationType.DATA);
+ String metaPathMask = getLocationURI(cluster, feed, LocationType.META);
+ String statsPathMask = getLocationURI(cluster, feed, LocationType.STATS);
+ String tmpPathMask = getLocationURI(cluster, feed, LocationType.TMP);
+
+ StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
+ if (metaPathMask != null) {
+ feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
+ }
+ if (statsPathMask != null) {
+ feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
+ }
+ if (tmpPathMask != null) {
+ feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
+ }
- props.put("feedDataPath", feedBasePaths);
+ props.put("feedDataPath", feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{"));
props.put("timeZone", feed.getTimezone().getID());
props.put("frequency", feed.getFrequency().getTimeUnit().name());
props.put("limit", feedCluster.getRetention().getLimit().toString());
@@ -142,29 +156,6 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
}
}
- protected String getFeedDataPath(Cluster cluster, Feed feed) throws FalconException {
- final Storage storage = FeedHelper.createStorage(cluster, feed);
- String feedPathMask = storage.getUriTemplate(LocationType.DATA);
- String metaPathMask = storage.getUriTemplate(LocationType.META);
- String statsPathMask = storage.getUriTemplate(LocationType.STATS);
- String tmpPathMask = storage.getUriTemplate(LocationType.TMP);
-
- StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
- if (metaPathMask != null) {
- feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
- }
-
- if (statsPathMask != null) {
- feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
- }
-
- if (tmpPathMask != null) {
- feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
- }
-
- return feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{");
- }
-
private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
throws FalconException {
@@ -272,8 +263,10 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
SYNCDATASET inputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
SYNCDATASET outputDataset = (SYNCDATASET) replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
- inputDataset.setUriTemplate(FeedHelper.createStorage(srcCluster, feed).getUriTemplate(LocationType.DATA));
- outputDataset.setUriTemplate(FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
+ inputDataset.setUriTemplate(new Path(ClusterHelper.getStorageUrl(srcCluster),
+ FeedHelper.getLocation(feed, LocationType.DATA, srcCluster.getName()).getPath()).toString());
+ outputDataset.setUriTemplate(getStoragePath(
+ FeedHelper.getLocation(feed, LocationType.DATA, trgCluster.getName()).getPath()));
setDatasetValues(inputDataset, feed, srcCluster);
setDatasetValues(outputDataset, feed, srcCluster);
if (feed.getAvailabilityFlag() == null) {
@@ -345,7 +338,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
marshal(cluster, repWFapp, wfPath);
}
- private WORKFLOWAPP createRetentionWorkflow() throws IOException, FalconException {
+ private WORKFLOWAPP createRetentionWorkflow(Cluster cluster) throws IOException, FalconException {
return getWorkflowTemplate(RETENTION_WF_TEMPLATE);
}
@@ -360,4 +353,19 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
}
return props;
}
+
+ private String getLocationURI(Cluster cluster, Feed feed, LocationType type) {
+ String path = FeedHelper.getLocation(feed, type, cluster.getName())
+ .getPath();
+
+ if (!path.equals("/tmp")) {
+ if (new Path(path).toUri().getScheme() == null) {
+ return new Path(ClusterHelper.getStorageUrl(cluster), path)
+ .toString();
+ } else {
+ return path;
+ }
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 3b3132a..7fbe179 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -19,7 +19,6 @@ package org.apache.falcon.converter;
import static org.testng.Assert.assertEquals;
-import java.util.Calendar;
import java.util.Collection;
import java.util.List;
@@ -28,7 +27,6 @@ import javax.xml.bind.Unmarshaller;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -76,6 +74,7 @@ public class OozieFeedMapperTest {
ClusterHelper.getInterface(trgCluster, Interfacetype.WRITE).setEndpoint(trgHdfsUrl);
feed = (Feed) storeEntity(EntityType.FEED, FEED);
+
}
protected Entity storeEntity(EntityType type, String path) throws Exception {
@@ -149,35 +148,6 @@ public class OozieFeedMapperTest {
break;
}
}
- }
-
- @Test
- public void testRetentionCoords() throws FalconException {
- org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
- final Calendar instance = Calendar.getInstance();
- instance.roll(Calendar.YEAR, 1);
- cluster.getValidity().setEnd(instance.getTime());
-
- OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
- List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
- COORDINATORAPP coord = coords.get(0);
- Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
- Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
- Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
-
- String feedDataPath = null;
- org.apache.falcon.oozie.coordinator.CONFIGURATION configuration =
- coord.getAction().getWorkflow().getConfiguration();
- for (Property property : configuration.getProperty()) {
- if ("feedDataPath".equals(property.getName())) {
- feedDataPath = property.getValue();
- break;
- }
- }
-
- if (feedDataPath != null) {
- Assert.assertEquals(feedDataPath, feedMapper.getFeedDataPath(srcCluster, feed));
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 86ad937..8f75736 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -295,8 +295,10 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
SYNCDATASET syncdataset = new SYNCDATASET();
syncdataset.setName(datasetName);
- String locPath = FeedHelper.createStorage(cluster, feed).getUriTemplate(locationType);
- syncdataset.setUriTemplate(locPath);
+ String locPath = FeedHelper.getLocation(feed, locationType,
+ cluster.getName()).getPath();
+ syncdataset.setUriTemplate(new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}"
+ + locPath);
syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 128a73d..3f70557 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
@@ -72,7 +73,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
properties.put(inName + ".done-flag", "notused");
- String locPath = FeedHelper.createStorage(clusterName, feed).getUriTemplate().replace('$', '%');
+ String locPath = FeedHelper.getLocation(feed, LocationType.DATA, clusterName).getPath().replace('$', '%');
properties.put(inName + ".uri-template",
new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index e3a42ca..7be41da 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -47,6 +47,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -141,14 +142,13 @@ public class OozieProcessMapperTest extends AbstractTestBase {
ConfigurationStore store = ConfigurationStore.get();
Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
- final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
- assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
+ assertEquals(SchemaHelper.formatDateUTC(feed.getClusters().getClusters().get(0).getValidity().getStart()),
+ ds.getInitialInstance());
assertEquals(feed.getTimezone().getID(), ds.getTimezone());
assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
assertEquals("", ds.getDoneFlag());
- assertEquals(ds.getUriTemplate(), FeedHelper.createStorage(feedCluster, feed).getUriTemplate());
-
+ assertEquals(ds.getUriTemplate(), "${nameNode}" + FeedHelper.getLocation(feed, LocationType.DATA,
+ feed.getClusters().getClusters().get(0).getName()).getPath());
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
if (prop.getName().equals("mapred.job.priority")) {
assertEquals(prop.getValue(), "LOW");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c82b663f/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 74bab07..f762c4b 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -295,11 +295,9 @@ public class TestContext {
ServletInputStream rawlogStream = getServletInputStream(tmpFile);
- return this.service.path("api/entities/submit/" + entityType.name().toLowerCase())
- .header("Remote-User", REMOTE_USER)
- .accept(MediaType.TEXT_XML)
- .type(MediaType.TEXT_XML)
- .post(ClientResponse.class, rawlogStream);
+ return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User",
+ "testuser")
+ .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream);
}
public void assertRequestId(ClientResponse clientRepsonse) {
[2/2] git commit: Merge commit
'c82b663fcb74fe00b6ff7572e8c2735d9f878b7c' into 85
Posted by ve...@apache.org.
Merge commit 'c82b663fcb74fe00b6ff7572e8c2735d9f878b7c' into 85
Conflicts:
CHANGES.txt
common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
common/src/main/java/org/apache/falcon/entity/FeedHelper.java
common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
common/src/main/java/org/apache/falcon/group/FeedGroup.java
common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
common/src/test/resources/config/feed/hive-table-feed.xml
process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/f278ec82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/f278ec82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/f278ec82
Branch: refs/heads/FALCON-85
Commit: f278ec82f4c21f9d2949f220261538e2fb15184c
Parents: 33b5f64 c82b663
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Mon Sep 23 14:48:26 2013 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Mon Sep 23 14:48:26 2013 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../org/apache/falcon/catalog/AbstractCatalogService.java | 1 -
.../src/main/java/org/apache/falcon/group/FeedGroupMap.java | 3 +--
.../test/java/org/apache/falcon/resource/TestContext.java | 8 +++-----
4 files changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/f278ec82/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 06effed,b2dc1d3..e286b4e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -5,18 -5,7 +5,18 @@@ Trunk (Unreleased
INCOMPATIBLE CHANGES
NEW FEATURES
- FALCON-86 Hive table integration with cluster entity. (Venaktesh
+ FALCON-102 Add integration tests for feed entity parser for table.
+ (Venkatesh Seetharam)
+
+ FALCON-103 Upgrade oozie to 4.0.x. (Venkatesh Seetharam)
+
+ FALCON-96 Hive client to talk to the metastore. (Venkatesh
+ Seetharam via Srikanth Sundarrajan)
+
+ FALCON-87 Hive table integration with feed entity. (Venkatesh
+ Seetharam via Srikanth Sundarrajan)
+
- FALCON-86 Hive table integration with cluster entity. (Venaktesh
++ FALCON-86 Hive table integration with cluster entity. (Venaktesh
Seetharam via Srikanth Sundarrajan)
FALCON-88 Add embedded hive and webhcat for integration tests.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/f278ec82/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/f278ec82/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --cc common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index 0a35a71,ed44b48..2f65e09
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@@ -114,9 -114,8 +114,8 @@@ public final class FeedGroupMap impleme
return groupSet;
}
- public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed)
- throws FalconException {
- public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
++ public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
return getGroups(feed.getGroups(), feed.getFrequency(),
- FeedHelper.getLocation(feed, LocationType.DATA).getPath());
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
}
}