You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/12/12 06:17:25 UTC

incubator-falcon git commit: FALCON-419 Update deprecated HCatalog API to use Hive Metastore API. Contributed by Shwetha GS

Repository: incubator-falcon
Updated Branches:
  refs/heads/master b364820dd -> a38cebf42


FALCON-419 Update deprecated HCatalog API to use Hive Metastore API. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a38cebf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a38cebf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a38cebf4

Branch: refs/heads/master
Commit: a38cebf42b0addaf8fc2e7892f55166a3cd902b5
Parents: b364820
Author: shwethags <sh...@inmobi.com>
Authored: Fri Dec 12 10:46:19 2014 +0530
Committer: shwethags <sh...@inmobi.com>
Committed: Fri Dec 12 10:46:54 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 common/pom.xml                                  |   2 +-
 .../falcon/catalog/AbstractCatalogService.java  |  43 ++---
 .../apache/falcon/catalog/CatalogPartition.java |  13 --
 .../falcon/catalog/HiveCatalogService.java      | 163 ++++++++-----------
 .../apache/falcon/entity/CatalogStorage.java    | 112 +++----------
 pom.xml                                         |  15 +-
 .../FalconAuthenticationFilterTest.java         |   2 +-
 .../apache/falcon/latedata/LateDataHandler.java |  19 ++-
 .../falcon/catalog/HiveCatalogServiceIT.java    |  52 ++----
 .../apache/falcon/late/LateDataHandlerIT.java   |   6 +-
 .../lifecycle/TableStorageFeedEvictorIT.java    |  13 +-
 .../org/apache/falcon/resource/TestContext.java |  20 +++
 .../org/apache/falcon/util/HiveTestUtils.java   |  16 +-
 14 files changed, 178 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f671e56..e9758c0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,9 @@ Trunk (Unreleased)
    Seetharam)
 
   OPTIMIZATIONS
+   FALCON-419 Update deprecated HCatalog API to use Hive Metastore API.
+   (Shwetha GS)
+
    FALCON-423 Updating falcon server endpoint in distributed setup doesn't 
    work. (Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 50dd2ea..b349c2f 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -154,7 +154,7 @@
 
         <dependency>
             <groupId>org.apache.hive</groupId>
-            <artifactId>hive-common</artifactId>
+            <artifactId>hive-metastore</artifactId>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index 348fac0..9abdc93 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -22,7 +22,6 @@ import org.apache.falcon.FalconException;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * Interface definition for a catalog registry service
@@ -88,47 +87,49 @@ public abstract class AbstractCatalogService {
     /**
      * Drops a given partition. Executed in the workflow engine.
      *
-     *
      * @param conf  conf object
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName tableName to check if it exists
-     * @param partitions list of partitions as Key=Value pairs
+     * @param partitionValues list of partition values
+     * @param deleteData should dropPartition also delete the corresponding data
      * @return if the partition was dropped
      * @throws FalconException
      */
-    public abstract boolean dropPartitions(Configuration conf, String catalogUrl,
-                                           String database, String tableName,
-                                           Map<String, String> partitions) throws FalconException;
+    public abstract boolean dropPartition(Configuration conf, String catalogUrl,
+                                           String database, String tableName, List<String> partitionValues,
+                                           boolean deleteData) throws FalconException;
 
     /**
-     * Gets the partition. Executed in the workflow engine.
-     *
+     * Drops every partition that matches the given partition values. Executed in the workflow engine.
      *
-     * @param conf  conf
+     * @param conf  conf object
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
      * @param tableName tableName to check if it exists
-     * @param partitionSpec The partition specification, {[col_name,value],[col_name2,value2]}.
-     *                      All partition-key-values must be specified.
-     * @return An instance of CatalogPartition.
+     * @param partitionValues list of partition values
+     * @param deleteData should dropPartition also delete the corresponding data
+     * @see #dropPartition(Configuration, String, String, String, List, boolean)
      * @throws FalconException
      */
-    public abstract CatalogPartition getPartition(Configuration conf, String catalogUrl,
-                                                  String database, String tableName,
-                                                  Map<String, String> partitionSpec)
-        throws FalconException;
+    public abstract void dropPartitions(Configuration conf, String catalogUrl,
+                                        String database, String tableName,
+                                        List<String> partitionValues, boolean deleteData) throws FalconException;
 
     /**
+     * Gets the partition. Executed in the workflow engine.
+     *
      *
      * @param conf  conf
      * @param catalogUrl url for the catalog service
      * @param database database the table belongs to
-     * @param tableName table name
-     * @return list of partition column names of the table
+     * @param tableName table the partition belongs to
+     * @param partitionValues Values for partition columns.
+     * @return An instance of CatalogPartition.
      * @throws FalconException
      */
-    public abstract List<String> getTablePartitionCols(Configuration conf, String catalogUrl,
-                                                       String database,
-                                                       String tableName) throws FalconException;
+    public abstract CatalogPartition getPartition(Configuration conf, String catalogUrl,
+                                                  String database, String tableName,
+                                                  List<String> partitionValues)
+        throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
index c5d4705..032ae38 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
@@ -30,7 +30,6 @@ public class CatalogPartition {
     private List<String> values;
     private long createTime;
     private long lastAccessTime;
-    private List<String> tableColumns;
     private String inputFormat;
     private String outputFormat;
     private String location;
@@ -59,10 +58,6 @@ public class CatalogPartition {
         this.lastAccessTime = lastAccessTime;
     }
 
-    protected void setTableColumns(List<String> tableColumns) {
-        this.tableColumns = tableColumns;
-    }
-
     protected void setInputFormat(String inputFormat) {
         this.inputFormat = inputFormat;
     }
@@ -97,14 +92,6 @@ public class CatalogPartition {
         return this.tableName;
     }
 
-    /**
-     * Gets the columns of the table.
-     *
-     * @return the columns
-     */
-    public List<String> getColumns() {
-        return this.tableColumns;
-    }
 
     /**
      * Gets the input format.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 3216f1e..f59b83b 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -24,6 +24,11 @@ import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.Credentials;
@@ -31,12 +36,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.io.Text;
 import org.apache.hcatalog.api.HCatClient;
-import org.apache.hcatalog.api.HCatDatabase;
-import org.apache.hcatalog.api.HCatPartition;
-import org.apache.hcatalog.api.HCatTable;
 import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +45,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /**
  * An implementation of CatalogService that uses Hive Meta Store (HCatalog)
@@ -55,25 +54,8 @@ public class HiveCatalogService extends AbstractCatalogService {
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
 
-    /**
-     * This is only used for tests.
-     *
-     * @param metastoreUrl metastore url
-     * @return client object
-     * @throws FalconException
-     */
-    public static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
-        try {
-            HiveConf hcatConf = createHiveConf(new Configuration(false), metastoreUrl);
-            return HCatClient.create(hcatConf);
-        } catch (HCatException e) {
-            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
-        } catch (IOException e) {
-            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
-        }
-    }
 
-    private static HiveConf createHiveConf(Configuration conf,
+    public static HiveConf createHiveConf(Configuration conf,
                                            String metastoreUrl) throws IOException {
         HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
 
@@ -97,8 +79,8 @@ public class HiveCatalogService extends AbstractCatalogService {
      * @return hive metastore client handle
      * @throws FalconException
      */
-    private static HCatClient createHCatClient(Configuration conf,
-                                               String metastoreUrl) throws FalconException {
+    private static HiveMetaStoreClient createClient(Configuration conf,
+                                                    String metastoreUrl) throws FalconException {
         try {
             LOG.info("Creating HCatalog client object for metastore {} using conf {}",
                 metastoreUrl, conf.toString());
@@ -117,11 +99,9 @@ public class HiveCatalogService extends AbstractCatalogService {
 
             OozieActionConfigurationHelper.dumpConf(hcatConf, "hive conf ");
 
-            return HCatClient.create(hcatConf);
-        } catch (HCatException e) {
-            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
-        } catch (IOException e) {
-            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
+            return new HiveMetaStoreClient(hcatConf);
+        } catch (Exception e) {
+            throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e);
         }
     }
 
@@ -158,8 +138,8 @@ public class HiveCatalogService extends AbstractCatalogService {
      * @return hive metastore client handle
      * @throws FalconException
      */
-    private static HCatClient createProxiedHCatClient(Configuration conf,
-                                                      String catalogUrl) throws FalconException {
+    private static HiveMetaStoreClient createProxiedClient(Configuration conf,
+                                                           String catalogUrl) throws FalconException {
 
         try {
             final HiveConf hcatConf = createHiveConf(conf, catalogUrl);
@@ -167,15 +147,13 @@ public class HiveCatalogService extends AbstractCatalogService {
             addSecureCredentialsAndToken(conf, hcatConf, proxyUGI);
 
             LOG.info("Creating HCatalog client object for {}", catalogUrl);
-            return proxyUGI.doAs(new PrivilegedExceptionAction<HCatClient>() {
-                public HCatClient run() throws Exception {
-                    return HCatClient.create(hcatConf);
+            return proxyUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+                public HiveMetaStoreClient run() throws Exception {
+                    return new HiveMetaStoreClient(hcatConf);
                 }
             });
-        } catch (IOException e) {
-            throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
-        } catch (InterruptedException e) {
-            throw new FalconException("Exception creating Proxied HCatClient: " + e.getMessage(), e);
+        } catch (Exception e) {
+            throw new FalconException("Exception creating Proxied HiveMetaStoreClient: " + e.getMessage(), e);
         }
     }
 
@@ -216,10 +194,10 @@ public class HiveCatalogService extends AbstractCatalogService {
         LOG.info("Checking if the service is alive for: {}", catalogUrl);
 
         try {
-            HCatClient client = createProxiedHCatClient(conf, catalogUrl);
-            HCatDatabase database = client.getDatabase("default");
+            HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
+            Database database = client.getDatabase("default");
             return database != null;
-        } catch (HCatException e) {
+        } catch (Exception e) {
             throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e);
         }
     }
@@ -230,10 +208,10 @@ public class HiveCatalogService extends AbstractCatalogService {
         LOG.info("Checking if the table exists: {}", tableName);
 
         try {
-            HCatClient client = createProxiedHCatClient(conf, catalogUrl);
-            HCatTable table = client.getTable(database, tableName);
+            HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
+            Table table = client.getTable(database, tableName);
             return table != null;
-        } catch (HCatException e) {
+        } catch (Exception e) {
             throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
         }
     }
@@ -244,10 +222,10 @@ public class HiveCatalogService extends AbstractCatalogService {
         LOG.info("Checking if the table is external: {}", tableName);
 
         try {
-            HCatClient client = createHCatClient(conf, catalogUrl);
-            HCatTable table = client.getTable(database, tableName);
-            return !table.getTabletype().equals("MANAGED_TABLE");
-        } catch (HCatException e) {
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            Table table = client.getTable(database, tableName);
+            return table.getTableType().equals(TableType.EXTERNAL_TABLE.name());
+        } catch (Exception e) {
             throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e);
         }
     }
@@ -261,89 +239,78 @@ public class HiveCatalogService extends AbstractCatalogService {
         try {
             List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
 
-            HCatClient client = createHCatClient(conf, catalogUrl);
-            List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter);
-            for (HCatPartition hCatPartition : hCatPartitions) {
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            List<Partition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter, (short) -1);
+            for (Partition hCatPartition : hCatPartitions) {
                 LOG.info("Partition: " + hCatPartition.getValues());
                 CatalogPartition partition = createCatalogPartition(hCatPartition);
                 catalogPartitionList.add(partition);
             }
 
             return catalogPartitionList;
-        } catch (HCatException e) {
+        } catch (Exception e) {
             throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
         }
     }
 
-    private CatalogPartition createCatalogPartition(HCatPartition hCatPartition) {
+    private CatalogPartition createCatalogPartition(Partition hCatPartition) {
         final CatalogPartition catalogPartition = new CatalogPartition();
-        catalogPartition.setDatabaseName(hCatPartition.getDatabaseName());
+        catalogPartition.setDatabaseName(hCatPartition.getDbName());
         catalogPartition.setTableName(hCatPartition.getTableName());
         catalogPartition.setValues(hCatPartition.getValues());
-        catalogPartition.setInputFormat(hCatPartition.getInputFormat());
-        catalogPartition.setOutputFormat(hCatPartition.getOutputFormat());
-        catalogPartition.setLocation(hCatPartition.getLocation());
-        catalogPartition.setSerdeInfo(hCatPartition.getSerDe());
+        catalogPartition.setInputFormat(hCatPartition.getSd().getInputFormat());
+        catalogPartition.setOutputFormat(hCatPartition.getSd().getOutputFormat());
+        catalogPartition.setLocation(hCatPartition.getSd().getLocation());
+        catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib());
         catalogPartition.setCreateTime(hCatPartition.getCreateTime());
         catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
-
-        List<String> tableColumns = new ArrayList<String>();
-        for (HCatFieldSchema hCatFieldSchema : hCatPartition.getColumns()) {
-            tableColumns.add(hCatFieldSchema.getName());
-        }
-        catalogPartition.setTableColumns(tableColumns);
-
         return catalogPartition;
     }
 
     @Override
-    public boolean dropPartitions(Configuration conf, String catalogUrl,
+    public boolean dropPartition(Configuration conf, String catalogUrl,
                                   String database, String tableName,
-                                  Map<String, String> partitions) throws FalconException {
-        LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitions);
+                                  List<String> partitionValues, boolean deleteData) throws FalconException {
+        LOG.info("Dropping partition for: {}, partition: {}", tableName, partitionValues);
 
         try {
-            HCatClient client = createHCatClient(conf, catalogUrl);
-            client.dropPartitions(database, tableName, partitions, true);
-        } catch (HCatException e) {
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            return client.dropPartition(database, tableName, partitionValues, deleteData);
+        } catch (Exception e) {
             throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
         }
-
-        return true;
     }
 
     @Override
-    public CatalogPartition getPartition(Configuration conf, String catalogUrl,
-                                         String database, String tableName,
-                                         Map<String, String> partitionSpec) throws FalconException {
-        LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionSpec);
+    public void dropPartitions(Configuration conf, String catalogUrl,
+                               String database, String tableName,
+                               List<String> partitionValues, boolean deleteData) throws FalconException {
+        LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitionValues);
 
         try {
-            HCatClient client = createHCatClient(conf, catalogUrl);
-            HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
-            return createCatalogPartition(hCatPartition);
-        } catch (HCatException e) {
-            throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            List<Partition> partitions = client.listPartitions(database, tableName, partitionValues, (short) -1);
+            for (Partition part : partitions) {
+                LOG.info("Dropping partition for: {}, partition: {}", tableName, part.getValues());
+                client.dropPartition(database, tableName, part.getValues(), deleteData);
+            }
+        } catch (Exception e) {
+            throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
         }
     }
 
     @Override
-    public List<String> getTablePartitionCols(Configuration conf, String catalogUrl,
-                                              String database,
-                                              String tableName) throws FalconException {
-        LOG.info("Fetching partition columns of table: " + tableName);
+    public CatalogPartition getPartition(Configuration conf, String catalogUrl,
+                                         String database, String tableName,
+                                         List<String> partitionValues) throws FalconException {
+        LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionValues);
 
         try {
-            HCatClient client = createHCatClient(conf, catalogUrl);
-            HCatTable table = client.getTable(database, tableName);
-            List<HCatFieldSchema> partSchema = table.getPartCols();
-            List<String> partCols = new ArrayList<String>();
-            for (HCatFieldSchema part : partSchema) {
-                partCols.add(part.getName());
-            }
-            return partCols;
-        } catch (HCatException e) {
-            throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
+            HiveMetaStoreClient client = createClient(conf, catalogUrl);
+            Partition hCatPartition = client.getPartition(database, tableName, partitionValues);
+            return createCatalogPartition(hCatPartition);
+        } catch (Exception e) {
+            throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
index e68044a..59f558b 100644
--- a/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/CatalogStorage.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
+import org.apache.falcon.catalog.AbstractCatalogService;
 import org.apache.falcon.catalog.CatalogPartition;
 import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.entity.common.FeedDataPath;
@@ -46,7 +47,6 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -394,25 +394,25 @@ public class CatalogStorage extends Configured implements Storage {
         LOG.info("Applying retention on {}, Limit: {}, timezone: {}",
                 getTable(), retentionLimit, timeZone);
 
-        // get sorted date partition keys and values
-        List<String> datedPartKeys = new ArrayList<String>();
-        List<String> datedPartValues = new ArrayList<String>();
         List<CatalogPartition> toBeDeleted;
         try {
+            // get sorted date partition keys and values
+            List<String> datedPartKeys = new ArrayList<String>();
+            List<String> datedPartValues = new ArrayList<String>();
             fillSortedDatedPartitionKVs(datedPartKeys, datedPartValues, retentionLimit, timeZone);
             toBeDeleted = discoverPartitionsToDelete(datedPartKeys, datedPartValues);
-
         } catch (ELException e) {
             throw new FalconException("Couldn't find partitions to be deleted", e);
 
         }
+
         if (toBeDeleted.isEmpty()) {
             LOG.info("No partitions to delete.");
         } else {
             final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
                 getConf(), getCatalogUrl(), getDatabase(), getTable());
             try {
-                dropPartitions(toBeDeleted, datedPartKeys, isTableExternal);
+                dropPartitions(toBeDeleted, isTableExternal);
             } catch (IOException e) {
                 throw new FalconException("Couldn't drop partitions", e);
             }
@@ -513,95 +513,31 @@ public class CatalogStorage extends Configured implements Storage {
         return filterBuffer.toString();
     }
 
-    private void dropPartitions(List<CatalogPartition> partitionsToDelete, List<String> datedPartKeys,
-                                boolean isTableExternal) throws FalconException, IOException {
-
-        // get table partition columns
-        List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
-            getConf(), getCatalogUrl(), getDatabase(), getTable());
-
-        /* In case partition columns are a super-set of dated partitions, there can be multiple
-         * partitions that share the same set of date-partition values. All such partitions can
-         * be deleted by issuing a single HCatalog dropPartition call per date-partition values.
-         * Arrange the partitions grouped by each set of date-partition values.
-         */
-        Map<Map<String, String>, List<CatalogPartition>> dateToPartitionsMap = new HashMap<
-                Map<String, String>, List<CatalogPartition>>();
-        for (CatalogPartition partitionToDrop : partitionsToDelete) {
-            // create a map of name-values of all columns of this partition
-            Map<String, String> partitionsMap = new HashMap<String, String>();
-            for (int i = 0; i < partColumns.size(); i++) {
-                partitionsMap.put(partColumns.get(i), partitionToDrop.getValues().get(i));
-            }
-
-            // create a map of name-values of dated sub-set of this partition
-            Map<String, String> datedPartitions = new HashMap<String, String>();
-            for (String datedPart : datedPartKeys) {
-                datedPartitions.put(datedPart, partitionsMap.get(datedPart));
-            }
+    private void dropPartitions(List<CatalogPartition> partitionsToDelete, boolean isTableExternal)
+        throws FalconException, IOException {
+        AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService();
+        for (CatalogPartition partition : partitionsToDelete) {
+            boolean deleted = catalogService.dropPartition(getConf(), getCatalogUrl(), getDatabase(), getTable(),
+                    partition.getValues(), true);
 
-            // add a map entry of this catalog partition corresponding to its date-partition values
-            List<CatalogPartition> catalogPartitions;
-            if (dateToPartitionsMap.containsKey(datedPartitions)) {
-                catalogPartitions = dateToPartitionsMap.get(datedPartitions);
-            } else {
-                catalogPartitions = new ArrayList<CatalogPartition>();
+            if (!deleted) {
+                return;
             }
-            catalogPartitions.add(partitionToDrop);
-            dateToPartitionsMap.put(datedPartitions, catalogPartitions);
-        }
-
-        // delete each entry within dateToPartitions Map
-        for (Map.Entry<Map<String, String>, List<CatalogPartition>> entry : dateToPartitionsMap.entrySet()) {
-            dropPartitionInstances(entry.getValue(), entry.getKey(), isTableExternal);
-        }
-    }
-
-    private void dropPartitionInstances(List<CatalogPartition> partitionsToDrop, Map<String, String> partSpec,
-                                        boolean isTableExternal) throws FalconException, IOException {
-
-        boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions(
-            getConf(), getCatalogUrl(), getDatabase(), getTable(), partSpec);
-
-        if (!deleted) {
-            return;
-        }
 
-        for (CatalogPartition partitionToDrop : partitionsToDrop) {
             if (isTableExternal) { // nuke the dirs if an external table
-                final String location = partitionToDrop.getLocation();
-                final Path path = new Path(location);
-                deleted = HadoopClientFactory.get()
-                        .createProxiedFileSystem(path.toUri()) .delete(path, true);
-            }
-            if (!isTableExternal || deleted) {
-                // replace ',' with ';' since message producer splits instancePaths string by ','
-                String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
-                LOG.info("Deleted partition: " + partitionInfo);
-                instanceDates.append(partSpec).append(',');
-                instancePaths.append(getEvictedPartitionPath(partitionToDrop))
-                        .append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
+                final Path path = new Path(partition.getLocation());
+                if (!HadoopClientFactory.get().createProxiedFileSystem(path.toUri()).delete(path, true)) {
+                    throw new FalconException("Failed to delete location " + path + " for partition "
+                            + partition.getValues());
+                }
             }
-        }
-    }
 
-    private String getEvictedPartitionPath(final CatalogPartition partitionToDrop) {
-        String uriTemplate = getUriTemplate(); // no need for location type for table
-        List<String> values = partitionToDrop.getValues();
-        StringBuilder partitionPath = new StringBuilder();
-        int index = 0;
-        for (String partitionKey : getDatedPartitionKeys()) {
-            String dateMask = getPartitionValue(partitionKey);
-            String date = values.get(index);
-
-            partitionPath.append(uriTemplate.replace(dateMask, date));
-            partitionPath.append(CatalogStorage.PARTITION_SEPARATOR);
-            LOG.info("partitionPath: " + partitionPath);
+            // replace ',' with ';' since message producer splits instancePaths string by ','
+            String partitionInfo = partition.getValues().toString().replace(",", ";");
+            LOG.info("Deleted partition: " + partitionInfo);
+            instanceDates.append(partitionInfo).append(',');
+            instancePaths.append(partition.getLocation()).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
         }
-        partitionPath.setLength(partitionPath.length() - 1);
-
-        LOG.info("Return partitionPath: " + partitionPath);
-        return partitionPath.toString();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7a8aacc..5a6c095 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,7 +108,6 @@
         <oozie.forcebuild>false</oozie.forcebuild>
         <activemq.version>5.4.3</activemq.version>
         <hive.version>0.11.0</hive.version>
-        <hcatalog.version>0.11.0</hcatalog.version>
         <jetty.version>6.1.26</jetty.version>
         <jersey.version>1.9</jersey.version>
         <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
@@ -872,23 +871,11 @@
                 </exclusions>
             </dependency>
 
-            <dependency>
-                <groupId>org.apache.hive</groupId>
-                <artifactId>hive-common</artifactId>
-                <version>${hive.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.apache.hadoop</groupId>
-                        <artifactId>hadoop-client</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-
             <!--  this is needed for embedded oozie -->
             <dependency>
                 <groupId>org.apache.hcatalog</groupId>
                 <artifactId>webhcat-java-client</artifactId>
-                <version>${hcatalog.version}</version>
+                <version>${hive.version}</version>
                 <exclusions>
                     <exclusion>
                         <!-- This implies you cannot use orc files -->

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
index 4ceca29..9e8c76a 100644
--- a/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
+++ b/prism/src/test/java/org/apache/falcon/security/FalconAuthenticationFilterTest.java
@@ -184,7 +184,7 @@ public class FalconAuthenticationFilterTest {
     public void testGetKerberosPrincipalWithSubstitutedHostSecure() throws Exception {
         String principal = StartupProperties.get().getProperty(FalconAuthenticationFilter.KERBEROS_PRINCIPAL);
 
-        String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName() + "@Example.com";
+        String expectedPrincipal = "falcon/" + SecurityUtil.getLocalHostName().toLowerCase() + "@Example.com";
         try {
             Configuration conf = new Configuration(false);
             conf.set("hadoop.security.authentication", "kerberos");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index d5b7db0..d35abfa 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -153,7 +154,7 @@ public class LateDataHandler extends Configured implements Tool {
      * The assumption is that if a partition has changed or reinstated, the underlying
      * metric would change, either size or create time.
      *
-     * @param feedUriTemplate URI for the feed storage, filesystem path or table uri
+     * @param feedUri URI for the feed storage, filesystem path or table uri
      * @param feedStorageType feed storage type
      * @param conf configuration
      * @return computed metric
@@ -161,19 +162,19 @@ public class LateDataHandler extends Configured implements Tool {
      * @throws FalconException
      * @throws URISyntaxException
      */
-    public long computeStorageMetric(String feedUriTemplate, String feedStorageType, Configuration conf)
+    public long computeStorageMetric(String feedUri, String feedStorageType, Configuration conf)
         throws IOException, FalconException, URISyntaxException {
 
         Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
 
         if (storageType == Storage.TYPE.FILESYSTEM) {
             // usage on file system is the metric
-            return getFileSystemUsageMetric(feedUriTemplate, conf);
+            return getFileSystemUsageMetric(feedUri, conf);
         } else if (storageType == Storage.TYPE.TABLE) {
             // todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme
-            feedUriTemplate = feedUriTemplate.replace("hcat", "thrift");
+            feedUri = feedUri.replace("hcat", "thrift");
             // creation time of the given partition is the metric
-            return getTablePartitionCreateTimeMetric(feedUriTemplate);
+            return getTablePartitionCreateTimeMetric(feedUri);
         }
 
         throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
@@ -222,20 +223,20 @@ public class LateDataHandler extends Configured implements Tool {
      * If this partition was reinstated, the assumption is that the create time of
      * this partition would change.
      *
-     * @param feedUriTemplate catalog table uri
+     * @param feedUri catalog table uri
      * @return metric as creation time of the given partition
      * @throws IOException
      * @throws URISyntaxException
      * @throws FalconException
      */
-    private long getTablePartitionCreateTimeMetric(String feedUriTemplate)
+    private long getTablePartitionCreateTimeMetric(String feedUri)
         throws IOException, URISyntaxException, FalconException {
 
         CatalogStorage storage = (CatalogStorage)
-                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate, getConf());
+                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUri, getConf());
         CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
                 getConf(), storage.getCatalogUrl(), storage.getDatabase(),
-                storage.getTable(), storage.getPartitions());
+                storage.getTable(), new ArrayList(storage.getPartitions().values()));
         return partition == null ? 0 : partition.getCreateTime();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index fcf7f96..6fd23a0 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -38,8 +38,8 @@ import org.testng.annotations.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -64,7 +64,7 @@ public class HiveCatalogServiceIT {
         CurrentUser.authenticate(TestContext.REMOTE_USER);
 
         hiveCatalogService = new HiveCatalogService();
-        client = HiveCatalogService.getHCatClient(METASTORE_URL);
+        client = TestContext.getHCatClient(METASTORE_URL);
 
         createDatabase();
         createTable();
@@ -170,11 +170,6 @@ public class HiveCatalogServiceIT {
     }
 
     @Test
-    public void testGet() throws Exception {
-        Assert.assertNotNull(HiveCatalogService.getHCatClient(METASTORE_URL));
-    }
-
-    @Test
     public void testIsAlive() throws Exception {
         Assert.assertTrue(hiveCatalogService.isAlive(conf, METASTORE_URL));
     }
@@ -247,7 +242,7 @@ public class HiveCatalogServiceIT {
         throws Exception {
 
         List<CatalogPartition> filteredPartitions = hiveCatalogService.listPartitionsByFilter(
-            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter);
+                conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, greaterThanFilter);
         Assert.assertEquals(filteredPartitions.size(), expectedPartitionCount);
     }
 
@@ -272,34 +267,23 @@ public class HiveCatalogServiceIT {
 
     @Test
     public void testDropPartition() throws Exception {
-        Map<String, String> partialPartitionSpec = new HashMap<String, String>();
-        partialPartitionSpec.put("ds", "20130903");
-
-        Assert.assertTrue(hiveCatalogService.dropPartitions(
-            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
-
+        hiveCatalogService.dropPartition(
+                conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in"), true);
         List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
-        Assert.assertEquals(partitions.size(), 1, "Unexpected number of partitions");
-        Assert.assertEquals(new String[]{"20130902", "in"},
+        Assert.assertEquals(partitions.size(), 2, "Unexpected number of partitions");
+        Assert.assertEquals(new String[]{"20130903", "in"},
                 partitions.get(0).getValues().toArray(), "Mismatched partition");
 
-        partialPartitionSpec = new HashMap<String, String>();
-        partialPartitionSpec.put("ds", "20130902");
-
-        Assert.assertTrue(hiveCatalogService.dropPartitions(
-            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
+        hiveCatalogService.dropPartitions(
+                conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130903"), true);
         partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
         Assert.assertEquals(partitions.size(), 0, "Unexpected number of partitions");
     }
 
     @Test
     public void testGetPartition() throws Exception {
-        Map<String, String> partitionSpec = new HashMap<String, String>();
-        partitionSpec.put("ds", "20130902");
-        partitionSpec.put("region", "in");
-
         CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
-            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+                conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, Arrays.asList("20130902", "in"));
         Assert.assertNotNull(partition);
 
         long createTime = partition.getCreateTime();
@@ -308,7 +292,7 @@ public class HiveCatalogServiceIT {
 
     @Test
     public void testReInstatePartition() throws Exception {
-        Map<String, String> partitionSpec = new HashMap<String, String>();
+        Map<String, String> partitionSpec = new LinkedHashMap<String, String>();
         partitionSpec.put("ds", "20130918");
         partitionSpec.put("region", "blah");
 
@@ -317,7 +301,7 @@ public class HiveCatalogServiceIT {
         client.addPartition(first);
 
         CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
-            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
         Assert.assertNotNull(partition);
         final long originalCreateTime = partition.getCreateTime();
 
@@ -331,7 +315,7 @@ public class HiveCatalogServiceIT {
         client.addPartition(second);
 
         CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
-            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+            conf, METASTORE_URL, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
         Assert.assertNotNull(reInstatedPartition);
         final long reInstatedCreateTime = reInstatedPartition.getCreateTime();
 
@@ -345,14 +329,4 @@ public class HiveCatalogServiceIT {
             {EXTERNAL_TABLE_NAME},
         };
     }
-
-    @Test  (dataProvider = "tableName")
-    public void testGetTablePartitionCols(String tableName) throws Exception {
-        List<String> partCols = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
-            conf, METASTORE_URL, DATABASE_NAME, tableName);
-        Assert.assertEquals(partCols.size(), 2);
-        Collections.sort(partCols);
-        Assert.assertEquals(partCols.get(0), "ds");
-        Assert.assertEquals(partCols.get(1), "region");
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index c4e046b..1885bb7 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -20,7 +20,6 @@ package org.apache.falcon.late;
 
 import org.apache.falcon.catalog.CatalogPartition;
 import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.catalog.HiveCatalogService;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
@@ -42,6 +41,7 @@ import org.testng.annotations.Test;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -199,7 +199,7 @@ public class LateDataHandlerIT {
     }
 
     private void reinstatePartition() throws Exception {
-        final HCatClient client = HiveCatalogService.getHCatClient(metastoreUrl);
+        final HCatClient client = context.getHCatClient(metastoreUrl);
 
         Map<String, String> partitionSpec = new HashMap<String, String>();
         partitionSpec.put("ds", PARTITION_VALUE);
@@ -213,7 +213,7 @@ public class LateDataHandlerIT {
         client.addPartition(reinstatedPartition);
 
         CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
-            conf, metastoreUrl, DATABASE_NAME, TABLE_NAME, partitionSpec);
+            conf, metastoreUrl, DATABASE_NAME, TABLE_NAME, new ArrayList<String>(partitionSpec.values()));
         Assert.assertNotNull(reInstatedPartition);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index d508a2d..ba6698c 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -20,9 +20,9 @@ package org.apache.falcon.lifecycle;
 
 import org.apache.commons.el.ExpressionEvaluatorImpl;
 import org.apache.falcon.Pair;
-import org.apache.falcon.catalog.HiveCatalogService;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.retention.FeedEvictor;
 import org.apache.falcon.util.HiveTestUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -87,7 +87,7 @@ public class TableStorageFeedEvictorIT {
     public void setUp() throws Exception {
         FeedEvictor.OUT.set(stream);
 
-        client = HiveCatalogService.getHCatClient(METASTORE_URL);
+        client = TestContext.getHCatClient(METASTORE_URL);
 
         HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
         final List<String> partitionKeys = Arrays.asList("ds", "region");
@@ -156,7 +156,7 @@ public class TableStorageFeedEvictorIT {
                 "-retentionType", "instance",
                 "-retentionLimit", retentionLimit,
                 "-timeZone", timeZone,
-                "-frequency", "daily",
+                "-frequency", "days(1)",
                 "-logFile", logFile,
                 "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
             });
@@ -372,15 +372,16 @@ public class TableStorageFeedEvictorIT {
         FileSystem fs = path.getFileSystem(new Configuration());
 
         for (String candidatePartition : candidatePartitions) {
+            path = new Path(EXTERNAL_TABLE_LOCATION + candidatePartition);
             if (isTableExternal) {
-                touch(fs, EXTERNAL_TABLE_LOCATION + candidatePartition);
+                touch(fs, path.toString());
             }
 
             Map<String, String> partition = new HashMap<String, String>();
             partition.put("ds", candidatePartition); //yyyyMMDD
             partition.put("region", "in");
             HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(
-                    DATABASE_NAME, tableName, null, partition).build();
+                    DATABASE_NAME, tableName, isTableExternal ? path.toString() : null, partition).build();
             client.addPartition(addPtn);
         }
     }
@@ -408,7 +409,7 @@ public class TableStorageFeedEvictorIT {
     }
 
     private void touch(FileSystem fs, String path) throws Exception {
-        fs.create(new Path(path)).close();
+        fs.mkdirs(new Path(path));
     }
 
     private void dropPartitions(String tableName, List<String> candidatePartitions) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 64f98d4..23df745 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.net.util.TrustManagerUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.catalog.HiveCatalogService;
 import org.apache.falcon.cli.FalconCLI;
 import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.client.FalconClient;
@@ -42,11 +43,14 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hcatalog.api.HCatClient;
 import org.testng.Assert;
 
 import javax.net.ssl.HostnameVerifier;
@@ -123,6 +127,22 @@ public class TestContext {
         }
     }
 
+    /**
+     * Creates an HCatClient for the given metastore; intended for use by tests only.
+     *
+     * @param metastoreUrl metastore url passed to HiveCatalogService.createHiveConf
+     * @return HCatClient created from the resulting HiveConf
+     * @throws FalconException wrapping any exception raised while creating the conf or client
+     */
+    public static HCatClient getHCatClient(String metastoreUrl) throws FalconException {
+        try {
+            HiveConf hcatConf = HiveCatalogService.createHiveConf(new Configuration(false), metastoreUrl);
+            return HCatClient.create(hcatConf);
+        } catch (Exception e) {
+            throw new FalconException("Exception creating HCatClient: " + e.getMessage(), e);
+        }
+    }
+
     public void configure() throws Exception {
         try {
             StartupProperties.get().setProperty(

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a38cebf4/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
index 3b71f08..9fd1a9d 100644
--- a/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
+++ b/webapp/src/test/java/org/apache/falcon/util/HiveTestUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.falcon.util;
 
-import org.apache.falcon.catalog.HiveCatalogService;
+import org.apache.falcon.resource.TestContext;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -44,20 +44,20 @@ public final class HiveTestUtils {
 
     public static void createDatabase(String metaStoreUrl,
                                       String databaseName) throws Exception {
-        HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+        HCatClient client = TestContext.getHCatClient(metaStoreUrl);
         HCatCreateDBDesc dbDesc = HCatCreateDBDesc.create(databaseName)
                 .ifNotExists(true).build();
         client.createDatabase(dbDesc);
     }
 
     public static  void dropDatabase(String metaStoreUrl, String databaseName) throws Exception {
-        HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+        HCatClient client = TestContext.getHCatClient(metaStoreUrl);
         client.dropDatabase(databaseName, true, HCatClient.DropDBMode.CASCADE);
     }
 
     public static void createTable(String metaStoreUrl, String databaseName,
                                    String tableName) throws Exception {
-        HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+        HCatClient client = TestContext.getHCatClient(metaStoreUrl);
         ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
         cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
         cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
@@ -72,7 +72,7 @@ public final class HiveTestUtils {
 
     public static void createTable(String metaStoreUrl, String databaseName, String tableName,
                                    List<String> partitionKeys) throws Exception {
-        HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+        HCatClient client = TestContext.getHCatClient(metaStoreUrl);
         ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
         cols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, "id comment"));
         cols.add(new HCatFieldSchema("value", HCatFieldSchema.Type.STRING, "value comment"));
@@ -112,7 +112,7 @@ public final class HiveTestUtils {
                 .location(externalLocation)
                 .build();
 
-        HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+        HCatClient client = TestContext.getHCatClient(metaStoreUrl);
         client.createTable(tableDesc);
     }
 
@@ -147,7 +147,7 @@ public final class HiveTestUtils {
 
     public static void dropTable(String metaStoreUrl, String databaseName,
                                  String tableName) throws Exception {
-        HCatClient client = HiveCatalogService.getHCatClient(metaStoreUrl);
+        HCatClient client = TestContext.getHCatClient(metaStoreUrl);
         client.dropTable(databaseName, tableName, true);
     }
 
@@ -182,6 +182,6 @@ public final class HiveTestUtils {
         Map<String, String> partitionSpec = new HashMap<String, String>();
         partitionSpec.put(partitionKey, partitionValue);
 
-        return HiveCatalogService.getHCatClient(metastoreUrl).getPartition(databaseName, tableName, partitionSpec);
+        return TestContext.getHCatClient(metastoreUrl).getPartition(databaseName, tableName, partitionSpec);
     }
 }