You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2015/04/02 13:25:20 UTC
[2/2] falcon git commit: FALCON-1091 Monitoring plugin that registers
catalog partition - code. Contributed by Suhas Vasu / PallaviRao / Shwetha GS
FALCON-1091 Monitoring plugin that registers catalog partition - code. Contributed by Suhas Vasu / PallaviRao / Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4b0a920f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4b0a920f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4b0a920f
Branch: refs/heads/master
Commit: 4b0a920f6b6336e2bf4926adc8dc329f88f556e2
Parents: 13bc6b6
Author: Suhas V <su...@inmobi.com>
Authored: Thu Apr 2 16:54:40 2015 +0530
Committer: Suhas V <su...@inmobi.com>
Committed: Thu Apr 2 16:54:40 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../falcon/catalog/AbstractCatalogService.java | 42 +++
.../falcon/catalog/CatalogPartitionHandler.java | 298 +++++++++++++++++++
.../falcon/catalog/CatalogServiceFactory.java | 10 +-
.../falcon/catalog/HiveCatalogService.java | 107 ++++++-
.../apache/falcon/entity/CatalogStorage.java | 87 ++----
.../org/apache/falcon/entity/FeedHelper.java | 135 ++++-----
.../apache/falcon/entity/FileSystemStorage.java | 29 +-
.../falcon/entity/common/FeedDataPath.java | 53 ++--
.../falcon/expression/ExpressionHelper.java | 11 +-
.../apache/falcon/util/FalconRadixUtils.java | 16 +-
.../workflow/WorkflowExecutionContext.java | 2 +-
common/src/main/resources/startup.properties | 13 +
.../apache/falcon/entity/FeedDataPathTest.java | 10 +-
.../apache/falcon/entity/FeedHelperTest.java | 54 ++++
.../falcon/entity/FileSystemStorageTest.java | 3 +-
docs/src/site/twiki/InstallationSteps.twiki | 21 ++
.../mapred/ClassicClientProtocolProvider.java | 21 +-
.../org/apache/falcon/logging/LogProvider.java | 3 +-
.../ProcessExecutionCoordinatorBuilder.java | 2 +-
.../workflow/engine/OozieWorkflowEngine.java | 2 +-
.../OozieProcessWorkflowBuilderTest.java | 2 +-
prism/pom.xml | 6 +-
.../falcon/retention/FeedEvictorTest.java | 5 +-
.../src/main/resources/mapred-site.xml | 4 +
.../src/main/resources/yarn-site.xml | 5 -
.../catalog/CatalogPartitionHandlerIT.java | 79 +++++
.../falcon/catalog/HiveCatalogServiceIT.java | 61 +++-
.../lifecycle/TableStorageFeedEvictorIT.java | 16 +-
.../org/apache/falcon/util/HiveTestUtils.java | 9 +
.../org/apache/falcon/util/OozieTestUtils.java | 24 +-
webapp/src/test/resources/cluster-template.xml | 2 +-
webapp/src/test/resources/feed-template1.xml | 3 +-
webapp/src/test/resources/feed-template2.xml | 7 +-
34 files changed, 901 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c52cc3..399e401 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,9 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1091 Monitoring plugin that registers catalog partition - code
+ (Suhas Vasu / PallaviRao / Shwetha GS via Suhas Vasu)
+
FALCON-790 Falcon UI to enable entity/process/feed edits and
management. (Armando Reyna/Kenneth Ho via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 9abdc93..41d50df 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -65,6 +65,10 @@ public abstract class AbstractCatalogService {
public abstract boolean isTableExternal(Configuration conf, String catalogUrl, String database,
String tableName) throws FalconException;
+ public abstract List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl,
+ String database, String tableName,
+ List<String> values) throws FalconException;
+
/**
* List partitions by filter. Executed in the workflow engine.
*
@@ -132,4 +136,42 @@ public abstract class AbstractCatalogService {
String database, String tableName,
List<String> partitionValues)
throws FalconException;
+
+ /**
+ * Gets the partition columns for the table in catalog service.
+ * @param conf
+ * @param catalogUrl url for the catalog service
+ * @param database
+ * @param tableName
+ * @return ordered list of partition columns for the table
+ * @throws FalconException
+ */
+ public abstract List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database,
+ String tableName) throws FalconException;
+
+ /**
+ * Adds the partition to the table.
+ * @param conf
+ * @param catalogUrl
+ * @param database
+ * @param tableName
+ * @param values
+ * @param location
+ * @throws FalconException
+ */
+ public abstract void addPartition(Configuration conf, String catalogUrl, String database,
+ String tableName, List<String> values, String location) throws FalconException;
+
+ /**
+ * Updates an existing partition in the table.
+ * @param conf
+ * @param catalogUrl
+ * @param database
+ * @param tableName
+ * @param partValues
+ * @param location
+ * @throws FalconException
+ */
+ public abstract void updatePartition(Configuration conf, String catalogUrl, String database, String tableName,
+ List<String> partValues, String location) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
new file mode 100644
index 0000000..f8a3d46
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartitionHandler.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Date;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Arrays;
+import java.util.TimeZone;
+import java.util.Properties;
+
+/**
+ * Listens to workflow execution completion events.
+ * It syncs HCat partitions based on the feeds created/evicted/replicated.
+ */
+public class CatalogPartitionHandler implements WorkflowExecutionListener{
+ private static final Logger LOG = LoggerFactory.getLogger(CatalogPartitionHandler.class);
+
+ public static final ConfigurationStore STORE = ConfigurationStore.get();
+ public static final String CATALOG_TABLE = "catalog.table";
+ private ExpressionHelper evaluator = ExpressionHelper.get();
+ private static CatalogPartitionHandler catalogInstance = new CatalogPartitionHandler();
+ private static final boolean IS_CATALOG_ENABLED = CatalogServiceFactory.isEnabled();
+ public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+ private static final PathFilter PATH_FILTER = new PathFilter() {
+ @Override public boolean accept(Path path) {
+ try {
+ FileSystem fs = path.getFileSystem(new Configuration());
+ return !path.getName().startsWith("_") && !path.getName().startsWith(".") && !fs.isFile(path);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ public static final CatalogPartitionHandler get() {
+ return catalogInstance;
+ }
+
+ @Override
+ public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+ if (!IS_CATALOG_ENABLED) {
+ //Skip if catalog service is not enabled
+ return;
+ }
+
+ String[] feedNames = context.getOutputFeedNamesList();
+ String[] feedPaths = context.getOutputFeedInstancePathsList();
+ Cluster cluster = STORE.get(EntityType.CLUSTER, context.getClusterName());
+ Configuration clusterConf = ClusterHelper.getConfiguration(cluster);
+
+ if (StringUtils.isEmpty(ClusterHelper.getRegistryEndPoint(cluster))) {
+ //Skip if registry endpoint is not defined for the cluster
+ LOG.info("Catalog endpoint not defined for cluster {}. Skipping partition registration", cluster.getName());
+ return;
+ }
+
+ for (int index = 0; index < feedNames.length; index++) {
+ LOG.info("Partition handling for feed {} for path {}", feedNames[index], feedPaths[index]);
+ Feed feed = STORE.get(EntityType.FEED, feedNames[index]);
+
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+ if (storage.getType() == Storage.TYPE.TABLE) {
+ //Do nothing if the feed is already table based
+ LOG.info("Feed {} is already table based. Skipping partition registration", feed.getName());
+ continue;
+ }
+
+ CatalogStorage catalogStorage = getCatalogStorageFromFeedProperties(feed, cluster, clusterConf);
+ if (catalogStorage == null) {
+ //There is no catalog defined in the feed properties. So, skip partition registration
+ LOG.info("Feed {} doesn't have table defined in its properties/table doesn't exist. "
+ + "Skipping partition registration", feed.getName());
+ continue;
+ }
+
+ //Generate static partition values - get the date from feed path and evaluate partitions in catalog spec
+ Path feedPath = new Path(new Path(feedPaths[index]).toUri().getPath());
+
+ String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
+ LOG.debug("Template {} catalogInstance path {}", templatePath, feedPath);
+ Date date = FeedHelper.getDate(templatePath, feedPath, UTC);
+ if (date == null) {
+ LOG.info("Feed {} catalogInstance path {} doesn't match the template {}. "
+ + "Skipping partition registration",
+ feed.getName(), feedPath, templatePath);
+ continue;
+ }
+
+ LOG.debug("Reference date from path {} is {}", feedPath, SchemaHelper.formatDateUTC(date));
+ ExpressionHelper.setReferenceDate(date);
+ List<String> partitionValues = new ArrayList<String>();
+ for (Map.Entry<String, String> entry : catalogStorage.getPartitions().entrySet()) {
+ LOG.debug("Evaluating partition {}", entry.getValue());
+ partitionValues.add(evaluator.evaluateFullExpression(entry.getValue(), String.class));
+ }
+
+ LOG.debug("Static partition - {}", partitionValues);
+ WorkflowExecutionContext.EntityOperations operation = context.getOperation();
+ switch (operation) {
+ case DELETE:
+ dropPartitions(clusterConf, catalogStorage, partitionValues);
+ break;
+
+ case GENERATE:
+ case REPLICATE:
+ registerPartitions(clusterConf, catalogStorage, feedPath, partitionValues);
+ break;
+
+ default:
+ throw new FalconException("Unhandled operation " + operation);
+ }
+ }
+ }
+
+ //Register additional partitions. Compare the expected partitions and the existing partitions
+ //1.exist (intersection) expected --> partition already exists, so update partition
+ //2.exist - expected --> partition is not required anymore, so drop partition
+ //3.expected - exist --> partition doesn't exist, so add partition
+ private void registerPartitions(Configuration conf, CatalogStorage storage, Path staticPath,
+ List<String> staticPartition) throws FalconException {
+ try {
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+ if (!fs.exists(staticPath)) {
+ //Do nothing if the output path doesn't exist
+ return;
+ }
+
+ List<String> partitionColumns = getPartitionColumns(conf, storage);
+ int dynamicPartCols = partitionColumns.size() - staticPartition.size();
+ Path searchPath = staticPath;
+ if (dynamicPartCols > 0) {
+ searchPath = new Path(staticPath, StringUtils.repeat("*", "/", dynamicPartCols));
+ }
+
+ //Figure out the dynamic partitions from the directories on hdfs
+ FileStatus[] files = fs.globStatus(searchPath, PATH_FILTER);
+ Map<List<String>, String> partitions = new HashMap<List<String>, String>();
+ for (FileStatus file : files) {
+ List<String> dynamicParts = getDynamicPartitions(file.getPath(), staticPath);
+ List<String> partitionValues = new ArrayList<String>(staticPartition);
+ partitionValues.addAll(dynamicParts);
+ LOG.debug("Final partition - " + partitionValues);
+ partitions.put(partitionValues, file.getPath().toString());
+ }
+
+ List<List<String>> existPartitions = listPartitions(conf, storage, staticPartition);
+ Collection<List<String>> targetPartitions = partitions.keySet();
+
+ Collection<List<String>> partitionsForDrop = CollectionUtils.subtract(existPartitions, targetPartitions);
+ Collection<List<String>> partitionsForAdd = CollectionUtils.subtract(targetPartitions, existPartitions);
+ Collection<List<String>> partitionsForUpdate =
+ CollectionUtils.intersection(existPartitions, targetPartitions);
+
+ for (List<String> partition : partitionsForDrop) {
+ dropPartitions(conf, storage, partition);
+ }
+
+ for (List<String> partition : partitionsForAdd) {
+ addPartition(conf, storage, partition, partitions.get(partition));
+ }
+
+ for (List<String> partition : partitionsForUpdate) {
+ updatePartition(conf, storage, partition, partitions.get(partition));
+ }
+ } catch(IOException e) {
+ throw new FalconException(e);
+ }
+ }
+
+ private void updatePartition(Configuration conf, CatalogStorage storage, List<String> partition, String location)
+ throws FalconException {
+ AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+ catalogService.updatePartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(),
+ partition, location);
+ }
+
+ private void addPartition(Configuration conf, CatalogStorage storage, List<String> partition, String location)
+ throws FalconException {
+ AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+ catalogService.addPartition(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partition,
+ location);
+ }
+
+ private List<List<String>> listPartitions(Configuration conf, CatalogStorage storage, List<String> staticPartitions)
+ throws FalconException {
+ AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+ List<CatalogPartition> partitions = catalogService.listPartitions(conf, storage.getCatalogUrl(),
+ storage.getDatabase(), storage.getTable(), staticPartitions);
+ List<List<String>> existPartitions = new ArrayList<List<String>>();
+ for (CatalogPartition partition : partitions) {
+ existPartitions.add(partition.getValues());
+ }
+ return existPartitions;
+ }
+
+ //Returns the dynamic partitions of the data path
+ protected List<String> getDynamicPartitions(Path path, Path staticPath) {
+ String dynPart = path.toUri().getPath().substring(staticPath.toString().length());
+ dynPart = StringUtils.removeStart(dynPart, "/");
+ dynPart = StringUtils.removeEnd(dynPart, "/");
+ if (StringUtils.isEmpty(dynPart)) {
+ return new ArrayList<String>();
+ }
+ return Arrays.asList(dynPart.split("/"));
+ }
+
+ private List<String> getPartitionColumns(Configuration conf, CatalogStorage storage) throws FalconException {
+ AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+ return catalogService.getPartitionColumns(conf, storage.getCatalogUrl(), storage.getDatabase(),
+ storage.getTable());
+ }
+
+ private void dropPartitions(Configuration conf, CatalogStorage storage, List<String> values)
+ throws FalconException {
+ AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+ catalogService.dropPartitions(conf, storage.getCatalogUrl(), storage.getDatabase(),
+ storage.getTable(), values, false);
+ }
+
+ //Get the catalog template from feed properties as feed is filesystem based
+ protected CatalogStorage getCatalogStorageFromFeedProperties(Feed feed, Cluster cluster, Configuration conf)
+ throws FalconException {
+ Properties properties = FeedHelper.getFeedProperties(feed);
+ String tableUri = properties.getProperty(CATALOG_TABLE);
+ if (tableUri == null) {
+ return null;
+ }
+
+ CatalogTable table = new CatalogTable();
+ table.setUri(tableUri.replace("{", "${"));
+ CatalogStorage storage = null;
+ try {
+ storage = new CatalogStorage(cluster, table);
+ } catch (URISyntaxException e) {
+ throw new FalconException(e);
+ }
+
+ AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+ if (!catalogService.tableExists(conf, storage.getCatalogUrl(), storage.getDatabase(), storage.getTable())) {
+ return null;
+ }
+ return storage;
+ }
+
+ @Override
+ public void onFailure(WorkflowExecutionContext context) throws FalconException {
+ //no-op
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
index c8a0fa0..77e6851 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogServiceFactory.java
@@ -22,12 +22,16 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Factory for providing appropriate catalog service
* implementation to the falcon service.
*/
@SuppressWarnings("unchecked")
public final class CatalogServiceFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(CatalogServiceFactory.class);
public static final String CATALOG_SERVICE = "catalog.service.impl";
@@ -35,7 +39,11 @@ public final class CatalogServiceFactory {
}
public static boolean isEnabled() {
- return StartupProperties.get().containsKey(CATALOG_SERVICE);
+ boolean isEnabled = StartupProperties.get().containsKey(CATALOG_SERVICE);
+ if (!isEnabled) {
+ LOG.info("Catalog service disabled. Partitions will not registered");
+ }
+ return isEnabled;
}
public static AbstractCatalogService getCatalogService() throws FalconException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 25a4a46..3d57631 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -21,14 +21,11 @@ package org.apache.falcon.catalog;
import org.apache.falcon.FalconException;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
@@ -44,6 +41,7 @@ import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
/**
@@ -53,6 +51,8 @@ import java.util.List;
public class HiveCatalogService extends AbstractCatalogService {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
+ public static final String CREATE_TIME = "falcon.create_time";
+ public static final String UPDATE_TIME = "falcon.update_time";
public static HiveConf createHiveConf(Configuration conf,
@@ -97,8 +97,6 @@ public class HiveCatalogService extends AbstractCatalogService {
ugi.addCredentials(credentials); // credentials cannot be null
}
- OozieActionConfigurationHelper.dumpConf(hcatConf, "hive conf ");
-
return new HiveMetaStoreClient(hcatConf);
} catch (Exception e) {
throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e);
@@ -176,7 +174,7 @@ public class HiveCatalogService extends AbstractCatalogService {
String metaStoreServicePrincipal)
throws IOException {
- LOG.info("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
+ LOG.debug("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
HCatClient hcatClient = HCatClient.create(hcatConf);
String delegationToken = hcatClient.getDelegationToken(
CurrentUser.getUser(), metaStoreServicePrincipal);
@@ -211,6 +209,8 @@ public class HiveCatalogService extends AbstractCatalogService {
HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
Table table = client.getTable(database, tableName);
return table != null;
+ } catch (NoSuchObjectException e) {
+ return false;
} catch (Exception e) {
throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
}
@@ -231,6 +231,29 @@ public class HiveCatalogService extends AbstractCatalogService {
}
@Override
+ public List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl,
+ String database, String tableName,
+ List<String> values) throws FalconException {
+ LOG.info("List partitions for: {}, partition filter: {}", tableName, values);
+
+ try {
+ List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
+
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ List<Partition> hCatPartitions = client.listPartitions(database, tableName, values, (short) -1);
+ for (Partition hCatPartition : hCatPartitions) {
+ LOG.debug("Partition: " + hCatPartition.getValues());
+ CatalogPartition partition = createCatalogPartition(hCatPartition);
+ catalogPartitionList.add(partition);
+ }
+
+ return catalogPartitionList;
+ } catch (Exception e) {
+ throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
+ }
+ }
+
+ @Override
public List<CatalogPartition> listPartitionsByFilter(Configuration conf, String catalogUrl,
String database, String tableName,
String filter) throws FalconException {
@@ -267,6 +290,7 @@ public class HiveCatalogService extends AbstractCatalogService {
return catalogPartition;
}
+ //Drop single partition
@Override
public boolean dropPartition(Configuration conf, String catalogUrl,
String database, String tableName,
@@ -313,4 +337,73 @@ public class HiveCatalogService extends AbstractCatalogService {
throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
}
}
+
+ @Override
+ public List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database,
+ String tableName) throws FalconException {
+ LOG.info("Fetching partition columns of table: " + tableName);
+
+ try {
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ Table table = client.getTable(database, tableName);
+ List<String> partCols = new ArrayList<String>();
+ for (FieldSchema part : table.getPartitionKeys()) {
+ partCols.add(part.getName());
+ }
+ return partCols;
+ } catch (Exception e) {
+ throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void addPartition(Configuration conf, String catalogUrl, String database,
+ String tableName, List<String> partValues, String location) throws FalconException {
+ LOG.info("Adding partition {} for {}.{} with location {}", partValues, database, tableName, location);
+
+ try {
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ Table table = client.getTable(database, tableName);
+ org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition();
+ part.setDbName(database);
+ part.setTableName(tableName);
+ part.setValues(partValues);
+ part.setSd(table.getSd());
+ part.getSd().setLocation(location);
+ part.setParameters(table.getParameters());
+ if (part.getParameters() == null) {
+ part.setParameters(new HashMap<String, String>());
+ }
+ part.getParameters().put(CREATE_TIME, String.valueOf(System.currentTimeMillis()));
+ client.add_partition(part);
+
+ } catch (Exception e) {
+ throw new FalconException("Exception adding partition: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void updatePartition(Configuration conf, String catalogUrl, String database,
+ String tableName, List<String> partValues, String location) throws FalconException {
+ LOG.info("Updating partition {} of {}.{} with location {}", partValues, database, tableName, location);
+
+ try {
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ Table table = client.getTable(database, tableName);
+ org.apache.hadoop.hive.metastore.api.Partition part = new org.apache.hadoop.hive.metastore.api.Partition();
+ part.setDbName(database);
+ part.setTableName(tableName);
+ part.setValues(partValues);
+ part.setSd(table.getSd());
+ part.getSd().setLocation(location);
+ part.setParameters(table.getParameters());
+ if (part.getParameters() == null) {
+ part.setParameters(new HashMap<String, String>());
+ }
+ part.getParameters().put(UPDATE_TIME, String.valueOf(System.currentTimeMillis()));
+ client.alter_partition(database, tableName, part);
+ } catch (Exception e) {
+ throw new FalconException("Exception updating partition: " + e.getMessage(), e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index 59f558b..7930fba 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.retention.EvictionHelper;
@@ -43,15 +44,11 @@ import javax.servlet.jsp.el.ELException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
import java.util.regex.Matcher;
/**
@@ -90,7 +87,7 @@ public class CatalogStorage extends Configured implements Storage {
this(CATALOG_URL, feed.getTable());
}
- protected CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
+ public CatalogStorage(Cluster cluster, CatalogTable table) throws URISyntaxException {
this(ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint(), table);
}
@@ -397,10 +394,7 @@ public class CatalogStorage extends Configured implements Storage {
List<CatalogPartition> toBeDeleted;
try {
// get sorted date partition keys and values
- List<String> datedPartKeys = new ArrayList<String>();
- List<String> datedPartValues = new ArrayList<String>();
- fillSortedDatedPartitionKVs(datedPartKeys, datedPartValues, retentionLimit, timeZone);
- toBeDeleted = discoverPartitionsToDelete(datedPartKeys, datedPartValues);
+ toBeDeleted = discoverPartitionsToDelete(retentionLimit, timeZone);
} catch (ELException e) {
throw new FalconException("Couldn't find partitions to be deleted", e);
@@ -428,58 +422,30 @@ public class CatalogStorage extends Configured implements Storage {
return instanceDates;
}
- private List<CatalogPartition> discoverPartitionsToDelete(List<String> datedPartKeys, List<String> datedPartValues)
+ private List<CatalogPartition> discoverPartitionsToDelete(String retentionLimit, String timezone)
throws FalconException, ELException {
-
- final String filter = createFilter(datedPartKeys, datedPartValues);
- return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
- getConf(), getCatalogUrl(), getDatabase(), getTable(), filter);
- }
-
- private void fillSortedDatedPartitionKVs(List<String> sortedPartKeys, List<String> sortedPartValues,
- String retentionLimit, String timeZone) throws ELException {
Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
-
- // sort partition keys and values by the date pattern present in value
- Map<FeedDataPath.VARS, String> sortedPartKeyMap = new TreeMap<FeedDataPath.VARS, String>();
- Map<FeedDataPath.VARS, String> sortedPartValueMap = new TreeMap<FeedDataPath.VARS, String>();
+ ExpressionHelper.setReferenceDate(range.first);
+ Map<String, String> partitionsToDelete = new LinkedHashMap<String, String>();
+ ExpressionHelper expressionHelper = ExpressionHelper.get();
for (Map.Entry<String, String> entry : getPartitions().entrySet()) {
- String datePattern = entry.getValue();
- String mask = datePattern.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy")
- .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM")
- .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd")
- .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH")
- .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm");
-
- // find the first date pattern present in date mask
- FeedDataPath.VARS vars = FeedDataPath.VARS.presentIn(mask);
- // skip this partition if date mask doesn't contain any date format
- if (vars == null) {
- continue;
- }
-
- // construct dated partition value as per format
- DateFormat dateFormat = new SimpleDateFormat(mask);
- dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
- String partitionValue = dateFormat.format(range.first);
-
- // add partition key and value in their sorted maps
- if (!sortedPartKeyMap.containsKey(vars)) {
- sortedPartKeyMap.put(vars, entry.getKey());
- }
-
- if (!sortedPartValueMap.containsKey(vars)) {
- sortedPartValueMap.put(vars, partitionValue);
+ if (FeedDataPath.PATTERN.matcher(entry.getValue()).find()) {
+ partitionsToDelete.put(entry.getKey(),
+ expressionHelper.evaluateFullExpression(entry.getValue(), String.class));
}
}
-
- // add map entries to lists of partition keys and values
- sortedPartKeys.addAll(sortedPartKeyMap.values());
- sortedPartValues.addAll(sortedPartValueMap.values());
+ final String filter = createFilter(partitionsToDelete);
+ return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
+ getConf(), getCatalogUrl(), getDatabase(), getTable(), filter);
}
- private String createFilter(List<String> datedPartKeys, List<String> datedPartValues) throws ELException {
- int numPartitions = datedPartKeys.size();
+ /**
+ * Creates hive partition filter from inputs partition map.
+ * @param partitionsMap - ordered map of partition keys and values
+ * @return partition filter
+ * @throws ELException
+ */
+ private String createFilter(Map<String, String> partitionsMap) throws ELException {
/* Construct filter query string. As an example, suppose the dated partition keys
* are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10].
@@ -489,23 +455,26 @@ public class CatalogStorage extends Configured implements Storage {
* or (year = '2014' and month = '02' and day = '24' and hour < '10')"
*/
StringBuilder filterBuffer = new StringBuilder();
- for (int curr = 0; curr < numPartitions; curr++) {
+ List<String> keys = new ArrayList<String>(partitionsMap.keySet());
+ for (int curr = 0; curr < partitionsMap.size(); curr++) {
if (curr > 0) {
filterBuffer.append(FILTER_OR);
}
filterBuffer.append(FILTER_ST_BRACKET);
for (int prev = 0; prev < curr; prev++) {
- filterBuffer.append(datedPartKeys.get(prev))
+ String key = keys.get(prev);
+ filterBuffer.append(key)
.append(FILTER_EQUALS)
.append(FILTER_QUOTE)
- .append(datedPartValues.get(prev))
+ .append(partitionsMap.get(key))
.append(FILTER_QUOTE)
.append(FILTER_AND);
}
- filterBuffer.append(datedPartKeys.get(curr))
+ String key = keys.get(curr);
+ filterBuffer.append(key)
.append(FILTER_LESS_THAN)
.append(FILTER_QUOTE)
- .append(datedPartValues.get(curr))
+ .append(partitionsMap.get(key))
.append(FILTER_QUOTE)
.append(FILTER_END_BRACKET);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index ca31f95..7f9acc9 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -42,18 +42,7 @@ import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URISyntaxException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.TreeMap;
+import java.util.*;
import java.util.regex.Matcher;
/**
@@ -233,10 +222,24 @@ public final class FeedHelper {
return clusterLocations.getLocations();
}
- final Locations feedLocations = feed.getLocations();
+ Locations feedLocations = feed.getLocations();
return feedLocations == null ? null : feedLocations.getLocations();
}
+ public static Location getLocation(Feed feed, org.apache.falcon.entity.v0.cluster.Cluster cluster,
+ LocationType type) {
+ List<Location> locations = getLocations(getCluster(feed, cluster.getName()), feed);
+ if (locations != null) {
+ for (Location location : locations) {
+ if (location.getType() == type) {
+ return location;
+ }
+ }
+ }
+
+ return null;
+ }
+
public static Sla getSLAs(Cluster cluster, Feed feed) {
final Sla clusterSla = cluster.getSla();
if (clusterSla != null) {
@@ -348,89 +351,55 @@ public final class FeedHelper {
}
/**
- * Replaces timed variables with corresponding time notations e.g., ${YEAR} with yyyy and so on.
- * @param templatePath - template feed path
- * @return time notations
- */
- public static String getDateFormatInPath(String templatePath) {
- String mask = extractDatePartFromPathMask(templatePath, templatePath);
- //yyyyMMddHHmm
- return mask.replaceAll(FeedDataPath.VARS.YEAR.regex(), "yyyy")
- .replaceAll(FeedDataPath.VARS.MONTH.regex(), "MM")
- .replaceAll(FeedDataPath.VARS.DAY.regex(), "dd")
- .replaceAll(FeedDataPath.VARS.HOUR.regex(), "HH")
- .replaceAll(FeedDataPath.VARS.MINUTE.regex(), "mm");
- }
-
- /**
- * Extracts the date part of the path and builds a date format mask.
- * @param mask - Path pattern containing ${YEAR}, ${MONTH}...
- * @param inPath - Path from which date part need to be extracted
- * @return - Parts of inPath with non-date-part stripped out.
- *
- * Example: extractDatePartFromPathMask("/data/foo/${YEAR}/${MONTH}", "/data/foo/2012/${MONTH}");
- * Returns: 2012${MONTH}.
- */
- private static String extractDatePartFromPathMask(String mask, String inPath) {
- String[] elements = FeedDataPath.PATTERN.split(mask);
-
- String out = inPath;
- for (String element : elements) {
- out = out.replaceFirst(element, "");
- }
- return out;
- }
-
- private static Map<FeedDataPath.VARS, String> getDatePartMap(String path, String mask) {
- Map<FeedDataPath.VARS, String> map = new TreeMap<FeedDataPath.VARS, String>();
- Matcher matcher = FeedDataPath.DATE_FIELD_PATTERN.matcher(mask);
- int start = 0;
- while (matcher.find(start)) {
- String subMask = mask.substring(matcher.start(), matcher.end());
- String subPath = path.substring(matcher.start(), matcher.end());
- FeedDataPath.VARS var = FeedDataPath.VARS.from(subMask);
- if (!map.containsKey(var)) {
- map.put(var, subPath);
- }
- start = matcher.start() + 1;
- }
- return map;
- }
-
- /**
* Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
- * @param file - actual data path
+ * @param instancePath - actual data path
* @param templatePath - template path from feed definition
- * @param dateMask - path mask from getDateFormatInPath()
* @param timeZone
* @return date corresponding to the path
*/
//consider just the first occurrence of the pattern
- public static Date getDate(Path file, String templatePath, String dateMask, String timeZone) {
- String path = extractDatePartFromPathMask(templatePath, file.toString());
- Map<FeedDataPath.VARS, String> map = getDatePartMap(path, dateMask);
-
- if (map.isEmpty()) {
- return null;
- }
+ public static Date getDate(String templatePath, Path instancePath, TimeZone timeZone) {
+ String path = instancePath.toString();
+ Matcher matcher = FeedDataPath.PATTERN.matcher(templatePath);
+ Calendar cal = Calendar.getInstance(timeZone);
+ int lastEnd = 0;
+
+ Set<FeedDataPath.VARS> matchedVars = new HashSet<FeedDataPath.VARS>();
+ while (matcher.find()) {
+ FeedDataPath.VARS pathVar = FeedDataPath.VARS.from(matcher.group());
+ String pad = templatePath.substring(lastEnd, matcher.start());
+ if (!path.startsWith(pad)) {
+ //Template and path do not match
+ return null;
+ }
- StringBuilder date = new StringBuilder();
- int ordinal = 0;
- for (Map.Entry<FeedDataPath.VARS, String> entry : map.entrySet()) {
- if (ordinal++ == entry.getKey().ordinal()) {
- date.append(entry.getValue());
- } else {
+ int value;
+ try {
+ value = Integer.valueOf(path.substring(pad.length(), pad.length() + pathVar.getValueSize()));
+ } catch (NumberFormatException e) {
+ //Not a valid number for variable
return null;
}
+
+ pathVar.setCalendar(cal, value);
+ lastEnd = matcher.end();
+ path = path.substring(pad.length() + pathVar.getValueSize());
+ matchedVars.add(pathVar);
}
- try {
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, date.length()));
- dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
- return dateFormat.parse(date.toString());
- } catch (ParseException e) {
+ //Match the remaining constant at the end
+ if (!templatePath.substring(lastEnd).equals(path)) {
return null;
}
+
+
+ //Reset other fields
+ for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
+ if (!matchedVars.contains(var)) {
+ cal.set(var.getCalendarField(), 0);
+ }
+ }
+ return cal.getTime();
}
public static Path getFeedBasePath(String feedPath) throws IOException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 1ba7b9d..a5caf8e 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -77,7 +77,7 @@ public class FileSystemStorage extends Configured implements Storage {
private final String storageUrl;
private final List<Location> locations;
- protected FileSystemStorage(Feed feed) {
+ public FileSystemStorage(Feed feed) {
this(FILE_SYSTEM_URL, feed.getLocations());
}
@@ -293,11 +293,11 @@ public class FileSystemStorage extends Configured implements Storage {
}
@Override
- public StringBuilder evict(String retentionLimit, String timeZone,
- Path logFilePath) throws FalconException {
+ public StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException {
+ TimeZone tz = TimeZone.getTimeZone(timeZone);
try{
for (Location location : getLocations()) {
- fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, timeZone, logFilePath);
+ fileSystemEvictor(getUriTemplate(location.getType()), retentionLimit, tz, logFilePath);
}
EvictedInstanceSerDe.serializeEvictedInstancePaths(
HadoopClientFactory.get().createProxiedFileSystem(logFilePath.toUri(), getConf()),
@@ -311,7 +311,7 @@ public class FileSystemStorage extends Configured implements Storage {
return instanceDates;
}
- private void fileSystemEvictor(String feedPath, String retentionLimit, String timeZone,
+ private void fileSystemEvictor(String feedPath, String retentionLimit, TimeZone timeZone,
Path logFilePath) throws IOException, ELException, FalconException {
Path normalizedPath = new Path(feedPath);
FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(normalizedPath.toUri());
@@ -319,28 +319,26 @@ public class FileSystemStorage extends Configured implements Storage {
LOG.info("Normalized path: {}", feedPath);
Pair<Date, Date> range = EvictionHelper.getDateRange(retentionLimit);
- String dateMask = FeedHelper.getDateFormatInPath(feedPath);
- List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, dateMask, range.first, fs);
+ List<Path> toBeDeleted = discoverInstanceToDelete(feedPath, timeZone, range.first, fs);
if (toBeDeleted.isEmpty()) {
LOG.info("No instances to delete.");
return;
}
DateFormat dateFormat = new SimpleDateFormat(FeedHelper.FORMAT);
- dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ dateFormat.setTimeZone(timeZone);
Path feedBasePath = FeedHelper.getFeedBasePath(feedPath);
for (Path path : toBeDeleted) {
deleteInstance(fs, path, feedBasePath);
- Date date = FeedHelper.getDate(new Path(path.toUri().getPath()), feedPath, dateMask, timeZone);
+ Date date = FeedHelper.getDate(feedPath, new Path(path.toUri().getPath()), timeZone);
instanceDates.append(dateFormat.format(date)).append(',');
instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
}
}
- private List<Path> discoverInstanceToDelete(String inPath, String timeZone, String dateMask,
- Date start, FileSystem fs) throws IOException {
-
+ private List<Path> discoverInstanceToDelete(String inPath, TimeZone timeZone, Date start, FileSystem fs)
+ throws IOException {
FileStatus[] files = findFilesForFeed(fs, inPath);
if (files == null || files.length == 0) {
return Collections.emptyList();
@@ -348,8 +346,7 @@ public class FileSystemStorage extends Configured implements Storage {
List<Path> toBeDeleted = new ArrayList<Path>();
for (FileStatus file : files) {
- Date date = FeedHelper.getDate(new Path(file.getPath().toUri().getPath()),
- inPath, dateMask, timeZone);
+ Date date = FeedHelper.getDate(inPath, new Path(file.getPath().toUri().getPath()), timeZone);
LOG.debug("Considering {}", file.getPath().toUri().getPath());
LOG.debug("Date: {}", date);
if (date != null && !isDateInRange(date, start)) {
@@ -427,8 +424,8 @@ public class FileSystemStorage extends Configured implements Storage {
String feedInstancePath = ExpressionHelper.substitute(basePath, allProperties);
FileStatus fileStatus = getFileStatus(fileSystem, new Path(feedInstancePath));
FeedInstanceStatus instance = new FeedInstanceStatus(feedInstancePath);
- String dateMask = FeedHelper.getDateFormatInPath(basePath);
- Date date = FeedHelper.getDate(new Path(feedInstancePath), basePath, dateMask, tz.getID());
+
+ Date date = FeedHelper.getDate(basePath, new Path(feedInstancePath), tz);
instance.setInstance(SchemaHelper.formatDateUTC(date));
if (fileStatus != null) {
instance.setCreationTime(fileStatus.getModificationTime());
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
index 6ededbb..afe913d 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.entity.common;
+import java.util.Calendar;
import java.util.regex.Pattern;
/**
@@ -30,43 +31,49 @@ public final class FeedDataPath {
* Standard variables for feed time components.
*/
public static enum VARS {
- YEAR("yyyy", "([0-9]{4})"), MONTH("MM", "(0[1-9]|1[0-2])"), DAY("dd", "(0[1-9]|1[0-9]|2[0-9]|3[0-1])"),
- HOUR("HH", "([0-1][0-9]|2[0-4])"), MINUTE("mm", "([0-5][0-9]|60)");
+ YEAR("([0-9]{4})", Calendar.YEAR, 4), MONTH("(0[1-9]|1[0-2])", Calendar.MONTH, 2),
+ DAY("(0[1-9]|1[0-9]|2[0-9]|3[0-1])", Calendar.DAY_OF_MONTH, 2),
+ HOUR("([0-1][0-9]|2[0-4])", Calendar.HOUR_OF_DAY, 2), MINUTE("([0-5][0-9]|60)", Calendar.MINUTE, 2);
private final Pattern pattern;
- private final String datePattern;
- private final String patternRegularExpression;
+ private final String valuePattern;
+ private final int calendarField;
+ private final int valueSize;
- private VARS(String datePattern, String patternRegularExpression) {
+ private VARS(String patternRegularExpression, int calField, int numDigits) {
pattern = Pattern.compile("\\$\\{" + name() + "\\}");
- this.datePattern = datePattern;
- this.patternRegularExpression = patternRegularExpression;
+ this.valuePattern = patternRegularExpression;
+ this.calendarField = calField;
+ this.valueSize = numDigits;
}
- public String getPatternRegularExpression() {
- return patternRegularExpression;
- }
-
- public String getDatePattern() {
- return datePattern;
+ public String getValuePattern() {
+ return valuePattern;
}
public String regex() {
return pattern.pattern();
}
- public static VARS from(String str) {
- for (VARS var : VARS.values()) {
- if (var.datePattern.equals(str)) {
- return var;
- }
+ public int getCalendarField() {
+ return calendarField;
+ }
+
+ public int getValueSize() {
+ return valueSize;
+ }
+
+ public void setCalendar(Calendar cal, int value) {
+ if (this == MONTH) {
+ cal.set(calendarField, value - 1);
+ } else {
+ cal.set(calendarField, value);
}
- return null;
}
- public static VARS presentIn(String str) {
+ public static VARS from(String str) {
for (VARS var : VARS.values()) {
- if (str.contains(var.datePattern)) {
+ if (var.pattern.matcher(str).matches()) {
return var;
}
}
@@ -77,8 +84,4 @@ public final class FeedDataPath {
public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex()
+ "|" + VARS.MONTH.regex() + "|" + VARS.DAY.regex() + "|"
+ VARS.HOUR.regex() + "|" + VARS.MINUTE.regex());
-
- public static final Pattern DATE_FIELD_PATTERN = Pattern
- .compile("yyyy|MM|dd|HH|mm");
-
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
index 2b50119..65aaeba 100644
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
@@ -21,6 +21,8 @@ package org.apache.falcon.expression;
import org.apache.commons.el.ExpressionEvaluatorImpl;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.common.FeedDataPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.servlet.jsp.el.ELException;
import javax.servlet.jsp.el.ExpressionEvaluator;
@@ -41,9 +43,10 @@ import java.util.regex.Pattern;
*/
public final class ExpressionHelper implements FunctionMapper, VariableResolver {
+ private static final Logger LOG = LoggerFactory.getLogger(ExpressionHelper.class);
private static final ExpressionHelper INSTANCE = new ExpressionHelper();
- private ThreadLocal<Properties> threadVariables = new ThreadLocal<Properties>();
+ private static final ThreadLocal<Properties> THREAD_VARIABLES = new ThreadLocal<Properties>();
private static final Pattern SYS_PROPERTY_PATTERN = Pattern.compile("\\$\\{[A-Za-z0-9_.]+\\}");
@@ -94,18 +97,20 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
}
public void setPropertiesForVariable(Properties properties) {
- threadVariables.set(properties);
+ THREAD_VARIABLES.set(properties);
}
@Override
public Object resolveVariable(String field) {
- return threadVariables.get().get(field);
+ return THREAD_VARIABLES.get().get(field);
}
private static ThreadLocal<Date> referenceDate = new ThreadLocal<Date>();
public static void setReferenceDate(Date date) {
referenceDate.set(date);
+ Properties variables = getTimeVariables(date, TimeZone.getTimeZone("UTC"));
+ THREAD_VARIABLES.set(variables);
}
public static Properties getTimeVariables(Date date, TimeZone tz) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
index 4bf6e00..573180a 100644
--- a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
+++ b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
@@ -18,6 +18,7 @@
package org.apache.falcon.util;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.common.FeedDataPath;
@@ -192,7 +193,7 @@ public class FalconRadixUtils {
String regex = key.substring(0, key.indexOf("}") + 1);
// match the text and the regex
FeedDataPath.VARS var = getMatchingRegex(regex);
- if (matchPart(regex, input.substring(0, var.getDatePattern().length()))) {
+ if (matchPart(regex, input.substring(0, var.getValueSize()))) {
newRoot = child; // if it matches then this is the newRoot
break;
}
@@ -214,9 +215,13 @@ public class FalconRadixUtils {
if (StringUtils.isBlank(templateString)) {
return 0;
}
+
+ // Since we are only interested in the length, can replace pattern with a random string
for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
- templateString = templateString.replace("${" + var.name() + "}", var.getDatePattern());
+ templateString = templateString.replace("${" + var.name() + "}",
+ RandomStringUtils.random(var.getValueSize()));
}
+
return templateString.length();
}
@@ -246,11 +251,12 @@ public class FalconRadixUtils {
private FeedDataPath.VARS getMatchingRegex(String inputPart) {
//inputPart will be something like ${YEAR}
+
inputPart = inputPart.replace("${", "\\$\\{");
inputPart = inputPart.replace("}", "\\}");
for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
- if (StringUtils.equals(inputPart, var.regex())) {
+ if (inputPart.equals("${" + var.name() + "}")) {
return var;
}
}
@@ -298,8 +304,8 @@ public class FalconRadixUtils {
for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {//find which regex is this
if (StringUtils.equals(var.regex(), template)) {// regex found, do matching
//find part of the input string which should be matched against regex
- String desiredPart = input.substring(0, var.getDatePattern().length());
- Pattern pattern = Pattern.compile(var.getPatternRegularExpression());
+ String desiredPart = input.substring(0, var.getValueSize());
+ Pattern pattern = Pattern.compile(var.getValuePattern());
Matcher matcher = pattern.matcher(desiredPart);
if (!matcher.matches()) {
return false;
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 8d69b9a..887cea2 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -96,7 +96,7 @@ public class WorkflowExecutionContext {
private final Map<WorkflowExecutionArgs, String> context;
private final long creationTime;
- protected WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) {
+ public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) {
this.context = context;
creationTime = System.currentTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 99dab59..4f41548 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -64,6 +64,8 @@
*.system.lib.location=${FALCON_HOME}/sharedlibs
# Location to store user entity configurations
+
+#Configurations used in UTs
debug.config.store.uri=file://${user.dir}/target/store
debug.config.oozie.conf.uri=${user.dir}/target/oozie
debug.system.lib.location=${system.lib.location}
@@ -73,6 +75,17 @@ debug.libext.feed.retention.paths=${falcon.libext}
debug.libext.feed.replication.paths=${falcon.libext}
debug.libext.process.paths=${falcon.libext}
+#Configurations used in ITs
+it.config.store.uri=file://${user.dir}/target/store
+it.config.oozie.conf.uri=${user.dir}/target/oozie
+it.system.lib.location=${system.lib.location}
+it.broker.url=tcp://localhost:61616
+it.retry.recorder.path=${user.dir}/target/retry
+it.libext.feed.retention.paths=${falcon.libext}
+it.libext.feed.replication.paths=${falcon.libext}
+it.libext.process.paths=${falcon.libext}
+it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
+
*.falcon.cleanup.service.frequency=minutes(5)
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
index c405556..4c293bb 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
@@ -29,7 +29,7 @@ public class FeedDataPathTest {
@Test
public void testMinutesRegularExpression() {
- String monthPattern = FeedDataPath.VARS.MINUTE.getPatternRegularExpression();
+ String monthPattern = FeedDataPath.VARS.MINUTE.getValuePattern();
Assert.assertFalse("0".matches(monthPattern));
Assert.assertFalse("1".matches(monthPattern));
Assert.assertFalse("61".matches(monthPattern));
@@ -45,7 +45,7 @@ public class FeedDataPathTest {
@Test
public void testHourRegularExpression() {
- String hourPattern = FeedDataPath.VARS.HOUR.getPatternRegularExpression();
+ String hourPattern = FeedDataPath.VARS.HOUR.getValuePattern();
Assert.assertFalse("0".matches(hourPattern));
Assert.assertFalse("1".matches(hourPattern));
Assert.assertFalse("2".matches(hourPattern));
@@ -67,7 +67,7 @@ public class FeedDataPathTest {
@Test
public void testDayRegularExpression() {
- String dayPattern = FeedDataPath.VARS.DAY.getPatternRegularExpression();
+ String dayPattern = FeedDataPath.VARS.DAY.getValuePattern();
Assert.assertFalse("0".matches(dayPattern));
Assert.assertFalse("1".matches(dayPattern));
Assert.assertFalse("32".matches(dayPattern));
@@ -86,7 +86,7 @@ public class FeedDataPathTest {
@Test
public void testMonthRegularExpression() {
- String monthPattern = FeedDataPath.VARS.MONTH.getPatternRegularExpression();
+ String monthPattern = FeedDataPath.VARS.MONTH.getValuePattern();
Assert.assertFalse("0".matches(monthPattern));
Assert.assertFalse("1".matches(monthPattern));
Assert.assertFalse("13".matches(monthPattern));
@@ -105,7 +105,7 @@ public class FeedDataPathTest {
@Test
public void testYearRegularExpression() {
- String monthPattern = FeedDataPath.VARS.YEAR.getPatternRegularExpression();
+ String monthPattern = FeedDataPath.VARS.YEAR.getValuePattern();
Assert.assertFalse("0".matches(monthPattern));
Assert.assertFalse("1".matches(monthPattern));
Assert.assertFalse("13".matches(monthPattern));
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index f6994fc..63ab7da 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -18,16 +18,25 @@
package org.apache.falcon.entity;
+import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Properties;
import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.feed.*;
+import org.apache.hadoop.fs.Path;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.util.Date;
+import java.util.TimeZone;
+
/**
* Test for feed helper methods.
*/
public class FeedHelperTest {
+ public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
@Test
public void testPartitionExpression() {
Assert.assertEquals(FeedHelper.normalizePartitionExpression(" /a// ", " /b// "), "a/b");
@@ -51,4 +60,49 @@ public class FeedHelperTest {
"name/*/pvalue");
Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "IN"), "IN");
}
+
+ @DataProvider(name = "fsPathsforDate")
+ public Object[][] createPathsForGetDate() {
+ return new Object[][] {
+ {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015/01/01/00/30", "2015-01-01T00:30Z"},
+ {"/data/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}", "/data/2015-01-01-01-00", "2015-01-01T01:00Z"},
+ {"/data/${YEAR}/${MONTH}/${DAY}", "/data/2015/01/01", "2015-01-01T00:00Z"},
+ {"/data/${YEAR}/${MONTH}/${DAY}/data", "/data/2015/01/01/data", "2015-01-01T00:00Z"},
+ {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015-01-01/00/30", null},
+ {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data", "/data/2015-01-01/00/30", null},
+ };
+ }
+
+ @Test(dataProvider = "fsPathsforDate")
+ public void testGetDateFromPath(String template, String path, String expectedDate) throws Exception {
+ Date date = FeedHelper.getDate(template, new Path(path), UTC);
+ Assert.assertEquals(SchemaHelper.formatDateUTC(date), expectedDate);
+ }
+
+ @Test
+ public void testGetLocations() {
+ Cluster cluster = new Cluster();
+ cluster.setName("name");
+ Feed feed = new Feed();
+ Location location1 = new Location();
+ location1.setType(LocationType.META);
+ Locations locations = new Locations();
+ locations.getLocations().add(location1);
+
+ Location location2 = new Location();
+ location2.setType(LocationType.DATA);
+ locations.getLocations().add(location2);
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+ feedCluster.setName("name");
+
+ feed.setLocations(locations);
+ Clusters clusters = new Clusters();
+ feed.setClusters(clusters);
+ feed.getClusters().getClusters().add(feedCluster);
+
+ Assert.assertEquals(FeedHelper.getLocations(feedCluster, feed),
+ locations.getLocations());
+ Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2);
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 1667161..8b81a29 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -458,8 +458,7 @@ public class FileSystemStorageTest {
instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING);
instance.setSize(-1);
instance.setCreationTime(0);
- String dateMask = FeedHelper.getDateFormatInPath(basePath);
- Date date = FeedHelper.getDate(new Path(path), basePath, dateMask, tz.getID());
+ Date date = FeedHelper.getDate(basePath, new Path(path), tz);
instance.setInstance(SchemaHelper.formatDateUTC(date));
Calendar cal = Calendar.getInstance();
cal.setTime(dataStart);
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 3f622c7..1dd242a 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -165,6 +165,27 @@ In addition you can set any other environment variables you might need. This fil
#export FALCON_EXPANDED_WEBAPP_DIR=
</verbatim>
+*Configuring Monitoring plugin to register catalog partitions*
+Falcon comes with a monitoring plugin that registers catalog partitions. This comes in really handy during migration from filesystem-based feeds to hcatalog-based feeds.
+This plugin enables the user to de-couple partition registration from the migration itself: all partitions can be assumed to be present in hcatalog even before the migration, which simplifies the move to hcatalog.
+
+By default this plugin is disabled.
+To enable this plugin and leverage the feature, there are 3 pre-requisites:
+
+<verbatim>
+In {package dir}/conf/startup.properties, add
+*.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
+
+In the cluster definition, ensure registry endpoint is defined.
+Ex:
+<interface type="registry" endpoint="thrift://localhost:1109" version="0.13.3"/>
+
+In the feed definition, ensure the corresponding catalog table is mentioned in feed-properties
+Ex:
+<properties>
+ <property name="catalog.table" value="catalog:default:in_table#year={YEAR};month={MONTH};day={DAY};hour={HOUR};minute={MINUTE}"/>
+</properties>
+</verbatim>
*NOTE for Mac OS users*
<verbatim>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
index 2167375..cdd06db 100644
--- a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
+++ b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
@@ -22,8 +22,14 @@ import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Classic protocol provider for Hadoop v2 based tests.
@@ -32,6 +38,10 @@ public class ClassicClientProtocolProvider extends ClientProtocolProvider {
private static final String LOCALHOST = "localhost";
+ private static final ConcurrentHashMap<String, LocalJobRunner> CACHE = new ConcurrentHashMap<String, LocalJobRunner>();
+
+ private boolean initialised = false;
+
@Override
public ClientProtocol create(Configuration conf) throws IOException {
String framework = conf.get(MRConfig.FRAMEWORK_NAME, "unittests");
@@ -40,7 +50,16 @@ public class ClassicClientProtocolProvider extends ClientProtocolProvider {
if (!"unittests".equals(framework) || !tracker.startsWith(LOCALHOST)) {
return null;
}
- return new LocalJobRunner(conf);
+
+ if (!CACHE.containsKey(tracker)) {
+ CACHE.putIfAbsent(tracker, new LocalJobRunner(conf));
+ }
+
+ if (!initialised) {
+ System.setOut(new PrintStream(new BufferedOutputStream(new FileOutputStream("target/logs/system-out.log")), true));
+ initialised = true;
+ }
+ return CACHE.get(tracker);
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index bac421f..b4eae5d 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.OozieClientException;
-import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +97,7 @@ public final class LogProvider {
if (fs.exists(jobPath)) {
return getFormatedRunId(runId);
} else {
- Log.warn("No run dirs are available in logs dir:" + jobPath);
+ LOG.warn("No run dirs are available in logs dir:" + jobPath);
return "-";
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index 7a87919..0366350 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -260,7 +260,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
private void initializeOutputPaths(Cluster cluster, COORDINATORAPP coord, Properties props) throws FalconException {
if (entity.getOutputs() == null) {
- props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "NONE");
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), IGNORE);
props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE);
return;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 462e26b..f4ffbc1 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -87,7 +87,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
WorkflowJob.Status.FAILED);
private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING);
private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED);
- private static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
+ public static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND =
Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 545beb1..4e5c3f0 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -731,7 +731,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
verifyBrokerProperties(cluster, props);
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
- Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "NONE");
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "IGNORE");
}
@Test
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 4a3054a..af9b132 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -195,11 +195,11 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
- <version>1.4</version>
+ <version>1.7</version>
<configuration>
<verbose>true</verbose>
- <source>1.6</source>
- <complianceLevel>1.6</complianceLevel>
+ <source>1.7</source>
+ <complianceLevel>1.7</complianceLevel>
<includes>
<include>org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java</include>
<include>org/apache/falcon/resource/proxy/InstanceManagerProxy.java</include>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
----------------------------------------------------------------------
diff --git a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
index 970d381..a2feccf 100644
--- a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
+++ b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
@@ -255,7 +255,7 @@ public class FeedEvictorTest {
Assert.assertEquals("instances=NULL", stream.getBuffer());
stream.clear();
- String dataPath = "/data/YYYY/feed4/dd/MM/02/more/hello";
+ String dataPath = "/data/YYYY/feed4/dd/MM/more/hello";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv";
FeedEvictor.main(new String[] {
"-feedBasePath", LocationType.DATA.name() + "="
@@ -273,6 +273,7 @@ public class FeedEvictorTest {
assertFailures(fs, pair);
} catch (Exception e) {
+ e.printStackTrace();
Assert.fail("Unknown exception", e);
}
}
@@ -308,7 +309,7 @@ public class FeedEvictorTest {
stream.clear();
String dataPath = LocationType.DATA.name() + "="
+ cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
- + "/data/YYYY/feed4/dd/MM/02/more/hello";
+ + "/data/YYYY/feed4/dd/MM/more/hello";
String logFile = hdfsUrl + "/falcon/staging/feed/instancePaths-2012-01-01-02-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", dataPath,
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
----------------------------------------------------------------------
diff --git a/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml b/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
index cf297de..a6914cd 100644
--- a/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
+++ b/test-tools/hadoop-webapp/src/main/resources/mapred-site.xml
@@ -65,4 +65,8 @@
</description>
</property>
+ <property>
+ <name>mapreduce.framework.name</name>
+ <value>unittests</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
----------------------------------------------------------------------
diff --git a/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml b/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
index 658752b..52fdf6d 100644
--- a/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
+++ b/test-tools/hadoop-webapp/src/main/resources/yarn-site.xml
@@ -28,11 +28,6 @@
</property>
<property>
- <name>mapreduce.framework.name</name>
- <value>unittests</value>
- </property>
-
- <property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>0.0.0.0:41025</value>
</property>
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b0a920f/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java b/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java
new file mode 100644
index 0000000..c7b7d3b
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/catalog/CatalogPartitionHandlerIT.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.catalog;
+
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.OozieTestUtils;
+import org.apache.hive.hcatalog.api.HCatPartition;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * IT for the catalog partition handler, which is a JMS message listener.
+ */
+@Test(groups = {"exhaustive"})
+public class CatalogPartitionHandlerIT {
+ private static final String METASTORE_URL = "thrift://localhost:49083";
+ private static final String DB = "falcon_db";
+ private static final String TABLE = "output_table";
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ TestContext.prepare();
+ }
+
+ // TODO : Enable this after oozie/hadoop config file changes
+ @Test(enabled = false)
+ public void testPartitionRegistration() throws Exception {
+ TestContext context = newContext();
+
+ HiveTestUtils.createDatabase(METASTORE_URL, DB);
+ HiveTestUtils.createTable(METASTORE_URL, DB, TABLE, Arrays.asList("ds"));
+ context.scheduleProcess();
+ List<WorkflowJob> instances = OozieTestUtils.waitForProcessWFtoStart(context);
+ OozieTestUtils.waitForInstanceToComplete(context, instances.get(0).getId());
+
+ HCatPartition partition = HiveTestUtils.getPartition(METASTORE_URL, DB, TABLE, "ds", "2012-04-19");
+ Assert.assertNotNull(partition);
+ }
+
+ private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>();
+
+ private TestContext newContext() {
+ contexts.set(new TestContext());
+ return contexts.get();
+ }
+
+ @AfterMethod
+ public void cleanup() throws Exception {
+ TestContext testContext = contexts.get();
+ if (testContext != null) {
+ OozieTestUtils.killOozieJobs(testContext);
+ }
+
+ contexts.remove();
+ }
+}