You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/11/12 12:05:28 UTC
[11/12] FALCON-85 Hive (HCatalog) integration. Contributed by
Venkatesh Seetharam FALCON-163 Merge FALCON-85 branch into main line.
Contributed by Venkatesh Seetharam
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
new file mode 100644
index 0000000..51e4d6e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hcatalog.api.HCatClient;
+import org.apache.hcatalog.api.HCatDatabase;
+import org.apache.hcatalog.api.HCatPartition;
+import org.apache.hcatalog.api.HCatTable;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * An implementation of CatalogService that uses Hive Meta Store (HCatalog)
+ * as the backing Catalog registry.
+ *
+ * HCatClient instances are created lazily, one per metastore URL, and kept in a
+ * process-wide cache. The cached clients are shared by every caller, so methods in
+ * this class must never close them.
+ */
+public class HiveCatalogService extends AbstractCatalogService {
+
+    private static final Logger LOG = Logger.getLogger(HiveCatalogService.class);
+
+    /** Metastore URL -> shared HCatClient. */
+    private static final ConcurrentHashMap<String, HCatClient> CACHE = new ConcurrentHashMap<String, HCatClient>();
+
+    /**
+     * Returns the shared client for the metastore behind the cluster's REGISTRY interface.
+     *
+     * @param cluster cluster entity; must not be null
+     * @return cached HCatClient for the cluster's registry endpoint
+     * @throws FalconException if a new client has to be created and creation fails
+     */
+    public static HCatClient get(Cluster cluster) throws FalconException {
+        assert cluster != null : "Cluster cant be null";
+
+        String metastoreUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+        return get(metastoreUrl);
+    }
+
+    /**
+     * Returns the shared client for a metastore URL, creating and caching it on first use.
+     * The method is synchronized so that concurrent first calls create only one client.
+     *
+     * @param metastoreUrl metastore URI (used as HiveConf METASTOREURIS)
+     * @return cached HCatClient
+     * @throws FalconException if a new client has to be created and creation fails
+     */
+    public static synchronized HCatClient get(String metastoreUrl) throws FalconException {
+        HCatClient hCatClient = CACHE.get(metastoreUrl);
+        if (hCatClient == null) {
+            hCatClient = getHCatClient(metastoreUrl);
+            LOG.info("Caching HCatalog client object for " + metastoreUrl);
+            CACHE.put(metastoreUrl, hCatClient);
+        }
+
+        return hCatClient;
+    }
+
+    /**
+     * Creates a new HCatClient configured for a remote metastore at the given URL.
+     */
+    private static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
+        try {
+            HiveConf hcatConf = new HiveConf();
+            hcatConf.set("hive.metastore.local", "false");
+            hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+            hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+            hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                    HCatSemanticAnalyzer.class.getName());
+            hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+            // blank out any pre/post execution hooks that may be configured
+            hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+            hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+
+            return HCatClient.create(hcatConf);
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    /**
+     * Checks the metastore by fetching the "default" database through the shared client.
+     */
+    @Override
+    public boolean isAlive(String catalogBaseUrl) throws FalconException {
+        LOG.info("Checking if the service is alive for: " + catalogBaseUrl);
+
+        try {
+            // Do not close this client: it is the cached instance shared with all other callers.
+            HCatClient client = get(catalogBaseUrl);
+            HCatDatabase database = client.getDatabase("default");
+            return database != null;
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(String catalogUrl, String database, String tableName)
+        throws FalconException {
+        LOG.info("Checking if the table exists: " + tableName);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            // NOTE(review): confirm whether getTable returns null or throws for a missing table;
+            // if it throws, a missing table surfaces here as a FalconException rather than false.
+            HCatTable table = client.getTable(database, tableName);
+            return table != null;
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    /**
+     * @return true unless the table's type is "MANAGED_TABLE"
+     */
+    @Override
+    public boolean isTableExternal(String catalogUrl, String database, String tableName)
+        throws FalconException {
+        LOG.info("Checking if the table is external: " + tableName);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            HCatTable table = client.getTable(database, tableName);
+            return !"MANAGED_TABLE".equals(table.getTabletype());
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    @Override
+    public List<CatalogPartition> listPartitionsByFilter(String catalogUrl, String database,
+                                                         String tableName, String filter)
+        throws FalconException {
+        LOG.info("List partitions for : " + tableName + ", partition filter: " + filter);
+
+        try {
+            List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
+
+            HCatClient client = get(catalogUrl);
+            List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter);
+            for (HCatPartition hCatPartition : hCatPartitions) {
+                catalogPartitionList.add(createCatalogPartition(hCatPartition));
+            }
+
+            return catalogPartitionList;
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    /**
+     * Copies the fields of an HCatalog partition into Falcon's CatalogPartition;
+     * only the column names (not their types) are carried over.
+     */
+    private CatalogPartition createCatalogPartition(HCatPartition hCatPartition) {
+        final CatalogPartition catalogPartition = new CatalogPartition();
+        catalogPartition.setDatabaseName(hCatPartition.getDatabaseName());
+        catalogPartition.setTableName(hCatPartition.getTableName());
+        catalogPartition.setValues(hCatPartition.getValues());
+        catalogPartition.setInputFormat(hCatPartition.getInputFormat());
+        catalogPartition.setOutputFormat(hCatPartition.getOutputFormat());
+        catalogPartition.setLocation(hCatPartition.getLocation());
+        catalogPartition.setSerdeInfo(hCatPartition.getSerDe());
+        catalogPartition.setCreateTime(hCatPartition.getCreateTime());
+        catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
+
+        List<String> tableColumns = new ArrayList<String>();
+        for (HCatFieldSchema hCatFieldSchema : hCatPartition.getColumns()) {
+            tableColumns.add(hCatFieldSchema.getName());
+        }
+        catalogPartition.setTableColumns(tableColumns);
+
+        return catalogPartition;
+    }
+
+    /**
+     * Drops the partitions matching the given spec; always returns true when no exception is thrown.
+     */
+    @Override
+    public boolean dropPartitions(String catalogUrl, String database,
+                                  String tableName, Map<String, String> partitions)
+        throws FalconException {
+        LOG.info("Dropping partitions for : " + tableName + ", partitions: " + partitions);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            // presumably the trailing flag is HCatClient's ifExists switch — confirm against the API
+            client.dropPartitions(database, tableName, partitions, true);
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+
+        return true;
+    }
+
+    @Override
+    public CatalogPartition getPartition(String catalogUrl, String database, String tableName,
+                                         Map<String, String> partitionSpec) throws FalconException {
+        LOG.info("Fetching partition for : " + tableName + ", partition spec: " + partitionSpec);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
+            return createCatalogPartition(hCatPartition);
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index 49662d1..644afd2 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -84,7 +84,7 @@ public abstract class AbstractCleanupHandler {
return paths;
}
- private FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
+ protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
throws FalconException {
FileSystem fs;
@@ -101,6 +101,16 @@ public abstract class AbstractCleanupHandler {
throws FalconException {
FileStatus[] logs = getAllLogs(cluster, entity);
+ delete(cluster, entity, retention, logs);
+ }
+
+ protected void delete(Cluster cluster, Entity entity, long retention, FileStatus[] logs)
+ throws FalconException {
+ if (logs == null || logs.length == 0) {
+ LOG.info("Nothing to delete for cluster: " + cluster.getName() + ", entity: " + entity.getName());
+ return;
+ }
+
long now = System.currentTimeMillis();
for (FileStatus log : logs) {
@@ -126,7 +136,6 @@ public abstract class AbstractCleanupHandler {
+ log.getPath());
}
}
-
}
private void deleteParentIfEmpty(FileSystem fs, Path parent) throws IOException {
@@ -136,7 +145,6 @@ public abstract class AbstractCleanupHandler {
fs.delete(parent, true);
deleteParentIfEmpty(fs, parent.getParent());
}
-
}
public abstract void cleanup() throws FalconException;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 0c8cf82..7dbac58 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -18,12 +18,19 @@
package org.apache.falcon.cleanup;
import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.io.IOException;
import java.util.Collection;
/**
@@ -44,9 +51,10 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
Cluster currentCluster = STORE.get(EntityType.CLUSTER,
cluster.getName());
if (currentCluster.getColo().equals(getCurrentColo())) {
- LOG.info("Cleaning up logs for process:" + feedName
+ LOG.info("Cleaning up logs & staged data for feed:" + feedName
+ " in cluster: " + cluster.getName() + " with retention: " + retention);
delete(currentCluster, feed, retention);
+ deleteStagedData(currentCluster, feed, retention);
} else {
LOG.info("Ignoring cleanup for process:" + feedName
+ " in cluster: " + cluster.getName() + " as this does not belong to current colo");
@@ -56,11 +64,36 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
}
}
+    /**
+     * Removes replication staging data kept for a table-backed feed on the given cluster.
+     * Feeds whose storage type is FILESYSTEM do not stage data, so they are skipped.
+     *
+     * @param cluster   cluster hosting the staged data
+     * @param feed      feed entity
+     * @param retention retention limit passed on to delete
+     * @throws FalconException if storage resolution fails or the staging area cannot be listed
+     */
+    private void deleteStagedData(Cluster cluster, Feed feed, long retention)
+        throws FalconException {
+        Storage storage = FeedHelper.createStorage(cluster, feed);
+        if (storage.getType() != Storage.TYPE.FILESYSTEM) {
+            CatalogStorage tableStorage = (CatalogStorage) storage;
+            String stagingRoot = FeedHelper.getStagingDir(cluster, feed, tableStorage, Tag.REPLICATION);
+            // layout: stagingDir/dataOutPartitionValue/nominal-time/data
+            Path stagedInstances = new Path(stagingRoot + "/*/*/*");
+            FileSystem fileSystem = getFileSystem(cluster);
+            FileStatus[] matches;
+            try {
+                matches = fileSystem.globStatus(stagedInstances);
+            } catch (IOException e) {
+                throw new FalconException(e);
+            }
+            delete(cluster, feed, retention, matches);
+        }
+    }
+
@Override
protected Path getLogPath(Entity entity, String stagingPath) {
- Path logPath = new Path(stagingPath, "falcon/workflows/feed/"
+ return new Path(stagingPath, "falcon/workflows/feed/"
+ entity.getName() + "/logs/job-*/*/*"); // glob matching this feed's job log directories under stagingPath
- return logPath;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
new file mode 100644
index 0000000..32f7605
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+/**
+ * A catalog registry implementation of a feed storage.
+ *
+ * A feed table is declared with a URI of the form
+ * {@code catalog:$database:$table#key1=value1;key2=value2}. A parsed instance is serialized by
+ * {@link #getUriTemplate()} as {@code $catalogUrl/$database/$table/key1=value1;key2=value2},
+ * which the single-argument constructor parses back.
+ */
+public class CatalogStorage implements Storage {
+
+    public static final String PARTITION_SEPARATOR = ";";
+    public static final String PARTITION_KEYVAL_SEPARATOR = "=";
+    public static final String INPUT_PATH_SEPARATOR = ":";
+    public static final String OUTPUT_PATH_SEPARATOR = "/";
+    public static final String PARTITION_VALUE_QUOTE = "'";
+
+    public static final String CATALOG_URL = "${hcatNode}";
+
+    private final String catalogUrl;
+    private String database;
+    private String table;
+    private Map<String, String> partitions;
+
+    protected CatalogStorage(Feed feed) throws URISyntaxException {
+        this(CATALOG_URL, feed.getTable());
+    }
+
+    protected CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
+        this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), table);
+    }
+
+    protected CatalogStorage(String catalogUrl, CatalogTable table) throws URISyntaxException {
+        this(catalogUrl, table.getUri());
+    }
+
+    protected CatalogStorage(String catalogUrl, String tableUri) throws URISyntaxException {
+        if (catalogUrl == null || catalogUrl.length() == 0) {
+            throw new IllegalArgumentException("Catalog Registry URL cannot be null or empty");
+        }
+
+        this.catalogUrl = catalogUrl;
+
+        parseFeedUri(tableUri);
+    }
+
+    /**
+     * Validate URI to conform to catalog:$database:$table#$partitions.
+     * scheme=catalog:database=$database:table=$table#$partitions
+     * partitions=key=value;key=value
+     *
+     * @param catalogTableUri table URI to parse and validate
+     * @throws URISyntaxException if the scheme, database, table or partitions are missing or malformed
+     */
+    private void parseFeedUri(String catalogTableUri) throws URISyntaxException {
+        // ${...} expressions are not legal URI characters, so they are substituted before parsing
+        final String processed = catalogTableUri.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
+                .replaceAll("}", EXPR_CLOSE_NORMALIZED);
+        URI tableUri = new URI(processed);
+
+        if (!"catalog".equals(tableUri.getScheme())) {
+            throw new URISyntaxException(tableUri.toString(), "catalog scheme is missing");
+        }
+
+        final String schemeSpecificPart = tableUri.getSchemeSpecificPart();
+        if (schemeSpecificPart == null) {
+            throw new URISyntaxException(tableUri.toString(), "Database and Table are missing");
+        }
+
+        String[] paths = schemeSpecificPart.split(INPUT_PATH_SEPARATOR);
+        if (paths.length != 2) {
+            throw new URISyntaxException(tableUri.toString(), "URI path is not in expected format: database:table");
+        }
+
+        database = paths[0];
+        table = paths[1];
+        validateDatabaseAndTable(tableUri);
+
+        parsePartitions(tableUri.getFragment(), tableUri);
+    }
+
+    /**
+     * Create an instance from the URI Template that was generated using
+     * the getUriTemplate() method.
+     *
+     * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate
+     * @throws URISyntaxException if the template is not of the form $catalogUrl/$database/$table/$partitions
+     */
+    protected CatalogStorage(String uriTemplate) throws URISyntaxException {
+        if (uriTemplate == null || uriTemplate.length() == 0) {
+            throw new IllegalArgumentException("URI template cannot be null or empty");
+        }
+
+        final String processed = uriTemplate.replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
+                .replaceAll("}", EXPR_CLOSE_NORMALIZED);
+        URI uri = new URI(processed);
+
+        this.catalogUrl = uri.getScheme() + "://" + uri.getAuthority();
+
+        parseUriTemplate(uri);
+    }
+
+    /**
+     * Parses the path of a URI template: /$database/$table/$partitions.
+     */
+    private void parseUriTemplate(URI uriTemplate) throws URISyntaxException {
+        String path = uriTemplate.getPath();
+        String[] paths = path.split(OUTPUT_PATH_SEPARATOR);
+        if (paths.length != 4) {
+            throw new URISyntaxException(uriTemplate.toString(),
+                    "URI path is not in expected format: database:table");
+        }
+
+        database = paths[1];
+        table = paths[2];
+        validateDatabaseAndTable(uriTemplate);
+
+        parsePartitions(paths[3], uriTemplate);
+    }
+
+    /**
+     * Rejects a missing or empty database or table name.
+     */
+    private void validateDatabaseAndTable(URI uri) throws URISyntaxException {
+        if (database == null || database.length() == 0) {
+            throw new URISyntaxException(uri.toString(), "DB name is missing");
+        }
+        if (table == null || table.length() == 0) {
+            throw new URISyntaxException(uri.toString(), "Table name is missing");
+        }
+    }
+
+    /**
+     * Parses "key=value;key=value" into the partitions map, preserving declaration order.
+     * Fails if no key/value pair is present, so later path/template builders never see an empty map.
+     */
+    private void parsePartitions(String partRaw, URI uri) throws URISyntaxException {
+        if (partRaw == null || partRaw.length() == 0) {
+            throw new URISyntaxException(uri.toString(), "Partition details are missing");
+        }
+
+        // reverse the substitutions applied before the URI was parsed
+        final String rawPartition = partRaw.replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
+                .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
+        Map<String, String> parsed = new LinkedHashMap<String, String>(); // preserve insertion order
+        for (String part : rawPartition.split(PARTITION_SEPARATOR)) {
+            if (part == null || part.length() == 0) {
+                continue;
+            }
+
+            String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR);
+            if (keyVal.length != 2) {
+                throw new URISyntaxException(uri.toString(),
+                        "Partition key value pair is not specified properly in (" + part + ")");
+            }
+
+            parsed.put(keyVal[0], keyVal[1]);
+        }
+
+        if (parsed.isEmpty()) {
+            throw new URISyntaxException(uri.toString(), "Partition details are missing");
+        }
+
+        partitions = parsed;
+    }
+
+    public String getCatalogUrl() {
+        return catalogUrl;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public Map<String, String> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * @param key partition key
+     * @return partition value
+     */
+    public String getPartitionValue(String key) {
+        return partitions.get(key);
+    }
+
+    /**
+     * @param key partition key
+     * @return if partitions map includes the key or not
+     */
+    public boolean hasPartition(String key) {
+        return partitions.containsKey(key);
+    }
+
+    /**
+     * @return the first partition key (in declaration order) whose value matches
+     *         FeedDataPath.PATTERN, or null if none does
+     */
+    public String getDatedPartitionKey() {
+        String datedPartitionKey = null;
+
+        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
+
+            Matcher matcher = FeedDataPath.PATTERN.matcher(entry.getValue());
+            if (matcher.find()) {
+                datedPartitionKey = entry.getKey();
+                break;
+            }
+        }
+
+        return datedPartitionKey;
+    }
+
+    /**
+     * Convert the partition map to filter string.
+     * Each key value pair is separated by ';'.
+     *
+     * @return filter string
+     */
+    public String toPartitionFilter() {
+        StringBuilder filter = new StringBuilder();
+        filter.append("(");
+        for (Map.Entry<String, String> entry : partitions.entrySet()) {
+            if (filter.length() > 1) {
+                filter.append(PARTITION_SEPARATOR);
+            }
+            filter.append(entry.getKey());
+            filter.append(PARTITION_KEYVAL_SEPARATOR);
+            filter.append(PARTITION_VALUE_QUOTE);
+            filter.append(entry.getValue());
+            filter.append(PARTITION_VALUE_QUOTE);
+        }
+        filter.append(")");
+        return filter.toString();
+    }
+
+    /**
+     * Convert the partition map to path string.
+     * Each key value pair is separated by '/'.
+     *
+     * @return path string; empty if there are no partitions
+     */
+    public String toPartitionAsPath() {
+        StringBuilder partitionFilter = new StringBuilder();
+
+        for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
+            partitionFilter.append(entry.getKey())
+                    .append(PARTITION_KEYVAL_SEPARATOR)
+                    .append(entry.getValue())
+                    .append(OUTPUT_PATH_SEPARATOR);
+        }
+
+        if (partitionFilter.length() > 0) {
+            partitionFilter.setLength(partitionFilter.length() - 1); // drop the trailing separator
+        }
+        return partitionFilter.toString();
+    }
+
+    @Override
+    public TYPE getType() {
+        return TYPE.TABLE;
+    }
+
+    /**
+     * LocationType does NOT matter here.
+     */
+    @Override
+    public String getUriTemplate() {
+        return getUriTemplate(LocationType.DATA);
+    }
+
+    /**
+     * LocationType does NOT matter here.
+     * Produces $catalogUrl/$database/$table/key=value;key=value.
+     */
+    @Override
+    public String getUriTemplate(LocationType locationType) {
+        StringBuilder uriTemplate = new StringBuilder();
+        uriTemplate.append(catalogUrl);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        uriTemplate.append(database);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        uriTemplate.append(table);
+        uriTemplate.append(OUTPUT_PATH_SEPARATOR);
+        for (Map.Entry<String, String> entry : partitions.entrySet()) {
+            uriTemplate.append(entry.getKey());
+            uriTemplate.append(PARTITION_KEYVAL_SEPARATOR);
+            uriTemplate.append(entry.getValue());
+            uriTemplate.append(PARTITION_SEPARATOR);
+        }
+        uriTemplate.setLength(uriTemplate.length() - 1); // drop the trailing separator
+
+        return uriTemplate.toString();
+    }
+
+    @Override
+    public boolean exists() throws FalconException {
+        return CatalogServiceFactory.getCatalogService().tableExists(catalogUrl, database, table);
+    }
+
+    /**
+     * @return false for any storage that is not a CatalogStorage; otherwise true when the
+     *         catalog URL (if set here), database, table and partitions all match
+     */
+    @Override
+    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        if (!(toCompareAgainst instanceof CatalogStorage)) {
+            return false;
+        }
+        CatalogStorage catalogStorage = (CatalogStorage) toCompareAgainst;
+
+        return (getCatalogUrl() == null || getCatalogUrl().equals(catalogStorage.getCatalogUrl()))
+                && getDatabase().equals(catalogStorage.getDatabase())
+                && getTable().equals(catalogStorage.getTable())
+                && getPartitions().equals(catalogStorage.getPartitions());
+    }
+
+    @Override
+    public String toString() {
+        return "CatalogStorage{"
+                + "catalogUrl='" + catalogUrl + '\''
+                + ", database='" + database + '\''
+                + ", table='" + table + '\''
+                + ", partitions=" + partitions
+                + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index fc4a467..ba80cac 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -253,12 +253,12 @@ public final class EntityUtil {
default:
}
+ final int freq = frequency.getFrequencyAsInt();
if (count > 2) {
- startCal.add(frequency.getTimeUnit().getCalendarUnit(),
- ((count - 2) / frequency.getFrequency()) * frequency.getFrequency());
+ startCal.add(frequency.getTimeUnit().getCalendarUnit(), ((count - 2) / freq) * freq);
}
while (startCal.getTime().before(now)) {
- startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
+ startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
}
return startCal.getTime();
}
@@ -288,15 +288,15 @@ public final class EntityUtil {
default:
}
+ final int freq = frequency.getFrequencyAsInt();
if (count > 2) {
- startCal.add(frequency.getTimeUnit().getCalendarUnit(),
- (count / frequency.getFrequency()) * frequency.getFrequency());
- count = (count / frequency.getFrequency());
+ startCal.add(frequency.getTimeUnit().getCalendarUnit(), (count / freq) * freq);
+ count = (count / freq);
} else {
count = 0;
}
while (startCal.getTime().before(instanceTime)) {
- startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequency());
+ startCal.add(frequency.getTimeUnit().getCalendarUnit(), freq);
count++;
}
return count + 1;
@@ -558,6 +558,7 @@ public final class EntityUtil {
.equalsIgnoreCase("true")) {
return null;
}
+
LateProcess lateProcess = new LateProcess();
lateProcess.setDelay(new Frequency(RuntimeProperties.get()
.getProperty("feed.late.frequency", "hours(3)")));
@@ -598,10 +599,7 @@ public final class EntityUtil {
}
public static boolean responsibleFor(String colo) {
- if (DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
- && colo.equals(DeploymentUtil.getCurrentColo()))) {
- return true;
- }
- return false;
+ return DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
+ && colo.equals(DeploymentUtil.getCurrentColo())); // embedded mode, or a non-prism server running in the given colo
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index c96120d..67257e3 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,11 +20,20 @@ package org.apache.falcon.entity;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.expression.ExpressionHelper;
+import java.net.URISyntaxException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -44,32 +53,164 @@ public final class FeedHelper {
return null;
}
- public static Location getLocation(Feed feed, LocationType type,
- String clusterName) {
- Cluster cluster = getCluster(feed, clusterName);
- if (cluster != null && cluster.getLocations() != null
- && cluster.getLocations().getLocations().size() != 0) {
- return getLocation(cluster.getLocations(), type);
- } else {
- return getLocation(feed.getLocations(), type);
+ public static Storage createStorage(Feed feed) throws FalconException { // feed-level storage: non-empty locations win over a table
+
+ final Locations feedLocations = feed.getLocations();
+ if (feedLocations != null
+ && feedLocations.getLocations().size() != 0) {
+ return new FileSystemStorage(feed);
+ }
+
+ try {
+ final CatalogTable table = feed.getTable();
+ if (table != null) {
+ return new CatalogStorage(feed); // catalog URL is left as the ${hcatNode} placeholder
+ }
+ } catch (URISyntaxException e) {
+ throw new FalconException(e);
}
+ throw new FalconException("Both catalog and locations are not defined.");
+ }
+
+ public static Storage createStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+ Feed feed) throws FalconException {
+ return createStorage(getCluster(feed, clusterEntity.getName()), feed, clusterEntity);
+ }
+
+    /**
+     * Builds the storage for a feed on the cluster with the given name.
+     */
+    public static Storage createStorage(String clusterName, Feed feed)
+        throws FalconException {
+        Cluster feedCluster = getCluster(feed, clusterName);
+        return createStorage(feedCluster, feed);
+    }
+
+ public static Storage createStorage(Cluster cluster, Feed feed)
+ throws FalconException {
+
+ final org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+ EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName()); // look up the cluster entity named by the feed cluster
+
+ return createStorage(cluster, feed, clusterEntity);
}
- public static Location getLocation(Feed feed, LocationType type) {
- return getLocation(feed.getLocations(), type);
+ public static Storage createStorage(Cluster cluster, Feed feed,
+ org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
+ throws FalconException {
+
+ final List<Location> locations = getLocations(cluster, feed); // cluster-level locations override feed-level ones
+ if (locations != null) { // NOTE(review): an empty but non-null feed-level list still selects filesystem storage
+ return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
+ }
+
+ try {
+ final CatalogTable table = getTable(cluster, feed); // cluster-level table overrides the feed-level one
+ if (table != null) {
+ return new CatalogStorage(clusterEntity, table);
+ }
+ } catch (URISyntaxException e) {
+ throw new FalconException(e);
+ }
+
+ throw new FalconException("Both catalog and locations are not defined.");
}
- public static Location getLocation(Locations locations, LocationType type) {
- for (Location loc : locations.getLocations()) {
- if (loc.getType() == type) {
- return loc;
+ /**
+ * Factory method to dole out a storage instance used for replication source.
+ * Resolution follows createStorage(Cluster, Feed, Cluster): cluster-level locations or
+ * table override the feed-level ones. Filesystem storage is rooted at the cluster's
+ * read-only storage URL instead of its regular storage URL.
+ *
+ * @param clusterEntity cluster entity (the replication source)
+ * @param feed feed entity
+ * @return an implementation of Storage
+ * @throws FalconException if neither locations nor a table are defined, or the table URI is malformed
+ */
+ public static Storage createReadOnlyStorage(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+ Feed feed) throws FalconException {
+ Cluster feedCluster = getCluster(feed, clusterEntity.getName()); // NOTE(review): null if the feed is not defined on this cluster
+ final List<Location> locations = getLocations(feedCluster, feed);
+ if (locations != null) {
+ return new FileSystemStorage(ClusterHelper.getReadOnlyStorageUrl(clusterEntity), locations);
+ }
+
+ try {
+ final CatalogTable table = getTable(feedCluster, feed);
+ if (table != null) {
+ return new CatalogStorage(clusterEntity, table);
}
+ } catch (URISyntaxException e) {
+ throw new FalconException(e);
}
- Location loc = new Location();
- loc.setPath("/tmp");
- loc.setType(type);
- return loc;
+
+ throw new FalconException("Both catalog and locations are not defined.");
+ }
+
+ public static Storage createStorage(String type, String storageUriTemplate)
+ throws URISyntaxException {
+
+ Storage.TYPE storageType = Storage.TYPE.valueOf(type);
+ if (storageType == Storage.TYPE.FILESYSTEM) {
+ return new FileSystemStorage(storageUriTemplate);
+ } else if (storageType == Storage.TYPE.TABLE) {
+ return new CatalogStorage(storageUriTemplate);
+ }
+
+ throw new IllegalArgumentException("Bad type: " + type);
+ }
+
+ public static Storage.TYPE getStorageType(Feed feed) throws FalconException {
+ final Locations feedLocations = feed.getLocations();
+ if (feedLocations != null
+ && feedLocations.getLocations().size() != 0) {
+ return Storage.TYPE.FILESYSTEM;
+ }
+
+ final CatalogTable table = feed.getTable();
+ if (table != null) {
+ return Storage.TYPE.TABLE;
+ }
+
+ throw new FalconException("Both catalog and locations are not defined.");
+ }
+
+    /**
+     * Determines the storage type for a feed on a specific feed cluster, taking
+     * cluster-level locations/table overrides into account.
+     *
+     * @throws FalconException if neither locations nor a table are defined
+     */
+    public static Storage.TYPE getStorageType(Feed feed,
+                                              Cluster cluster) throws FalconException {
+        if (getLocations(cluster, feed) != null) {
+            return Storage.TYPE.FILESYSTEM;
+        }
+
+        if (getTable(cluster, feed) != null) {
+            return Storage.TYPE.TABLE;
+        }
+
+        throw new FalconException("Both catalog and locations are not defined.");
+    }
+
+    /**
+     * Determines the storage type for a feed on the feed cluster matching the given cluster entity.
+     */
+    public static Storage.TYPE getStorageType(Feed feed,
+                                              org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
+        throws FalconException {
+        return getStorageType(feed, getCluster(feed, clusterEntity.getName()));
+    }
+
+    /**
+     * Returns the cluster-level locations when the cluster declares a non-empty list,
+     * else the feed-level locations, or null when the feed declares none.
+     */
+    protected static List<Location> getLocations(Cluster cluster, Feed feed) {
+        final Locations clusterLocations = cluster.getLocations();
+        boolean overridden = clusterLocations != null && !clusterLocations.getLocations().isEmpty();
+        if (overridden) {
+            return clusterLocations.getLocations();
+        }
+
+        final Locations feedLocations = feed.getLocations();
+        if (feedLocations == null) {
+            return null;
+        }
+        return feedLocations.getLocations();
+    }
+
+ protected static CatalogTable getTable(Cluster cluster, Feed feed) {
+ // a table declared on the feed's cluster overrides the feed-level table
+ if (cluster.getTable() != null) {
+ return cluster.getTable();
+ }
+
+ return feed.getTable(); // possibly null; callers check before use
}
public static String normalizePartitionExpression(String part1, String part2) {
@@ -106,4 +247,14 @@ public final class FeedHelper {
expHelp.setPropertiesForVariable(properties);
return expHelp.evaluateFullExpression(exp, String.class);
}
+
+    /**
+     * Builds the staging directory used for a catalog-backed feed on a cluster. The
+     * layout is {@code <"staging" location of the cluster>/<workflow name>/<database>/<table>}.
+     *
+     * @param clusterEntity cluster whose "staging" location is the root
+     * @param feed          feed the workflow belongs to
+     * @param storage       catalog storage supplying the database and table names
+     * @param tag           workflow tag used to derive the workflow name
+     * @return staging directory path
+     */
+    public static String getStagingDir(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+                                       Feed feed, CatalogStorage storage, Tag tag) {
+        final String clusterName = clusterEntity.getName();
+        final String workflowName = EntityUtil.getWorkflowName(tag, Arrays.asList(clusterName), feed).toString();
+        final String stagingRoot = ClusterHelper.getCompleteLocation(clusterEntity, "staging");
+
+        final StringBuilder dir = new StringBuilder();
+        dir.append(stagingRoot)
+                .append('/').append(workflowName)
+                .append('/').append(storage.getDatabase())
+                .append('/').append(storage.getTable());
+        return dir.toString();
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
new file mode 100644
index 0000000..68370c7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.hadoop.fs.Path;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A file system implementation of a feed storage.
+ *
+ * <p>The storage is a storage URL (by default {@code ${nameNode}}) plus a list of feed
+ * locations. {@link #getUriTemplate()} serializes it as
+ * {@code DATA=<uri>#META=<uri>#STATS=<uri>#TMP=<uri>}, and
+ * {@link #FileSystemStorage(String)} parses that form back. A location type that is
+ * not configured is represented by {@code /tmp}.</p>
+ */
+public class FileSystemStorage implements Storage {
+
+    public static final String FEED_PATH_SEP = "#";
+    public static final String LOCATION_TYPE_SEP = "=";
+
+    public static final String FILE_SYSTEM_URL = "${nameNode}";
+
+    /** Location types in the order they are written to the URI template and compared in. */
+    private static final LocationType[] TEMPLATE_LOCATION_TYPES = {
+        LocationType.DATA, LocationType.META, LocationType.STATS, LocationType.TMP
+    };
+
+    /** Path reported for a location type that is not configured. */
+    private static final String DEFAULT_LOCATION_PATH = "/tmp";
+
+    private final String storageUrl;
+    private final List<Location> locations;
+
+    /**
+     * Creates a storage for the feed-level locations of a feed on {@link #FILE_SYSTEM_URL}.
+     *
+     * @param feed feed whose locations back this storage
+     */
+    protected FileSystemStorage(Feed feed) {
+        this(FILE_SYSTEM_URL, feed.getLocations());
+    }
+
+    /**
+     * Creates a storage from a JAXB locations wrapper.
+     *
+     * @param storageUrl file system URL, must not be null or empty
+     * @param locations  locations wrapper; null is rejected like an empty list
+     */
+    protected FileSystemStorage(String storageUrl, Locations locations) {
+        // Unwrap null-safely so a missing <locations> element produces the
+        // IllegalArgumentException of the delegate instead of a NullPointerException.
+        this(storageUrl, locations == null ? null : locations.getLocations());
+    }
+
+    /**
+     * Creates a storage from a storage URL and a list of locations.
+     *
+     * @param storageUrl file system URL, must not be null or empty
+     * @param locations  feed locations, must not be null or empty
+     * @throws IllegalArgumentException if either argument is null or empty
+     */
+    protected FileSystemStorage(String storageUrl, List<Location> locations) {
+        if (storageUrl == null || storageUrl.length() == 0) {
+            throw new IllegalArgumentException("FileSystem URL cannot be null or empty");
+        }
+
+        if (locations == null || locations.isEmpty()) {
+            throw new IllegalArgumentException("FileSystem Locations cannot be null or empty");
+        }
+
+        this.storageUrl = storageUrl;
+        this.locations = locations;
+    }
+
+    /**
+     * Create an instance from the URI Template that was generated using
+     * the getUriTemplate() method.
+     *
+     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
+     * @throws URISyntaxException if a location path is not a valid URI
+     * @throws IllegalArgumentException if the template is empty or an entry is not {@code TYPE=path}
+     */
+    protected FileSystemStorage(String uriTemplate) throws URISyntaxException {
+        if (uriTemplate == null || uriTemplate.length() == 0) {
+            throw new IllegalArgumentException("URI template cannot be null or empty");
+        }
+
+        String rawStorageUrl = null;
+        List<Location> rawLocations = new ArrayList<Location>();
+        for (String rawPath : uriTemplate.split(FEED_PATH_SEP)) {
+            // Limit of 2: only the first '=' separates the type from the path. Paths
+            // such as /data/year=${YEAR} legitimately contain further '=' characters.
+            String[] typeAndPath = rawPath.split(LOCATION_TYPE_SEP, 2);
+            if (typeAndPath.length != 2) {
+                throw new IllegalArgumentException("Invalid location in URI template: " + rawPath);
+            }
+
+            // Swap the expression delimiters for URI-friendly markers before parsing.
+            final String processed = typeAndPath[1].replaceAll(DOLLAR_EXPR_START_REGEX, DOLLAR_EXPR_START_NORMALIZED)
+                    .replaceAll(EXPR_CLOSE_REGEX, EXPR_CLOSE_NORMALIZED);
+            URI uri = new URI(processed);
+            if (rawStorageUrl == null) {
+                // The storage URL comes from the first entry, which is DATA for
+                // templates produced by getUriTemplate().
+                rawStorageUrl = uri.getScheme() + "://" + uri.getAuthority();
+            }
+
+            // Restore the expressions. The *_REGEX constants double as replacement strings
+            // because '\' in a replacement escapes the following '$', '{' or '}'.
+            final String finalPath = uri.getPath().replaceAll(DOLLAR_EXPR_START_NORMALIZED, DOLLAR_EXPR_START_REGEX)
+                    .replaceAll(EXPR_CLOSE_NORMALIZED, EXPR_CLOSE_REGEX);
+
+            Location location = new Location();
+            location.setPath(finalPath);
+            location.setType(LocationType.valueOf(typeAndPath[0]));
+            rawLocations.add(location);
+        }
+
+        this.storageUrl = rawStorageUrl;
+        this.locations = rawLocations;
+    }
+
+    @Override
+    public TYPE getType() {
+        return TYPE.FILESYSTEM;
+    }
+
+    public String getStorageUrl() {
+        return storageUrl;
+    }
+
+    public List<Location> getLocations() {
+        return locations;
+    }
+
+    /**
+     * Serializes all location types as {@code TYPE=uri} pairs joined by {@link #FEED_PATH_SEP},
+     * in the order DATA, META, STATS, TMP.
+     *
+     * @return uri template
+     */
+    @Override
+    public String getUriTemplate() {
+        StringBuilder feedBasePaths = new StringBuilder();
+        for (LocationType type : TEMPLATE_LOCATION_TYPES) {
+            final String pathMask = getUriTemplate(type);
+            if (pathMask == null) {
+                // getUriTemplate(LocationType) in this class falls back to /tmp and never
+                // returns null; this only guards overriding implementations
+                continue;
+            }
+
+            if (feedBasePaths.length() > 0) {
+                feedBasePaths.append(FEED_PATH_SEP);
+            }
+            feedBasePaths.append(type.name())
+                    .append(LOCATION_TYPE_SEP)
+                    .append(pathMask);
+        }
+
+        return feedBasePaths.toString();
+    }
+
+    /**
+     * Returns the storage URL followed by the normalized path for one location type.
+     *
+     * @param locationType location type to look up
+     * @return uri template, or {@code /tmp} (without storage URL) if the type is not configured
+     */
+    @Override
+    public String getUriTemplate(LocationType locationType) {
+        final Location locationForType = findLocation(locations, locationType);
+        if (locationForType == null) {
+            return DEFAULT_LOCATION_PATH;
+        }
+
+        // normalize the path so trailing and double '/' are removed
+        return storageUrl + new Path(locationForType.getPath());
+    }
+
+    @Override
+    public boolean exists() throws FalconException {
+        // Directories on FS will be created if they don't exist.
+        return true;
+    }
+
+    /**
+     * Compares the location paths of two file system storages type by type. A missing
+     * type counts as {@code /tmp}. A storage of any other kind is never identical.
+     *
+     * @param toCompareAgainst storage to compare with
+     * @return true if both have the same number of locations and the same paths
+     */
+    @Override
+    public boolean isIdentical(Storage toCompareAgainst) throws FalconException {
+        // Check the type before casting; e.g. a catalog table storage would otherwise
+        // throw ClassCastException here.
+        if (!(toCompareAgainst instanceof FileSystemStorage)) {
+            return false;
+        }
+
+        final List<Location> thisLocations = getLocations();
+        final List<Location> otherLocations = ((FileSystemStorage) toCompareAgainst).getLocations();
+        if (thisLocations.size() != otherLocations.size()) {
+            return false;
+        }
+
+        for (LocationType type : TEMPLATE_LOCATION_TYPES) {
+            final String thisPath = getLocation(thisLocations, type).getPath();
+            final String otherPath = getLocation(otherLocations, type).getPath();
+            final boolean samePath = thisPath == null ? otherPath == null : thisPath.equals(otherPath);
+            if (!samePath) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Returns the first location of the given type, or null if there is none. */
+    private static Location findLocation(List<Location> locationList, LocationType type) {
+        for (Location loc : locationList) {
+            if (loc.getType() == type) {
+                return loc;
+            }
+        }
+        return null;
+    }
+
+    /** Returns the first location of the given type, or a {@code /tmp} placeholder if there is none. */
+    private static Location getLocation(List<Location> locationList, LocationType type) {
+        final Location found = findLocation(locationList, type);
+        if (found != null) {
+            return found;
+        }
+
+        Location loc = new Location();
+        loc.setPath(DEFAULT_LOCATION_PATH);
+        loc.setType(type);
+        return loc;
+    }
+
+    @Override
+    public String toString() {
+        return "FileSystemStorage{"
+                + "storageUrl='" + storageUrl + '\''
+                + ", locations=" + locations
+                + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
new file mode 100644
index 0000000..0634969
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/entity/Storage.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
+/**
+ * Describes where the data of a feed is kept. A feed is backed either by
+ * paths on a file system or by a table registered in a catalog.
+ */
+public interface Storage {
+
+    /**
+     * The kinds of storage that can back a feed.
+     */
+    enum TYPE { FILESYSTEM, TABLE }
+
+    /*
+     * Regular expressions matching the delimiters of an expression inside a
+     * path template, e.g. the "${" and "}" around ${nameNode}.
+     */
+    String DOLLAR_EXPR_START_REGEX = "\\$\\{";
+    String QUESTION_EXPR_START_REGEX = "\\?\\{";
+    String EXPR_CLOSE_REGEX = "\\}";
+
+    /**
+     * URI friendly stand-ins for the expression delimiters, used where a
+     * template has to be parsed as a URI.
+     */
+    String DOLLAR_EXPR_START_NORMALIZED = "_D__START_";
+    String EXPR_CLOSE_NORMALIZED = "_CLOSE_";
+
+    /**
+     * Returns the kind of this storage.
+     *
+     * @return storage type
+     */
+    TYPE getType();
+
+    /**
+     * Returns the URI template describing this storage as a whole.
+     *
+     * @return uri template
+     */
+    String getUriTemplate();
+
+    /**
+     * Returns the URI template for a single location type.
+     *
+     * @param locationType type of location; only meaningful for file system storage
+     * @return uri template
+     */
+    String getUriTemplate(LocationType locationType);
+
+    /**
+     * Checks whether the underlying storage exists. For a catalog this means the
+     * table exists; a file system location always reports true.
+     *
+     * @return true if the storage exists, false otherwise
+     * @throws FalconException an exception
+     */
+    boolean exists() throws FalconException;
+
+    /**
+     * Checks whether this storage describes the same storage as another instance.
+     *
+     * @param toCompareAgainst instance to compare
+     * @return true if identical else false
+     * @throws FalconException an exception
+     */
+    boolean isIdentical(Storage toCompareAgainst) throws FalconException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index b1cf8f3..e633838 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -23,12 +23,14 @@ import java.io.IOException;
import javax.jms.ConnectionFactory;
import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.StoreAccessException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.hadoop.conf.Configuration;
@@ -56,6 +58,10 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
validateScheme(cluster, Interfacetype.WRITE);
validateScheme(cluster, Interfacetype.WORKFLOW);
validateScheme(cluster, Interfacetype.MESSAGING);
+ if (CatalogServiceFactory.isEnabled()
+ && ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) {
+ validateScheme(cluster, Interfacetype.REGISTRY);
+ }
if (!EntityUtil.responsibleFor(cluster.getColo())) {
return;
@@ -66,8 +72,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
validateExecuteInterface(cluster);
validateWorkflowInterface(cluster);
validateMessagingInterface(cluster);
-
- // Interfacetype.REGISTRY is not validated as its not used
+ validateRegistryInterface(cluster);
}
private void validateScheme(Cluster cluster, Interfacetype interfacetype)
@@ -150,4 +155,29 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
+ " for: " + implementation, e);
}
}
+
+    /**
+     * Validates the catalog registry interface of a cluster. This only happens when
+     * catalog integration is enabled and the cluster declares a REGISTRY interface.
+     *
+     * @param cluster cluster entity being validated
+     * @throws ValidationException if the catalog service cannot be obtained or
+     *         queried, or reports the endpoint as not alive
+     */
+    private void validateRegistryInterface(Cluster cluster) throws ValidationException {
+        if (!CatalogServiceFactory.isEnabled()) {
+            return; // ignore the registry interface for backwards compatibility
+        }
+
+        // continue validation only if a catalog service is provided
+        final Interface catalogInterface = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
+        if (catalogInterface == null) {
+            LOG.info("Catalog service is not enabled for cluster: " + cluster.getName());
+            return;
+        }
+
+        final String catalogUrl = catalogInterface.getEndpoint();
+        LOG.info("Validating catalog registry interface: " + catalogUrl);
+
+        boolean alive;
+        try {
+            alive = CatalogServiceFactory.getCatalogService().isAlive(catalogUrl);
+        } catch (FalconException e) {
+            throw new ValidationException("Invalid Catalog server or port: " + catalogUrl, e);
+        }
+
+        // Checked outside the try. ValidationException is thrown from methods declared to
+        // throw FalconException, so throwing it inside the try would let the catch above
+        // swallow this message into the "Invalid Catalog server" wrapper.
+        if (!alive) {
+            throw new ValidationException("Unable to reach Catalog server:" + catalogUrl);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 1c323fd..8d7903b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -20,8 +20,10 @@ package org.apache.falcon.entity.parser;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityGraph;
@@ -71,6 +73,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
validateFeedCutOffPeriod(feed, cluster);
}
+ validateFeedStorage(feed);
validateFeedPartitionExpression(feed);
validateFeedGroups(feed);
@@ -105,21 +108,20 @@ public class FeedEntityParser extends EntityParser<Feed> {
return processes;
}
- private void validateFeedGroups(Feed feed) throws ValidationException {
+ private void validateFeedGroups(Feed feed) throws FalconException {
String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
- String defaultPath = FeedHelper.getLocation(feed, LocationType.DATA)
- .getPath();
+ final Storage storage = FeedHelper.createStorage(feed);
+ String defaultPath = storage.getUriTemplate(LocationType.DATA);
for (Cluster cluster : feed.getClusters().getClusters()) {
- if (!FeedGroup.getDatePattern(
- FeedHelper.getLocation(feed, LocationType.DATA,
- cluster.getName()).getPath()).equals(
+ final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
+ if (!FeedGroup.getDatePattern(uriTemplate).equals(
FeedGroup.getDatePattern(defaultPath))) {
throw new ValidationException("Feeds default path pattern: "
- + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+ + storage.getUriTemplate(LocationType.DATA)
+ ", does not match with cluster: "
+ cluster.getName()
+ " path pattern: "
- + FeedHelper.getLocation(feed, LocationType.DATA, cluster.getName()).getPath());
+ + uriTemplate);
}
}
for (String groupName : groupNames) {
@@ -127,7 +129,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
if (group != null && !group.canContainFeed(feed)) {
throw new ValidationException(
"Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
- + ", path pattern: " + FeedHelper.getLocation(feed, LocationType.DATA).getPath()
+ + ", path pattern: " + storage
+ " does not match with group: " + group.getName() + "'s frequency: "
+ group.getFrequency()
+ ", date pattern: " + group.getDatePattern());
@@ -159,9 +161,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
CrossEntityValidations.validateInstanceRange(process, input, newFeed);
- if (input.getPartition() != null) {
- CrossEntityValidations.validateInputPartition(input, newFeed);
- }
+ validateInputPartition(newFeed, input);
}
}
@@ -179,6 +179,19 @@ public class FeedEntityParser extends EntityParser<Feed> {
}
}
+    /**
+     * Validates the partition expression of a process input against the feed it reads.
+     * File system feeds are checked by CrossEntityValidations; table feeds reject input
+     * partitions. An input without a partition is not checked.
+     *
+     * @param newFeed feed the input reads
+     * @param input   process input
+     * @throws FalconException if the partition is invalid or not supported
+     */
+    private void validateInputPartition(Feed newFeed, Input input) throws FalconException {
+        if (input.getPartition() != null) {
+            switch (FeedHelper.getStorageType(newFeed)) {
+            case FILESYSTEM:
+                CrossEntityValidations.validateInputPartition(input, newFeed);
+                break;
+            case TABLE:
+                throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+            default:
+                break;
+            }
+        }
+    }
+
private void validateClusterValidity(Date start, Date end, String clusterName) throws FalconException {
try {
if (start.after(end)) {
@@ -280,4 +293,74 @@ public class FeedEntityParser extends EntityParser<Feed> {
"Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
}
}
+
+    /**
+     * Validates the storage definition of a feed. Table storage may have at most one
+     * source cluster. Every cluster must resolve to the feed's storage type. Table
+     * storage may not declare feed-level partitions. The storage must exist on every
+     * cluster; this matters for catalog tables, since file system storage always
+     * reports that it exists.
+     *
+     * @param feed feed being validated
+     * @throws FalconException if any of the checks fails
+     */
+    private void validateFeedStorage(Feed feed) throws FalconException {
+        final Storage.TYPE storageType = FeedHelper.getStorageType(feed);
+
+        validateMultipleSourcesExist(feed, storageType);
+        validateUniformStorageType(feed, storageType);
+        validatePartitions(feed, storageType);
+        validateStorageExists(feed);
+    }
+
+    /**
+     * Rejects a feed with table storage that declares more than one SOURCE cluster.
+     *
+     * @param feed                feed being validated
+     * @param baseFeedStorageType storage type of the feed
+     * @throws FalconException if a table-backed feed has several source clusters
+     */
+    private void validateMultipleSourcesExist(Feed feed, Storage.TYPE baseFeedStorageType) throws FalconException {
+        // the single-source restriction does not apply to file system storage
+        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
+            return;
+        }
+
+        boolean sourceSeen = false;
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            if (cluster.getType() != ClusterType.SOURCE) {
+                continue;
+            }
+            if (sourceSeen) {
+                throw new ValidationException("Multiple sources are not supported for feed with table storage: "
+                        + feed.getName());
+            }
+            sourceSeen = true;
+        }
+    }
+
+ private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
+ for (Cluster cluster : feed.getClusters().getClusters()) {
+ Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
+
+ if (feedStorageType != feedClusterStorageType) {
+ throw new ValidationException("The storage type is not uniform for cluster: " + cluster.getName());
+ }
+ }
+ }
+
+    /**
+     * Rejects feed-level partitions for table storage, where partitioning is part of the table URI.
+     *
+     * @param feed        feed being validated
+     * @param storageType storage type of the feed
+     * @throws FalconException if a table-backed feed declares partitions
+     */
+    private void validatePartitions(Feed feed, Storage.TYPE storageType) throws FalconException {
+        if (storageType != Storage.TYPE.TABLE || feed.getPartitions() == null) {
+            return;
+        }
+
+        throw new ValidationException("Partitions are not supported for feeds with table storage. "
+                + "It should be defined as part of the table URI. "
+                + feed.getName());
+    }
+
+    /**
+     * Ensures the storage of the feed exists on every one of its clusters. All
+     * missing tables are reported together in one ValidationException. File system
+     * storage always reports that it exists, so only catalog tables can fail here.
+     *
+     * @param feed feed being validated
+     * @throws FalconException if storage cannot be created or does not exist on some cluster
+     */
+    private void validateStorageExists(Feed feed) throws FalconException {
+        StringBuilder buffer = new StringBuilder();
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            final Storage storage = FeedHelper.createStorage(cluster, feed);
+            if (storage.exists()) {
+                continue;
+            }
+
+            // separate the per-cluster messages so they do not run together
+            if (buffer.length() > 0) {
+                buffer.append("; ");
+            }
+
+            // this is only true for table, filesystem always returns true
+            CatalogStorage catalogStorage = (CatalogStorage) storage;
+            buffer.append("Table [")
+                    .append(catalogStorage.getTable())
+                    .append("] does not exist for feed: ")
+                    .append(feed.getName())
+                    .append(", cluster: ")
+                    .append(cluster.getName());
+        }
+
+        if (buffer.length() > 0) {
+            throw new ValidationException(buffer.toString());
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index e4a9cf0..8647d43 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -29,6 +29,8 @@ import java.util.TimeZone;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -68,25 +70,24 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
validateEntityExists(EntityType.CLUSTER, clusterName);
validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
- validateHDFSpaths(process, clusterName);
+ validateHDFSPaths(process, clusterName);
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
validateEntityExists(EntityType.FEED, input.getFeed());
- Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+ Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
CrossEntityValidations.validateInstanceRange(process, input, feed);
- if (input.getPartition() != null) {
- CrossEntityValidations.validateInputPartition(input, feed);
- }
+ validateInputPartition(input, feed);
+ validateOptionalInputsForTableStorage(feed, input);
}
}
if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
validateEntityExists(EntityType.FEED, output.getFeed());
- Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
+ Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
CrossEntityValidations.validateInstance(process, output, feed);
}
@@ -96,7 +97,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
validateLateInputs(process);
}
- private void validateHDFSpaths(Process process, String clusterName) throws FalconException {
+ private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
clusterName);
@@ -130,8 +131,7 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
private String getNameNode(Cluster cluster, String clusterName) throws ValidationException {
- // cluster should never be null as it is validated while submitting
- // feeds.
+ // cluster should never be null as it is validated while submitting feeds.
if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
throw new ValidationException(
"Cannot get valid nameNode scheme from write interface of cluster: " + clusterName);
@@ -152,6 +152,19 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
}
+    /**
+     * Validates the partition expression of a process input against its feed. File
+     * system feeds are checked by CrossEntityValidations; table feeds reject input
+     * partitions. An input without a partition is not checked.
+     *
+     * @param input process input
+     * @param feed  feed the input reads
+     * @throws FalconException if the partition is invalid or not supported
+     */
+    private void validateInputPartition(Input input, Feed feed) throws FalconException {
+        if (input.getPartition() == null) {
+            return;
+        }
+
+        final Storage.TYPE storageType = FeedHelper.getStorageType(feed);
+        if (storageType == Storage.TYPE.TABLE) {
+            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
+        }
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            CrossEntityValidations.validateInputPartition(input, feed);
+        }
+    }
+
private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
Set<String> datasetNames = new HashSet<String>();
if (inputs != null) {
@@ -172,6 +185,10 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
private void validateLateInputs(Process process) throws ValidationException {
+ if (process.getLateProcess() == null) {
+ return;
+ }
+
Map<String, String> feeds = new HashMap<String, String>();
if (process.getInputs() != null) {
for (Input in : process.getInputs().getInputs()) {
@@ -179,21 +196,27 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
}
- if (process.getLateProcess() != null) {
- for (LateInput lp : process.getLateProcess().getLateInputs()) {
- if (!feeds.keySet().contains(lp.getInput())) {
- throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
- }
- try {
- Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
- if (feed.getLateArrival() == null) {
- throw new ValidationException(
- "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
- }
- } catch (FalconException e) {
- throw new ValidationException(e);
+ for (LateInput lp : process.getLateProcess().getLateInputs()) {
+ if (!feeds.keySet().contains(lp.getInput())) {
+ throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
+ }
+
+ try {
+ Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
+ if (feed.getLateArrival() == null) {
+ throw new ValidationException(
+ "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
}
+ } catch (FalconException e) {
+ throw new ValidationException(e);
}
}
}
+
+ private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException {
+ if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+ throw new ValidationException("Optional Input is not supported for feeds with table storage! "
+ + input.getName());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index 5dca46f..d288925 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.group;
+import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.common.FeedDataPath;
import org.apache.falcon.entity.v0.Frequency;
@@ -93,8 +94,9 @@ public class FeedGroup {
return datePattern;
}
+ /**
+ * Checks whether a feed can belong to this group. The feed's frequency must equal
+ * the group's frequency, and the date pattern of the feed's DATA uri template
+ * must equal the group's date pattern.
+ *
+ * @param feed feed to test
+ * @return true if both frequency and date pattern match
+ * @throws FalconException propagated from FeedHelper.createStorage(feed)
+ */
- public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
+ public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
 return this.frequency.equals(feed.getFrequency())
- && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()));
+ && this.datePattern.equals(getDatePattern(
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA)));
 }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index f0d2e0b..7fbb61a 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -114,8 +114,8 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
return groupSet;
}
+ /**
+ * Resolves the groups of a feed by delegating to the three-argument getGroups
+ * overload. It passes the feed's group names, its frequency and the uri template
+ * of its DATA location.
+ *
+ * @param feed feed whose groups are resolved
+ * @return groups as returned by the delegated overload
+ * @throws FalconException propagated from FeedHelper.createStorage(feed)
+ */
- public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
+ public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
 return getGroups(feed.getGroups(), feed.getFrequency(),
- FeedHelper.getLocation(feed, LocationType.DATA).getPath());
+ FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
 }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index a9d39de..fc69933 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -21,10 +21,10 @@ package org.apache.falcon.update;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Partition;
import org.apache.falcon.entity.v0.feed.Partitions;
import org.apache.falcon.entity.v0.process.Cluster;
@@ -78,18 +78,15 @@ public final class UpdateHelper {
}
}
- public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
- if (!FeedHelper.getLocation(oldFeed.getLocations(), LocationType.DATA)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.DATA).getPath())
- || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.META)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.META).getPath())
- || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.STATS)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.STATS).getPath())
- || !FeedHelper.getLocation(oldFeed.getLocations(), LocationType.TMP)
- .getPath().equals(FeedHelper.getLocation(newFeed.getLocations(), LocationType.TMP).getPath())) {
+ public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess)
+ throws FalconException {
+ Storage oldFeedStorage = FeedHelper.createStorage(oldFeed);
+ Storage newFeedStorage = FeedHelper.createStorage(newFeed);
+
+ if (!oldFeedStorage.isIdentical(newFeedStorage)) {
return true;
}
- LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
+ LOG.debug(oldFeed.toShortString() + ": Storage identical. Ignoring...");
if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
return true;
@@ -128,17 +125,12 @@ public final class UpdateHelper {
}
for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
- if (!FeedHelper
- .getCluster(oldFeed, cluster.getName()).getValidity().getStart()
+ oldFeedStorage = FeedHelper.createStorage(cluster.getName(), oldFeed);
+ newFeedStorage = FeedHelper.createStorage(cluster.getName(), newFeed);
+
+ if (!FeedHelper.getCluster(oldFeed, cluster.getName()).getValidity().getStart()
.equals(FeedHelper.getCluster(newFeed, cluster.getName()).getValidity().getStart())
- || !FeedHelper.getLocation(oldFeed, LocationType.DATA, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.DATA, cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.META, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.META, cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.STATS, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.STATS, cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.TMP, cluster.getName()).getPath()
- .equals(FeedHelper.getLocation(newFeed, LocationType.TMP, cluster.getName()).getPath())) {
+ || !oldFeedStorage.isIdentical(newFeedStorage)) {
return true;
}
LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 17695d2..b86a715 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -81,7 +81,7 @@ public abstract class AbstractWorkflowEngine {
public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
- public abstract String getWorkflowProperty(String cluster, String jobId, String property) throws FalconException;
+ public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException;
public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
index 959e26c..734d17c 100644
--- a/common/src/main/resources/log4j.xml
+++ b/common/src/main/resources/log4j.xml
@@ -28,7 +28,7 @@
</appender>
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/logs/application.log"/>
+ <param name="File" value="${user.dir}/target/logs/application.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
@@ -37,7 +37,7 @@
</appender>
<appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/logs/audit.log"/>
+ <param name="File" value="${user.dir}/target/logs/audit.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
@@ -46,7 +46,7 @@
</appender>
<appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/logs/tranlog.log"/>
+ <param name="File" value="${user.dir}/target/logs/tranlog.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
@@ -55,7 +55,7 @@
</appender>
<appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/logs/metric.log"/>
+ <param name="File" value="${user.dir}/target/logs/metric.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout">
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index fea2a31..5473f5d 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -27,6 +27,8 @@
*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
+*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
+
*.application.services=org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.service.ProcessSubscriberService,\
org.apache.falcon.rerun.service.RetryService,\
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index fa21a90..10a9cc0 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -17,14 +17,17 @@
*/
package org.apache.falcon.cleanup;
-import java.io.IOException;
-
import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,6 +36,9 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import java.io.IOException;
+import java.io.InputStream;
+
/**
* Test for log cleanup service.
*/
@@ -41,6 +47,11 @@ public class LogCleanupServiceTest extends AbstractTestBase {
private FileSystem fs;
private FileSystem tfs;
private EmbeddedCluster targetDfsCluster;
+ private Path sourceStagingPath1;
+ private Path sourceStagingPath2;
+ private Path targetStagingPath1;
+ private Path targetStagingPath2;
+
private final Path instanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/process/"
+ "sample" + "/logs/job-2010-01-01-01-00/000");
private final Path instanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/process/"
@@ -107,7 +118,43 @@ public class LogCleanupServiceTest extends AbstractTestBase {
fs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
tfs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
+ // table feed staging dir setup
+ initializeStagingDirs();
+ createStageData(sourceStagingPath1, targetStagingPath1);
+
Thread.sleep(61000);
+
+ createStageData(sourceStagingPath2, targetStagingPath2);
+ }
+
+    /**
+     * Publishes the Hive table feed defined in /config/feed/hive-table-feed.xml and computes two
+     * replication staging partition paths (ds=2012092400 and ds=2012092500) on both the source
+     * and the target cluster, storing them in the sourceStagingPath* / targetStagingPath* fields.
+     *
+     * @throws Exception if the feed resource cannot be read/unmarshalled or published, or if the
+     *                   staging directory cannot be resolved for either cluster
+     */
+    private void initializeStagingDirs() throws Exception {
+        final Feed tableFeed = loadFeed("/config/feed/hive-table-feed.xml");
+        getStore().publish(EntityType.FEED, tableFeed);
+
+        final String sourceStagingDir = getReplicationStagingDir(dfsCluster.getCluster(), tableFeed);
+        sourceStagingPath1 = new Path(sourceStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
+        sourceStagingPath2 = new Path(sourceStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
+
+        final String targetStagingDir = getReplicationStagingDir(targetDfsCluster.getCluster(), tableFeed);
+        targetStagingPath1 = new Path(targetStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
+        targetStagingPath2 = new Path(targetStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
+    }
+
+    /**
+     * Unmarshals a feed entity from a classpath resource.
+     *
+     * @param resource absolute classpath location of the feed XML
+     * @return the unmarshalled feed
+     * @throws IOException if the resource is not on the classpath
+     * @throws Exception   if unmarshalling fails
+     */
+    private Feed loadFeed(String resource) throws Exception {
+        final InputStream inputStream = getClass().getResourceAsStream(resource);
+        if (inputStream == null) {
+            // getResourceAsStream signals a missing resource with null; fail with a clear message
+            throw new IOException("Test resource not found on classpath: " + resource);
+        }
+        try {
+            return (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
+        } finally {
+            // the stream was previously leaked; always release it
+            inputStream.close();
+        }
+    }
+
+    /**
+     * Resolves the replication staging directory of a table feed on the given cluster.
+     *
+     * @param cluster cluster whose catalog storage is used
+     * @param feed    table (catalog) feed
+     * @return staging directory path as a string
+     * @throws Exception if the storage cannot be created or the staging dir cannot be resolved
+     */
+    private String getReplicationStagingDir(Cluster cluster, Feed feed) throws Exception {
+        // the test feed is a Hive table feed, so its storage is expected to be catalog storage
+        final CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, feed);
+        return FeedHelper.getStagingDir(cluster, feed, storage, Tag.REPLICATION);
+    }
+
+    /**
+     * Creates one replication staging partition directory on the source file system (fs) and one
+     * on the target file system (tfs), each containing empty _metadata.xml and data.txt files.
+     *
+     * @param sourcePath staging partition directory to create on the source cluster
+     * @param targetPath staging partition directory to create on the target cluster
+     * @throws Exception if a directory or file cannot be created
+     */
+    private void createStageData(Path sourcePath, Path targetPath) throws Exception {
+        createStageFiles(fs, sourcePath);
+        createStageFiles(tfs, targetPath);
+    }
+
+    /**
+     * Creates {@code stagePath} on {@code fileSystem} and populates it with empty
+     * _metadata.xml and data.txt files.
+     *
+     * @param fileSystem file system to write to
+     * @param stagePath  directory to create
+     * @throws IOException if the directory cannot be created or a file operation fails
+     */
+    private static void createStageFiles(FileSystem fileSystem, Path stagePath) throws IOException {
+        // mkdirs reports failure via its return value; check it so setup problems are not
+        // misreported later as cleanup-assertion failures
+        if (!fileSystem.mkdirs(stagePath)) {
+            throw new IOException("Unable to create staging dir " + stagePath);
+        }
+        fileSystem.createNewFile(new Path(stagePath, "_metadata.xml"));
+        fileSystem.createNewFile(new Path(stagePath, "data.txt"));
+    }
@Test
@@ -120,7 +167,6 @@ public class LogCleanupServiceTest extends AbstractTestBase {
Assert.assertFalse(fs.exists(instanceLogPath1));
Assert.assertFalse(fs.exists(instanceLogPath2));
Assert.assertTrue(fs.exists(instanceLogPath3));
-
}
@Test
@@ -134,5 +180,18 @@ public class LogCleanupServiceTest extends AbstractTestBase {
Assert.assertTrue(fs.exists(feedInstanceLogPath1));
Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
+ // source table replication staging dirs
+ Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "_metadata.xml")));
+ Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "data.txt")));
+
+ Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "_metadata.xml")));
+ Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "data.txt")));
+
+ // target table replication staging dirs
+ Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "_metadata.xml")));
+ Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "data.txt")));
+
+ Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "_metadata.xml")));
+ Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "data.txt")));
}
}