You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/12/12 06:17:25 UTC
incubator-falcon git commit: FALCON-419 Update deprecated HCatalog
API to use Hive Metastore API. Contributed by Shwetha GS
Repository: incubator-falcon
Updated Branches:
refs/heads/master b364820dd -> a38cebf42
FALCON-419 Update deprecated HCatalog API to use Hive Metastore API. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a38cebf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a38cebf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a38cebf4
Branch: refs/heads/master
Commit: a38cebf42b0addaf8fc2e7892f55166a3cd902b5
Parents: b364820
Author: shwethags <sh...@inmobi.com>
Authored: Fri Dec 12 10:46:19 2014 +0530
Committer: shwethags <sh...@inmobi.com>
Committed: Fri Dec 12 10:46:54 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
common/pom.xml | 2 +-
.../falcon/catalog/AbstractCatalogService.java | 43 ++---
.../apache/falcon/catalog/CatalogPartition.java | 13 --
.../falcon/catalog/HiveCatalogService.java | 163 ++++++++-----------
.../apache/falcon/entity/CatalogStorage.java | 112 +++----------
pom.xml | 15 +-
.../FalconAuthenticationFilterTest.java | 2 +-
.../apache/falcon/latedata/LateDataHandler.java | 19 ++-
.../falcon/catalog/HiveCatalogServiceIT.java | 52 ++----
.../apache/falcon/late/LateDataHandlerIT.java | 6 +-
.../lifecycle/TableStorageFeedEvictorIT.java | 13 +-
.../org/apache/falcon/resource/TestContext.java | 20 +++
.../org/apache/falcon/util/HiveTestUtils.java | 16 +-
14 files changed, 178 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f671e56..e9758c0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,9 @@ Trunk (Unreleased)
Seetharam)
OPTIMIZATIONS
+ FALCON-419 Update deprecated HCatalog API to use Hive Metastore API.
+ (Shwetha GS)
+
FALCON-423 Updating falcon server endpoint in distributed setup doesn't
work. (Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 50dd2ea..b349c2f 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -154,7 +154,7 @@
<dependency>
<groupId>org.apache.hive</groupId>
- <artifactId>hive-common</artifactId>
+ <artifactId>hive-metastore</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 348fac0..9abdc93 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -22,7 +22,6 @@ import org.apache.falcon.FalconException;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
-import java.util.Map;
/**
* Interface definition for a catalog registry service
@@ -88,47 +87,49 @@ public abstract class AbstractCatalogService {
/**
* Drops a given partition. Executed in the workflow engine.
*
- *
* @param conf conf object
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
* @param tableName tableName to check if it exists
- * @param partitions list of partitions as Key=Value pairs
+ * @param partitionValues list of partition values
+ * @param deleteData should dropPartition also delete the corresponding data
* @return if the partition was dropped
* @throws FalconException
*/
- public abstract boolean dropPartitions(Configuration conf, String catalogUrl,
- String database, String tableName,
- Map<String, String> partitions) throws FalconException;
+ public abstract boolean dropPartition(Configuration conf, String catalogUrl,
+ String database, String tableName, List<String> partitionValues,
+ boolean deleteData) throws FalconException;
/**
- * Gets the partition. Executed in the workflow engine.
- *
+ * Drops the partitions. Executed in the workflow engine.
*
- * @param conf conf
+ * @param conf conf object
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
* @param tableName tableName to check if it exists
- * @param partitionSpec The partition specification, {[col_name,value],[col_name2,value2]}.
- * All partition-key-values must be specified.
- * @return An instance of CatalogPartition.
+ * @param partitionValues list of partition values
+ * @param deleteData should dropPartition also delete the corresponding data
+ * @return if the partition was dropped
* @throws FalconException
*/
- public abstract CatalogPartition getPartition(Configuration conf, String catalogUrl,
- String database, String tableName,
- Map<String, String> partitionSpec)
- throws FalconException;
+ public abstract void dropPartitions(Configuration conf, String catalogUrl,
+ String database, String tableName,
+ List<String> partitionValues, boolean deleteData) throws FalconException;
/**
+ * Gets the partition. Executed in the workflow engine.
+ *
*
* @param conf conf
* @param catalogUrl url for the catalog service
* @param database database the table belongs to
- * @param tableName table name
- * @return list of partition column names of the table
+ * @param tableName tableName to check if it exists
+ * @param partitionValues Values for partition columns.
+ * @return An instance of CatalogPartition.
* @throws FalconException
*/
- public abstract List<String> getTablePartitionCols(Configuration conf, String catalogUrl,
- String database,
- String tableName) throws FalconException;
+ public abstract CatalogPartition getPartition(Configuration conf, String catalogUrl,
+ String database, String tableName,
+ List<String> partitionValues)
+ throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
index c5d4705..032ae38 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
@@ -30,7 +30,6 @@ public class CatalogPartition {
private List<String> values;
private long createTime;
private long lastAccessTime;
- private List<String> tableColumns;
private String inputFormat;
private String outputFormat;
private String location;
@@ -59,10 +58,6 @@ public class CatalogPartition {
this.lastAccessTime = lastAccessTime;
}
- protected void setTableColumns(List<String> tableColumns) {
- this.tableColumns = tableColumns;
- }
-
protected void setInputFormat(String inputFormat) {
this.inputFormat = inputFormat;
}
@@ -97,14 +92,6 @@ public class CatalogPartition {
return this.tableName;
}
- /**
- * Gets the columns of the table.
- *
- * @return the columns
- */
- public List<String> getColumns() {
- return this.tableColumns;
- }
/**
* Gets the input format.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 3216f1e..f59b83b 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -24,6 +24,11 @@ import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
@@ -31,12 +36,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.io.Text;
import org.apache.hcatalog.api.HCatClient;
-import org.apache.hcatalog.api.HCatDatabase;
-import org.apache.hcatalog.api.HCatPartition;
-import org.apache.hcatalog.api.HCatTable;
import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/**
* An implementation of CatalogService that uses Hive Meta Store (HCatalog)
@@ -55,25 +54,8 @@ public class HiveCatalogService extends AbstractCatalogService {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
- /**
- * This is only used for tests.
- *
- * @param metastoreUrl metastore url
- * @return client object
- * @throws FalconException
- */
- public static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
- try {
- HiveConf hcatConf = createHiveConf(new Configuration(false), metastoreUrl);
- return HCatClient.create(hcatConf);
- } catch (HCatException e) {
- throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
- } catch (IOException e) {
- throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
- }
- }
- private static HiveConf createHiveConf(Configuration conf,
+ public static HiveConf createHiveConf(Configuration conf,
String metastoreUrl) throws IOException {
HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
@@ -97,8 +79,8 @@ public class HiveCatalogService extends AbstractCatalogService {
* @return hive metastore client handle
* @throws FalconException
*/
- private static HCatClient createHCatClient(Configuration conf,
- String metastoreUrl) throws FalconException {
+ private static HiveMetaStoreClient createClient(Configuration conf,
+ String metastoreUrl) throws FalconException {
try {
LOG.info("Creating HCatalog client object for metastore {} using conf {}",
metastoreUrl, conf.toString());
@@ -117,11 +99,9 @@ public class HiveCatalogService extends AbstractCatalogService {
OozieActionConfigurationHelper.dumpConf(hcatConf, "hive conf ");
- return HCatClient.create(hcatConf);
- } catch (HCatException e) {
- throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
- } catch (IOException e) {
- throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
+ return new HiveMetaStoreClient(hcatConf);
+ } catch (Exception e) {
+ throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e);
}
}
@@ -158,8 +138,8 @@ public class HiveCatalogService extends AbstractCatalogService {
* @return hive metastore client handle
* @throws FalconException
*/
- private static HCatClient createProxiedHCatClient(Configuration conf,
- String catalogUrl) throws FalconException {
+ private static HiveMetaStoreClient createProxiedClient(Configuration conf,
+ String catalogUrl) throws FalconException {
try {
final HiveConf hcatConf = createHiveConf(conf, catalogUrl);
@@ -167,15 +147,13 @@ public class HiveCatalogService extends AbstractCatalogService {
addSecureCredentialsAndToken(conf, hcatConf, proxyUGI);
LOG.info("Creating HCatalog client object for {}", catalogUrl);
- return proxyUGI.doAs(new PrivilegedExceptionAction<HCatClient>() {
- public HCatClient run() throws Exception {
- return HCatClient.create(hcatConf);
+ return proxyUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+ public HiveMetaStoreClient run() throws Exception {
+ return new HiveMetaStoreClient(hcatConf);
}
});
- } catch (IOException e) {
- throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
- } catch (InterruptedException e) {
- throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
+ } catch (Exception e) {
+ throw new FalconException("Exception creating Proxied HiveMetaStoreClient: " + e.getMessage(), e);
}
}
@@ -216,10 +194,10 @@ public class HiveCatalogService extends AbstractCatalogService {
LOG.info("Checking if the service is alive for: {}", catalogUrl);
try {
- HCatClient client = createProxiedHCatClient(conf, catalogUrl);
- HCatDatabase database = client.getDatabase("default");
+ HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
+ Database database = client.getDatabase("default");
return database != null;
- } catch (HCatException e) {
+ } catch (Exception e) {
throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e);
}
}
@@ -230,10 +208,10 @@ public class HiveCatalogService extends AbstractCatalogService {
LOG.info("Checking if the table exists: {}", tableName);
try {
- HCatClient client = createProxiedHCatClient(conf, catalogUrl);
- HCatTable table = client.getTable(database, tableName);
+ HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
+ Table table = client.getTable(database, tableName);
return table != null;
- } catch (HCatException e) {
+ } catch (Exception e) {
throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
}
}
@@ -244,10 +222,10 @@ public class HiveCatalogService extends AbstractCatalogService {
LOG.info("Checking if the table is external: {}", tableName);
try {
- HCatClient client = createHCatClient(conf, catalogUrl);
- HCatTable table = client.getTable(database, tableName);
- return !table.getTabletype().equals("MANAGED_TABLE");
- } catch (HCatException e) {
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ Table table = client.getTable(database, tableName);
+ return table.getTableType().equals(TableType.EXTERNAL_TABLE.name());
+ } catch (Exception e) {
throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e);
}
}
@@ -261,89 +239,78 @@ public class HiveCatalogService extends AbstractCatalogService {
try {
List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
- HCatClient client = createHCatClient(conf, catalogUrl);
- List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter);
- for (HCatPartition hCatPartition : hCatPartitions) {
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ List<Partition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter, (short) -1);
+ for (Partition hCatPartition : hCatPartitions) {
LOG.info("Partition: " + hCatPartition.getValues());
CatalogPartition partition = createCatalogPartition(hCatPartition);
catalogPartitionList.add(partition);
}
return catalogPartitionList;
- } catch (HCatException e) {
+ } catch (Exception e) {
throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
}
}
- private CatalogPartition createCatalogPartition(HCatPartition hCatPartition) {
+ private CatalogPartition createCatalogPartition(Partition hCatPartition) {
final CatalogPartition catalogPartition = new CatalogPartition();
- catalogPartition.setDatabaseName(hCatPartition.getDatabaseName());
+ catalogPartition.setDatabaseName(hCatPartition.getDbName());
catalogPartition.setTableName(hCatPartition.getTableName());
catalogPartition.setValues(hCatPartition.getValues());
- catalogPartition.setInputFormat(hCatPartition.getInputFormat());
- catalogPartition.setOutputFormat(hCatPartition.getOutputFormat());
- catalogPartition.setLocation(hCatPartition.getLocation());
- catalogPartition.setSerdeInfo(hCatPartition.getSerDe());
+ catalogPartition.setInputFormat(hCatPartition.getSd().getInputFormat());
+ catalogPartition.setOutputFormat(hCatPartition.getSd().getOutputFormat());
+ catalogPartition.setLocation(hCatPartition.getSd().getLocation());
+ catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib());
catalogPartition.setCreateTime(hCatPartition.getCreateTime());
catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
-
- List<String> tableColumns = new ArrayList<String>();
- for (HCatFieldSchema hCatFieldSchema : hCatPartition.getColumns()) {
- tableColumns.add(hCatFieldSchema.getName());
- }
- catalogPartition.setTableColumns(tableColumns);
-
return catalogPartition;
}
@Override
- public boolean dropPartitions(Configuration conf, String catalogUrl,
+ public boolean dropPartition(Configuration conf, String catalogUrl,
String database, String tableName,
- Map<String, String> partitions) throws FalconException {
- LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitions);
+ List<String> partitionValues, boolean deleteData) throws FalconException {
+ LOG.info("Dropping partition for: {}, partition: {}", tableName, partitionValues);
try {
- HCatClient client = createHCatClient(conf, catalogUrl);
- client.dropPartitions(database, tableName, partitions, true);
- } catch (HCatException e) {
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ return client.dropPartition(database, tableName, partitionValues, deleteData);
+ } catch (Exception e) {
throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
}
-
- return true;
}
@Override
- public CatalogPartition getPartition(Configuration conf, String catalogUrl,
- String database, String tableName,
- Map<String, String> partitionSpec) throws FalconException {
- LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionSpec);
+ public void dropPartitions(Configuration conf, String catalogUrl,
+ String database, String tableName,
+ List<String> partitionValues, boolean deleteData) throws FalconException {
+ LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitionValues);
try {
- HCatClient client = createHCatClient(conf, catalogUrl);
- HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
- return createCatalogPartition(hCatPartition);
- } catch (HCatException e) {
- throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ List<Partition> partitions = client.listPartitions(database, tableName, partitionValues, (short) -1);
+ for (Partition part : partitions) {
+ LOG.info("Dropping partition for: {}, partition: {}", tableName, part.getValues());
+ client.dropPartition(database, tableName, part.getValues(), deleteData);
+ }
+ } catch (Exception e) {
+ throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
}
}
@Override
- public List<String> getTablePartitionCols(Configuration conf, String catalogUrl,
- String database,
- String tableName) throws FalconException {
- LOG.info("Fetching partition columns of table: " + tableName);
+ public CatalogPartition getPartition(Configuration conf, String catalogUrl,
+ String database, String tableName,
+ List<String> partitionValues) throws FalconException {
+ LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionValues);
try {
- HCatClient client = createHCatClient(conf, catalogUrl);
- HCatTable table = client.getTable(database, tableName);
- List<HCatFieldSchema> partSchema = table.getPartCols();
- List<String> partCols = new ArrayList<String>();
- for (HCatFieldSchema part : partSchema) {
- partCols.add(part.getName());
- }
- return partCols;
- } catch (HCatException e) {
- throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
+ HiveMetaStoreClient client = createClient(conf, catalogUrl);
+ Partition hCatPartition = client.getPartition(database, tableName, partitionValues);
+ return createCatalogPartition(hCatPartition);
+ } catch (Exception e) {
+ throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index e68044a..59f558b 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
+import org.apache.falcon.catalog.AbstractCatalogService;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
import org.apache.falcon.entity.common.FeedDataPath;
@@ -46,7 +47,6 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -394,25 +394,25 @@ public class CatalogStorage extends Configured implements Storage {
LOG.info("Applying retention on {}, Limit: {}, timezone: {}",
getTable(), retentionLimit, timeZone);
- // get sorted date partition keys and values
- List<String> datedPartKeys = new ArrayList<String>();
- List<String> datedPartValues = new ArrayList<String>();
List<CatalogPartition> toBeDeleted;
try {
+ // get sorted date partition keys and values
+ List<String> datedPartKeys = new ArrayList<String>();
+ List<String> datedPartValues = new ArrayList<String>();
fillSortedDatedPartitionKVs(datedPartKeys, datedPartValues, retentionLimit, timeZone);
toBeDeleted = discoverPartitionsToDelete(datedPartKeys, datedPartValues);
-
} catch (ELException e) {
throw new FalconException("Couldn't find partitions to be deleted", e);
}
+
if (toBeDeleted.isEmpty()) {
LOG.info("No partitions to delete.");
} else {
final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
getConf(), getCatalogUrl(), getDatabase(), getTable());
try {
- dropPartitions(toBeDeleted, datedPartKeys, isTableExternal);
+ dropPartitions(toBeDeleted, isTableExternal);
} catch (IOException e) {
throw new FalconException("Couldn't drop partitions", e);
}
@@ -513,95 +513,31 @@ public class CatalogStorage extends Configured implements Storage {
return filterBuffer.toString();
}
- private void dropPartitions(List<CatalogPartition> partitionsToDelete, List<String> datedPartKeys,
- boolean isTableExternal) throws FalconException, IOException {
-
- // get table partition columns
- List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
- getConf(), getCatalogUrl(), getDatabase(), getTable());
-
- /* In case partition columns are a super-set of dated partitions, there can be multiple
- * partitions that share the same set of date-partition values. All such partitions can
- * be deleted by issuing a single HCatalog dropPartition call per date-partition values.
- * Arrange the partitions grouped by each set of date-partition values.
- */
- Map<Map<String, String>, List<CatalogPartition>> dateToPartitionsMap = new HashMap<
- Map<String, String>, List<CatalogPartition>>();
- for (CatalogPartition partitionToDrop : partitionsToDelete) {
- // create a map of name-values of all columns of this partition
- Map<String, String> partitionsMap = new HashMap<String, String>();
- for (int i = 0; i < partColumns.size(); i++) {
- partitionsMap.put(partColumns.get(i), partitionToDrop.getValues().get(i));
- }
-
- // create a map of name-values of dated sub-set of this partition
- Map<String, String> datedPartitions = new HashMap<String, String>();
- for (String datedPart : datedPartKeys) {
- datedPartitions.put(datedPart, partitionsMap.get(datedPart));
- }
+ private void dropPartitions(List<CatalogPartition> partitionsToDelete, boolean isTableExternal)
+ throws FalconException, IOException {
+ AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+ for (CatalogPartition partition : partitionsToDelete) {
+ boolean deleted = catalogService.dropPartition(getConf(), getCatalogUrl(), getDatabase(), getTable(),
+ partition.getValues(), true);
- // add a map entry of this catalog partition corresponding to its date-partition values
- List<CatalogPartition> catalogPartitions;
- if (dateToPartitionsMap.containsKey(datedPartitions)) {
- catalogPartitions = dateToPartitionsMap.get(datedPartitions);
- } else {
- catalogPartitions = new ArrayList<CatalogPartition>();
+ if (!deleted) {
+ return;
}
- catalogPartitions.add(partitionToDrop);
- dateToPartitionsMap.put(datedPartitions, catalogPartitions);
- }
-
- // delete each entry within dateToPartitions Map
- for (Map.Entry<Map<String, String>, List<CatalogPartition>> entry : dateToPartitionsMap.entrySet()) {
- dropPartitionInstances(entry.getValue(), entry.getKey(), isTableExternal);
- }
- }
-
- private void dropPartitionInstances(List<CatalogPartition> partitionsToDrop, Map<String, String> partSpec,
- boolean isTableExternal) throws FalconException, IOException {
-
- boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions(
- getConf(), getCatalogUrl(), getDatabase(), getTable(), partSpec);
-
- if (!deleted) {
- return;
- }
- for (CatalogPartition partitionToDrop : partitionsToDrop) {
if (isTableExternal) { // nuke the dirs if an external table
- final String location = partitionToDrop.getLocation();
- final Path path = new Path(location);
- deleted = HadoopClientFactory.get()
- .createProxiedFileSystem(path.toUri()) .delete(path, true);
- }
- if (!isTableExternal || deleted) {
- // replace ',' with ';' since message producer splits instancePaths string by ','
- String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
- LOG.info("Deleted partition: " + partitionInfo);
- instanceDates.append(partSpec).append(',');
- instancePaths.append(getEvictedPartitionPath(partitionToDrop))
- .append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
+ final Path path = new Path(partition.getLocation());
+ if (!HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).delete(path, true)) {
+ throw new FalconException("Failed to delete location " + path + " for partition "
+ + partition.getValues());
+ }
}
- }
- }
- private String getEvictedPartitionPath(final CatalogPartition partitionToDrop) {
- String uriTemplate = getUriTemplate(); // no need for location type for table
- List<String> values = partitionToDrop.getValues();
- StringBuilder partitionPath = new StringBuilder();
- int index = 0;
- for (String partitionKey : getDatedPartitionKeys()) {
- String dateMask = getPartitionValue(partitionKey);
- String date = values.get(index);
-
- partitionPath.append(uriTemplate.replace(dateMask, date));
- partitionPath.append(CatalogStorage.PARTITION_SEPARATOR);
- LOG.info("partitionPath: " + partitionPath);
+ // replace ',' with ';' since message producer splits instancePaths string by ','
+ String partitionInfo = partition.getValues().toString().replace(",", ";");
+ LOG.info("Deleted partition: " + partitionInfo);
+ instanceDates.append(partitionInfo).append(',');
+ instancePaths.append(partition.getLocation()).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
}
- partitionPath.setLength(partitionPath.length() - 1);
-
- LOG.info("Return partitionPath: " + partitionPath);
- return partitionPath.toString();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7a8aacc..5a6c095 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,7 +108,6 @@
<oozie.forcebuild>false</oozie.forcebuild>
<activemq.version>5.4.3</activemq.version>
<hive.version>0.11.0</hive.version>
- <hcatalog.version>0.11.0</hcatalog.version>
<jetty.version>6.1.26</jetty.version>
<jersey.version>1.9</jersey.version>
<internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
@@ -872,23 +871,11 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-common</artifactId>
- <version>${hive.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<!-- this is needed for embedded oozie -->
<dependency>
<groupId>org.apache.hcatalog</groupId>
<artifactId>webhcat-java-client</artifactId>
- <version>${hcatalog.version}</version>
+ <version>${hive.version}</version>
<exclusions>
<exclusion>
<!-- This implies you cannot use orc files -->
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
index 4ceca29..9e8c76a 100644
--- a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
+++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
@@ -184,7 +184,7 @@ public class FalconAuthenticationFilterTest {
public void testGetKerberosPrincipalWithSubstitutedHostSecure() throws Exception {
String principal = StartupProperties.get().getProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL);
- String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName() + "@Example.com";
+ String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName().toLowerCase() + "@Example.com";
try {
Configuration conf = new Configuration(false);
conf.set("hadoop.security.authentication", "kerberos");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index d5b7db0..d35abfa 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -153,7 +154,7 @@ public class LateDataHandler extends Configured implements Tool {
* The assumption is that if a partition has changed or reinstated, the underlying
* metric would change, either size or create time.
*
- * @param feedUriTemplate URI for the feed storage, filesystem path or table uri
+ * @param feedUri URI for the feed storage, filesystem path or table uri
* @param feedStorageType feed storage type
* @param conf configuration
* @return computed metric
@@ -161,19 +162,19 @@ public class LateDataHandler extends Configured implements Tool {
* @throws FalconException
* @throws URISyntaxException
*/
- public long computeStorageMetric(String feedUriTemplate, String feedStorageType, Configuration conf)
+ public long computeStorageMetric(String feedUri, String feedStorageType, Configuration conf)
throws IOException, FalconException, URISyntaxException {
Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
if (storageType == Storage.TYPE.FILESYSTEM) {
// usage on file system is the metric
- return getFileSystemUsageMetric(feedUriTemplate, conf);
+ return getFileSystemUsageMetric(feedUri, conf);
} else if (storageType == Storage.TYPE.TABLE) {
// todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme
- feedUriTemplate = feedUriTemplate.replace("hcat", "thrift");
+ feedUri = feedUri.replace("hcat", "thrift");
// creation time of the given partition is the metric
- return getTablePartitionCreateTimeMetric(feedUriTemplate);
+ return getTablePartitionCreateTimeMetric(feedUri);
}
throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
@@ -222,20 +223,20 @@ public class LateDataHandler extends Configured implements Tool {
* If this partition was reinstated, the assumption is that the create time of
* this partition would change.
*
- * @param feedUriTemplate catalog table uri
+ * @param feedUri catalog table uri
* @return metric as creation time of the given partition
* @throws IOException
* @throws URISyntaxException
* @throws FalconException
*/
- private long getTablePartitionCreateTimeMetric(String feedUriTemplate)
+ private long getTablePartitionCreateTimeMetric(String feedUri)
throws IOException, URISyntaxException, FalconException {
CatalogStorage storage = (CatalogStorage)
- FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate, getConf());
+ FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUri, getConf());
CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
getConf(), storage.getCatalogUrl(), storage.getDatabase(),
- storage.getTable(), storage.getPartitions());
+ storage.getTable(), new ArrayList(storage.getPartitions().values()));
return partition == null ? 0 : partition.getCreateTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index fcf7f96..6fd23a0 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -38,8 +38,8 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +64,7 @@ public class HiveCatalogServiceIT {
CurrentUser.authenticate(TestContext.REMOTE_USER);
hiveCatalogService = new HiveCatalogService();
- client = HiveCatalogService.getHCatClient(METASTORE_URL);
+ client = TestContext.getHCatClient(METASTORE_URL);
createDatabase();
createTable();
@@ -170,11 +170,6 @@ public class HiveCatalogServiceIT {
}
@Test
- public void testGet() throws Exception {
- Assert.assertNotNull(HiveCatalogService.getHCatClient(METASTORE_URL));
- }
-
- @Test
public void testIsAlive() throws Exception {
Assert.assertTrue(hiveCatalogService.isAlive(conf, METASTORE_URL));
}
@@ -247,7 +242,7 @@ public class HiveCatalogServiceIT {
throws Exception {
List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
- conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter);
+ conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter);
Assert.assertEquals(filteredPartitions.size(), expectedPartitionCount);
}
@@ -272,34 +267,23 @@ public class HiveCatalogServiceIT {
@Test
public void testDropPartition() throws Exception {
- Map<String, String> partialPartitionSpec = new HashMap<String, String>();
- partialPartitionSpec.put("ds", "20130903");
-
- Assert.assertTrue(hiveCatalogService.dropPartitions(
- conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
-
+ hiveCatalogService.dropPartition(
+ conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in"), true);
List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
- Assert.assertEquals(partitions.size(), 1, "Unexpected number of partitions");
- Assert.assertEquals(new String[]{"20130902", "in"},
+ Assert.assertEquals(partitions.size(), 2, "Unexpected number of partitions");
+ Assert.assertEquals(new String[]{"20130903", "in"},
partitions.get(0).getValues().toArray(), "Mismatched partition");
- partialPartitionSpec = new HashMap<String, String>();
- partialPartitionSpec.put("ds", "20130902");
-
- Assert.assertTrue(hiveCatalogService.dropPartitions(
- conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
+ hiveCatalogService.dropPartitions(
+ conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130903"), true);
partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
Assert.assertEquals(partitions.size(), 0, "Unexpected number of partitions");
}
@Test
public void testGetPartition() throws Exception {
- Map<String, String> partitionSpec = new HashMap<String, String>();
- partitionSpec.put("ds", "20130902");
- partitionSpec.put("region", "in");
-
CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
- conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+ conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in"));
Assert.assertNotNull(partition);
long createTime = partition.getCreateTime();
@@ -308,7 +292,7 @@ public class HiveCatalogServiceIT {
@Test
public void testReInstatePartition() throws Exception {
- Map<String, String> partitionSpec = new HashMap<String, String>();
+ Map<String, String> partitionSpec = new LinkedHashMap<String, String>();
partitionSpec.put("ds", "20130918");
partitionSpec.put("region", "blah");
@@ -317,7 +301,7 @@ public class HiveCatalogServiceIT {
client.addPartition(first);
CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
- conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+ conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
Assert.assertNotNull(partition);
final long originalCreateTime = partition.getCreateTime();
@@ -331,7 +315,7 @@ public class HiveCatalogServiceIT {
client.addPartition(second);
CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
- conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+ conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
Assert.assertNotNull(reInstatedPartition);
final long reInstatedCreateTime = reInstatedPartition.getCreateTime();
@@ -345,14 +329,4 @@ public class HiveCatalogServiceIT {
{EXTERNAL_TABLE_NAME},
};
}
-
- @Test (dataProvider = "tableName")
- public void testGetTablePartitionCols(String tableName) throws Exception {
- List<String> partCols = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
- conf, METASTORE_URL, DATABASE_NAME, tableName);
- Assert.assertEquals(partCols.size(), 2);
- Collections.sort(partCols);
- Assert.assertEquals(partCols.get(0), "ds");
- Assert.assertEquals(partCols.get(1), "region");
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index c4e046b..1885bb7 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -20,7 +20,6 @@ package org.apache.falcon.late;
import org.apache.falcon.catalog.CatalogPartition;
import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.catalog.HiveCatalogService;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -42,6 +41,7 @@ import org.testng.annotations.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -199,7 +199,7 @@ public class LateDataHandlerIT {
}
private void reinstatePartition() throws Exception {
- final HCatClient client = HiveCatalogService.getHCatClient(metastoreUrl);
+ final HCatClient client = context.getHCatClient(metastoreUrl);
Map<String, String> partitionSpec = new HashMap<String, String>();
partitionSpec.put("ds", PARTITION_VALUE);
@@ -213,7 +213,7 @@ public class LateDataHandlerIT {
client.addPartition(reinstatedPartition);
CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
- conf, metastoreUrl, DATABASE_NAME, TABLE_NAME, partitionSpec);
+ conf, metastoreUrl, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
Assert.assertNotNull(reInstatedPartition);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index d508a2d..ba6698c 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -20,9 +20,9 @@ package org.apache.falcon.lifecycle;
import org.apache.commons.el.ExpressionEvaluatorImpl;
import org.apache.falcon.Pair;
-import org.apache.falcon.catalog.HiveCatalogService;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.resource.TestContext;
import org.apache.falcon.retention.FeedEvictor;
import org.apache.falcon.util.HiveTestUtils;
import org.apache.hadoop.conf.Configuration;
@@ -87,7 +87,7 @@ public class TableStorageFeedEvictorIT {
public void setUp() throws Exception {
FeedEvictor.OUT.set(stream);
- client = HiveCatalogService.getHCatClient(METASTORE_URL);
+ client = TestContext.getHCatClient(METASTORE_URL);
HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
final List<String> partitionKeys = Arrays.asList("ds", "region");
@@ -156,7 +156,7 @@ public class TableStorageFeedEvictorIT {
"-retentionType", "instance",
"-retentionLimit", retentionLimit,
"-timeZone", timeZone,
- "-frequency", "daily",
+ "-frequency", "days(1)",
"-logFile", logFile,
"-falconFeedStorageType", Storage.TYPE.TABLE.name(),
});
@@ -372,15 +372,16 @@ public class TableStorageFeedEvictorIT {
FileSystem fs = path.getFileSystem(new Configuration());
for (String candidatePartition : candidatePartitions) {
+ path = new Path(EXTERNAL_TABLE_LOCATION + candidatePartition);
if (isTableExternal) {
- touch(fs, EXTERNAL_TABLE_LOCATION + candidatePartition);
+ touch(fs, path.toString());
}
Map<String, String> partition = new HashMap<String, String>();
partition.put("ds", candidatePartition); //yyyyMMDD
partition.put("region", "in");
HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
- DATABASE_NAME, tableName, null, partition).build();
+ DATABASE_NAME, tableName, isTableExternal ? path.toString() : null, partition).build();
client.addPartition(addPtn);
}
}
@@ -408,7 +409,7 @@ public class TableStorageFeedEvictorIT {
}
private void touch(FileSystem fs, String path) throws Exception {
- fs.create(new Path(path)).close();
+ fs.mkdirs(new Path(path));
}
private void dropPartitions(String tableName, List<String> candidatePartitions) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 64f98d4..23df745 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.catalog.HiveCatalogService;
import org.apache.falcon.cli.FalconCLI;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
@@ -42,11 +43,14 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hcatalog.api.HCatClient;
import org.testng.Assert;
import javax.net.ssl.HostnameVerifier;
@@ -123,6 +127,22 @@ public class TestContext {
}
}
+ /**
+ * This is only used for tests.
+ *
+ * @param metastoreUrl metastore url
+ * @return client object
+ * @throws FalconException
+ */
+ public static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
+ try {
+ HiveConf hcatConf = HiveCatalogService.createHiveConf(new Configuration(false), metastoreUrl);
+ return HCatClient.create(hcatConf);
+ } catch (Exception e) {
+ throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
+ }
+ }
+
public void configure() throws Exception {
try {
StartupProperties.get().setProperty(
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
index 3b71f08..9fd1a9d 100644
--- a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
+++ b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
@@ -18,7 +18,7 @@
package org.apache.falcon.util;
-import org.apache.falcon.catalog.HiveCatalogService;
+import org.apache.falcon.resource.TestContext;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -44,20 +44,20 @@ public final class HiveTestUtils {
public static void createDatabase(String metaStoreUrl,
String databaseName) throws Exception {
- HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+ HCatClient client = TestContext.getHCatClient(metaStoreUrl);
HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(databaseName)
.ifNotExists(true).build();
client.createDatabase(dbDesc);
}
public static void dropDatabase(String metaStoreUrl, String databaseName) throws Exception {
- HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+ HCatClient client = TestContext.getHCatClient(metaStoreUrl);
client.dropDatabase(databaseName, true, HCatClient.DropDBMode.CASCADE);
}
public static void createTable(String metaStoreUrl, String databaseName,
String tableName) throws Exception {
- HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+ HCatClient client = TestContext.getHCatClient(metaStoreUrl);
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
@@ -72,7 +72,7 @@ public final class HiveTestUtils {
public static void createTable(String metaStoreUrl, String databaseName, String tableName,
List<String> partitionKeys) throws Exception {
- HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+ HCatClient client = TestContext.getHCatClient(metaStoreUrl);
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
@@ -112,7 +112,7 @@ public final class HiveTestUtils {
.location(externalLocation)
.build();
- HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+ HCatClient client = TestContext.getHCatClient(metaStoreUrl);
client.createTable(tableDesc);
}
@@ -147,7 +147,7 @@ public final class HiveTestUtils {
public static void dropTable(String metaStoreUrl, String databaseName,
String tableName) throws Exception {
- HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+ HCatClient client = TestContext.getHCatClient(metaStoreUrl);
client.dropTable(databaseName, tableName, true);
}
@@ -182,6 +182,6 @@ public final class HiveTestUtils {
Map<String, String> partitionSpec = new HashMap<String, String>();
partitionSpec.put(partitionKey, partitionValue);
- return HiveCatalogService.getHCatClient(metastoreUrl).getPartition(databaseName, tableName, partitionSpec);
+ return TestContext.getHCatClient(metastoreUrl).getPartition(databaseName, tableName, partitionSpec);
}
}