Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/29 15:00:48 UTC

[incubator-iceberg] branch master updated: Add HiveCatalog implementation (#240)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e133f53  Add HiveCatalog implementation (#240)
e133f53 is described below

commit e133f537c309270dc251f8095a141fdeee30045c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Sat Jun 29 08:00:44 2019 -0700

    Add HiveCatalog implementation (#240)
---
 .../java/org/apache/iceberg/catalog/Catalog.java   | 113 +++++++++++++----
 .../java/org/apache/iceberg/catalog/Namespace.java |  37 ++++--
 .../apache/iceberg/catalog/TableIdentifier.java    |  36 ++++--
 .../org/apache/iceberg/BaseMetastoreCatalog.java   |  82 ++++++++++++
 .../java/org/apache/iceberg/hive/ClientPool.java   |   4 +-
 .../java/org/apache/iceberg/hive/HiveCatalog.java  | 138 +++++++++++++++++++++
 .../org/apache/iceberg/hive/HiveClientPool.java    |   4 +-
 .../apache/iceberg/hive/HiveTableOperations.java   |   4 +-
 .../org/apache/iceberg/hive/HiveTableBaseTest.java |  22 ++--
 .../{HiveTablesTest.java => HiveTableTest.java}    | 106 ++++++++++++++--
 10 files changed, 476 insertions(+), 70 deletions(-)
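
Before the diff, a quick orientation: this commit introduces a Catalog API
with a Hive-backed implementation. A minimal usage sketch against the new
interfaces follows; the Configuration values are illustrative assumptions,
and the example schema mirrors the one in HiveTableBaseTest.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.types.Types;

    // assumes hive.metastore.warehouse.dir is set in the Configuration
    Configuration conf = new Configuration();
    HiveCatalog catalog = new HiveCatalog(conf);

    // example schema, mirroring the test schema
    Schema schema = new Schema(Types.StructType.of(
        Types.NestedField.required(1, "id", Types.LongType.get())).fields());

    TableIdentifier id = TableIdentifier.of("hivedb", "tbl");
    Table table = catalog.createTable(id, schema, PartitionSpec.unpartitioned());

    Table loaded = catalog.loadTable(id); // NoSuchTableException if missing
    catalog.dropTable(id);                // false if the table did not exist
    catalog.close();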

diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index 1ff31c7..dfe2c30 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -27,49 +27,114 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 
 /**
- * Top level Catalog APIs that supports table DDLs and namespace listing.
+ * A Catalog API for table create, drop, load, and rename operations.
  */
 public interface Catalog {
+
   /**
-   * creates the table or throws {@link AlreadyExistsException}.
+   * Create a table.
    *
-   * @param tableIdentifier an identifier to identify this table in a namespace.
-   * @param schema the schema for this table, can not be null.
-   * @param spec the partition spec for this table, can not be null.
-   * @param tableProperties can be null or empty
-   * @return Table instance that was created
+   * @param identifier a table identifier
+   * @param schema a schema
+   * @param spec a partition spec
+   * @param location a location for the table; leave null if unspecified
+   * @param properties a string map of table properties
+   * @return a Table instance
+   * @throws AlreadyExistsException if the table already exists
    */
   Table createTable(
-      TableIdentifier tableIdentifier,
+      TableIdentifier identifier,
+      Schema schema,
+      PartitionSpec spec,
+      String location,
+      Map<String, String> properties);
+
+  /**
+   * Create a table.
+   *
+   * @param identifier a table identifier
+   * @param schema a schema
+   * @param spec a partition spec
+   * @param properties a string map of table properties
+   * @return a Table instance
+   * @throws AlreadyExistsException if the table already exists
+   */
+  default Table createTable(
+      TableIdentifier identifier,
       Schema schema,
       PartitionSpec spec,
-      Map<String, String> tableProperties);
+      Map<String, String> properties) {
+    return createTable(identifier, schema, spec, null, properties);
+  }
+
+  /**
+   * Create a table.
+   *
+   * @param identifier a table identifier
+   * @param schema a schema
+   * @param spec a partition spec
+   * @return a Table instance
+   * @throws AlreadyExistsException if the table already exists
+   */
+  default Table createTable(
+      TableIdentifier identifier,
+      Schema schema,
+      PartitionSpec spec) {
+    return createTable(identifier, schema, spec, null, null);
+  }
+
+  /**
+   * Create an unpartitioned table.
+   *
+   * @param identifier a table identifier
+   * @param schema a schema
+   * @return a Table instance
+   * @throws AlreadyExistsException if the table already exists
+   */
+  default Table createTable(
+      TableIdentifier identifier,
+      Schema schema) {
+    return createTable(identifier, schema, PartitionSpec.unpartitioned(), null, null);
+  }
 
   /**
-   * Check if table exists or not.
+   * Check whether a table exists.
    *
-   * @param tableIdentifier an identifier to identify this table in a namespace.
-   * @return true if table exists, false if it doesn't.
+   * @param identifier a table identifier
+   * @return true if the table exists, false otherwise
    */
-  boolean tableExists(TableIdentifier tableIdentifier);
+  default boolean tableExists(TableIdentifier identifier) {
+    try {
+      loadTable(identifier);
+      return true;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
+  }
 
   /**
-   * Drops the table if it exists, otherwise throws {@link NoSuchTableException}
-   * The implementation should not delete the underlying data but ensure that a
-   * subsequent call to {@link Catalog#tableExists(TableIdentifier)} returns false.
-   * <p>
-   * If the table does not exists it will throw {@link NoSuchTableException}
+   * Drop a table. The implementation should not delete the underlying table data.
    *
-   * @param tableIdentifier an identifier to identify this table in a namespace.
+   * @param identifier a table identifier
+   * @return true if the table was dropped, false if the table did not exist
    */
-  void dropTable(TableIdentifier tableIdentifier);
+  boolean dropTable(TableIdentifier identifier);
 
   /**
-   * Renames a table. If {@code from} does not exists throws {@link NoSuchTableException}
-   * If {@code to} exists than throws {@link AlreadyExistsException}.
+   * Rename a table.
    *
-   * @param from original name of the table.
-   * @param to expected new name of the table.
+   * @param from identifier of the table to rename
+   * @param to new table name
+   * @throws NoSuchTableException if the table does not exist
    */
   void renameTable(TableIdentifier from, TableIdentifier to);
+
+  /**
+   * Load a table.
+   *
+   * @param identifier a table identifier
+   * @return instance of {@link Table} implementation referred to by {@code identifier}
+   * @throws NoSuchTableException if the table does not exist
+   */
+  Table loadTable(TableIdentifier identifier);
 }
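
The shorter createTable overloads and tableExists above are default methods
that delegate to the five-argument createTable and to loadTable, so an
implementation only needs to provide createTable, loadTable, dropTable, and
renameTable. A sketch of equivalent calls, with the identifier and schema
assumed:

    // the two-argument overload expands to the full form
    catalog.createTable(id, schema);
    catalog.createTable(id, schema, PartitionSpec.unpartitioned(), null, null);

    // tableExists is loadTable plus a NoSuchTableException catch
    if (catalog.tableExists(id)) {
      Table t = catalog.loadTable(id);
    }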
diff --git a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
index abb3464..739ede3 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
@@ -19,12 +19,28 @@
 
 package org.apache.iceberg.catalog;
 
+import com.google.common.base.Joiner;
+
 /**
- * Identifies a namespace in iceberg catalog
+ * A namespace in a {@link Catalog}.
  */
 public class Namespace {
+  private static final Namespace EMPTY_NAMESPACE = new Namespace(new String[] {});
+  private static final Joiner DOT = Joiner.on('.');
+
+  public static Namespace empty() {
+    return EMPTY_NAMESPACE;
+  }
+
+  public static Namespace of(String... levels) {
+    if (levels.length == 0) {
+      return empty();
+    }
+
+    return new Namespace(levels);
+  }
+
   private final String[] levels;
-  private static final Namespace EMPTY = new Namespace(new String[] {});
 
   private Namespace(String[] levels) {
     this.levels = levels;
@@ -34,19 +50,16 @@ public class Namespace {
     return levels;
   }
 
-  public boolean isEmpty() {
-    return this.equals(Namespace.EMPTY);
+  public String level(int pos) {
+    return levels[pos];
   }
 
-  public static Namespace namespace(String[] levels) {
-    if (levels == null || levels.length == 0) {
-      return Namespace.EMPTY;
-    }
-
-    return new Namespace(levels);
+  public boolean isEmpty() {
+    return levels.length == 0;
   }
 
-  public static Namespace empty() {
-    return EMPTY;
+  @Override
+  public String toString() {
+    return DOT.join(levels);
   }
 }
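
With construction now going through the factory methods, the reworked
Namespace behaves roughly as follows (level names illustrative):

    Namespace ns = Namespace.of("db", "schema");
    ns.level(0);               // "db"
    ns.isEmpty();              // false
    ns.toString();             // "db.schema", levels joined with '.'
    Namespace.of().isEmpty();  // true; no levels returns the shared empty()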
diff --git a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
index 2a05f26..2a539e1 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
@@ -19,36 +19,54 @@
 
 package org.apache.iceberg.catalog;
 
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+
 /**
- * Identifies a table in iceberg catalog, the namespace is optional
- * so callers can use {@link #hasNamespace()} to check if namespace is present or not.
+ * Identifies a table in an Iceberg catalog.
  */
 public class TableIdentifier {
   private final Namespace namespace;
   private final String name;
 
-  public TableIdentifier(String name) {
-    this(Namespace.empty(), name);
+  public static TableIdentifier of(String... names) {
+    Preconditions.checkArgument(names.length > 0, "Cannot create table identifier without a table name");
+    return new TableIdentifier(Namespace.of(Arrays.copyOf(names, names.length - 1)), names[names.length - 1]);
   }
 
-  public TableIdentifier(Namespace namespace, String name) {
-    if (name == null || name.isEmpty()) {
-      throw new IllegalArgumentException("name can not be null or empty");
-    }
+  public static TableIdentifier of(Namespace namespace, String name) {
+    return new TableIdentifier(namespace, name);
+  }
 
-    this.namespace = namespace == null ? Namespace.empty() : namespace;
+  private TableIdentifier(Namespace namespace, String name) {
+    Preconditions.checkArgument(name != null && !name.isEmpty(), "Invalid table name %s", name);
+    this.namespace = namespace;
     this.name = name;
   }
 
+  /**
+   * Whether this identifier has a namespace.
+   * @return true if the identifier has a namespace, false otherwise
+   */
   public boolean hasNamespace() {
     return !namespace.isEmpty();
   }
 
+  /**
+   * @return the identifier namespace
+   */
   public Namespace namespace() {
     return namespace;
   }
 
+  /**
+   * @return the identifier name
+   */
   public String name() {
     return name;
   }
+
+  @Override
+  public String toString() {
+    return hasNamespace() ? namespace.toString() + "." + name : name;
+  }
 }
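
TableIdentifier likewise moves to static factories; the varargs form treats
the last name as the table name and everything before it as the namespace:

    TableIdentifier id = TableIdentifier.of("db", "tbl");
    id.namespace().toString();  // "db"
    id.name();                  // "tbl"
    id.hasNamespace();          // true

    TableIdentifier bare = TableIdentifier.of("tbl");
    bare.hasNamespace();        // false, the namespace is empty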
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
new file mode 100644
index 0000000..9e2dcdd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+public abstract class BaseMetastoreCatalog implements Catalog {
+  private final Configuration conf;
+
+  protected BaseMetastoreCatalog(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Table createTable(
+      TableIdentifier identifier,
+      Schema schema,
+      PartitionSpec spec,
+      String location,
+      Map<String, String> properties) {
+    TableOperations ops = newTableOps(conf, identifier);
+    if (ops.current() != null) {
+      throw new AlreadyExistsException("Table already exists: " + identifier);
+    }
+
+    String baseLocation;
+    if (location != null) {
+      baseLocation = location;
+    } else {
+      baseLocation = defaultWarehouseLocation(conf, identifier);
+    }
+
+    TableMetadata metadata = TableMetadata.newTableMetadata(
+        ops, schema, spec, baseLocation, properties == null ? Maps.newHashMap() : properties);
+
+    try {
+      ops.commit(null, metadata);
+    } catch (CommitFailedException ignored) {
+      throw new AlreadyExistsException("Table was created concurrently: " + identifier);
+    }
+
+    return new BaseTable(ops, identifier.toString());
+  }
+
+  @Override
+  public Table loadTable(TableIdentifier identifier) {
+    TableOperations ops = newTableOps(conf, identifier);
+    if (ops.current() == null) {
+      throw new NoSuchTableException("Table does not exist: " + identifier);
+    }
+
+    return new BaseTable(ops, identifier.toString());
+  }
+
+  protected abstract TableOperations newTableOps(Configuration newConf, TableIdentifier tableIdentifier);
+
+  protected abstract String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier);
+}
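
A concrete catalog only supplies the two abstract hooks; table creation and
loading stay in the base class. A hypothetical minimal subclass, where
MyTableOperations and the warehouse property name are invented for
illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.BaseMetastoreCatalog;
    import org.apache.iceberg.TableOperations;
    import org.apache.iceberg.catalog.TableIdentifier;

    public class MyCatalog extends BaseMetastoreCatalog {
      public MyCatalog(Configuration conf) {
        super(conf);
      }

      @Override
      protected TableOperations newTableOps(Configuration conf, TableIdentifier identifier) {
        return new MyTableOperations(conf, identifier); // hypothetical
      }

      @Override
      protected String defaultWarehouseLocation(Configuration conf, TableIdentifier identifier) {
        // assumed property name; used when createTable gets a null location
        return conf.get("my.warehouse.dir") + "/" + identifier;
      }
    }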
diff --git a/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
index c26e14b..b86188a 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -31,13 +31,13 @@ abstract class ClientPool<C, E extends Exception> implements Closeable {
 
   private final int poolSize;
   private final Deque<C> clients;
-  private final Class<E> reconnectExc;
+  private final Class<? extends E> reconnectExc;
   private final Object signal = new Object();
   private volatile int currentSize;
   private boolean closed;
   private int runs = 0;
 
-  ClientPool(int poolSize, Class<E> reconnectExc) {
+  ClientPool(int poolSize, Class<? extends E> reconnectExc) {
     this.poolSize = poolSize;
     this.reconnectExc = reconnectExc;
     this.clients = new ArrayDeque<>(poolSize);
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
new file mode 100644
index 0000000..837c589
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.thrift.TException;
+
+public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
+
+  private final HiveClientPool clients;
+
+  public HiveCatalog(Configuration conf) {
+    super(conf);
+    this.clients = new HiveClientPool(2, conf);
+  }
+
+  @Override
+  public org.apache.iceberg.Table createTable(
+      TableIdentifier identifier, Schema schema, PartitionSpec spec, String location, Map<String, String> properties) {
+    Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+        "Missing database in table identifier: %s", identifier);
+    return super.createTable(identifier, schema, spec, location, properties);
+  }
+
+  @Override
+  public org.apache.iceberg.Table loadTable(TableIdentifier identifier) {
+    Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+        "Missing database in table identifier: %s", identifier);
+    return super.loadTable(identifier);
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier) {
+    Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+        "Missing database in table identifier: %s", identifier);
+    String database = identifier.namespace().level(0);
+
+    try {
+      clients.run(client -> {
+        client.dropTable(database, identifier.name());
+        return null;
+      });
+
+      return true;
+
+    } catch (NoSuchObjectException e) {
+      return false;
+
+    } catch (TException e) {
+      throw new RuntimeException("Failed to drop " + identifier.toString(), e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted in call to dropTable", e);
+    }
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    Preconditions.checkArgument(from.namespace().levels().length == 1,
+        "Missing database in table identifier: %s", from);
+    Preconditions.checkArgument(to.namespace().levels().length == 1,
+        "Missing database in table identifier: %s", to);
+
+    String toDatabase = to.namespace().level(0);
+    String fromDatabase = from.namespace().level(0);
+    String fromName = from.name();
+
+    try {
+      Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
+      table.setDbName(toDatabase);
+      table.setTableName(to.name());
+
+      clients.run(client -> {
+        client.alter_table(fromDatabase, fromName, table);
+        return null;
+      });
+
+    } catch (TException e) {
+      throw new RuntimeException("Failed to rename " + from.toString() + " to " + to.toString(), e);
+
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted in call to rename", e);
+    }
+  }
+
+  @Override
+  public TableOperations newTableOps(Configuration configuration, TableIdentifier tableIdentifier) {
+    String dbName = tableIdentifier.namespace().level(0);
+    String tableName = tableIdentifier.name();
+    return new HiveTableOperations(configuration, clients, dbName, tableName);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier) {
+    String warehouseLocation = hadoopConf.get("hive.metastore.warehouse.dir");
+    Preconditions.checkNotNull(
+        warehouseLocation,
+        "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+    return String.format(
+        "%s/%s.db/%s",
+        warehouseLocation,
+        tableIdentifier.namespace().levels()[0],
+        tableIdentifier.name());
+  }
+
+  @Override
+  public void close() {
+    clients.close();
+  }
+}
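
HiveCatalog requires exactly one namespace level, the Hive database. With an
assumed hive.metastore.warehouse.dir of /user/hive/warehouse, the default
location logic above resolves hivedb.tbl to
/user/hive/warehouse/hivedb.db/tbl. Rename can also move a table across
databases, for example:

    // both identifiers must be database.table; names illustrative
    catalog.renameTable(
        TableIdentifier.of("hivedb", "tbl"),
        TableIdentifier.of("backup_db", "tbl_archived"));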
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index 8527555..2e055e3 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 
 class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
   private final HiveConf hiveConf;
@@ -33,7 +34,7 @@ class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
   }
 
   HiveClientPool(int poolSize, Configuration conf) {
-    super(poolSize, TException.class);
+    super(poolSize, TTransportException.class);
     this.hiveConf = new HiveConf(conf, HiveClientPool.class);
   }
 
@@ -49,6 +50,7 @@ class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
   @Override
   protected HiveMetaStoreClient reconnect(HiveMetaStoreClient client) {
     try {
+      client.close();
       client.reconnect();
     } catch (MetaException e) {
       throw new RuntimeMetaException(e, "Failed to reconnect to Hive Metastore");
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index ab8a790..1d13024 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -31,6 +31,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockLevel;
@@ -141,7 +142,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
             new HashMap<>(),
             null,
             null,
-            ICEBERG_TABLE_TYPE_VALUE);
+            TableType.EXTERNAL_TABLE.toString());
+        tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
       }
 
       tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 0072a7d..e3e2bf2 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
@@ -70,6 +71,7 @@ public class HiveTableBaseTest {
 
   static final String DB_NAME = "hivedb";
   static final String TABLE_NAME =  "tbl";
+  static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME);
 
   static final Schema schema = new Schema(Types.StructType.of(
       required(1, "id", Types.LongType.get())).fields());
@@ -86,7 +88,8 @@ public class HiveTableBaseTest {
   private static ExecutorService executorService;
   private static TServer server;
 
-  static HiveMetaStoreClient metastoreClient;
+  protected static HiveMetaStoreClient metastoreClient;
+  protected static HiveCatalog catalog;
 
   @BeforeClass
   public static void startMetastore() throws Exception {
@@ -101,10 +104,14 @@ public class HiveTableBaseTest {
 
     HiveTableBaseTest.metastoreClient = new HiveMetaStoreClient(hiveConf);
     metastoreClient.createDatabase(new Database(DB_NAME, "description", getDBPath(), new HashMap<>()));
+
+    HiveTableBaseTest.catalog = new HiveCatalog(hiveConf);
   }
 
   @AfterClass
   public static void stopMetastore() {
+    HiveTableBaseTest.catalog.close();
+
     metastoreClient.close();
     HiveTableBaseTest.metastoreClient = null;
 
@@ -119,19 +126,18 @@ public class HiveTableBaseTest {
     }
   }
 
-  HiveTables tables;
+  private Path tableLocation;
 
   @Before
-  public void createTestTable() throws Exception {
-    this.tables = new HiveTables(hiveConf);
-    tables.create(schema, partitionSpec, DB_NAME, TABLE_NAME);
+  public void createTestTable() {
+    this.tableLocation = new Path(catalog.createTable(TABLE_IDENTIFIER, schema, partitionSpec).location());
   }
 
   @After
   public void dropTestTable() throws Exception {
-    metastoreClient.dropTable(DB_NAME, TABLE_NAME);
-    tables.close();
-    this.tables = null;
+    // drop the table data
+    tableLocation.getFileSystem(hiveConf).delete(tableLocation, true);
+    catalog.dropTable(TABLE_IDENTIFIER);
   }
 
   private static HiveConf hiveConf(Configuration conf, int port) {
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
similarity index 55%
rename from hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
rename to hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index dc8426c..3459e99 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -19,7 +19,10 @@
 
 package org.apache.iceberg.hive;
 
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.MoreExecutors;
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -28,12 +31,19 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Tasks;
 import org.apache.thrift.TException;
@@ -44,40 +54,110 @@ import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE
 import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
 import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
 
-public class HiveTablesTest extends HiveTableBaseTest {
+public class HiveTableTest extends HiveTableBaseTest {
   @Test
   public void testCreate() throws TException {
     // Table should be created in hive metastore
-    final org.apache.hadoop.hive.metastore.api.Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+    String tableName = TABLE_IDENTIFIER.name();
+    org.apache.hadoop.hive.metastore.api.Table table =
+        metastoreClient.getTable(TABLE_IDENTIFIER.namespace().level(0), tableName);
 
     // check parameters are in expected state
-    final Map<String, String> parameters = table.getParameters();
+    Map<String, String> parameters = table.getParameters();
     Assert.assertNotNull(parameters);
     Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(parameters.get(TABLE_TYPE_PROP)));
-    Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getTableType()));
+    Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(table.getTableType()));
 
     // Ensure the table is pointing to empty location
-    Assert.assertEquals(getTableLocation(TABLE_NAME), table.getSd().getLocation());
+    Assert.assertEquals(getTableLocation(tableName), table.getSd().getLocation());
 
     // Ensure it is stored as unpartitioned table in hive.
     Assert.assertEquals(0, table.getPartitionKeysSize());
 
     // Only 1 snapshotFile Should exist and no manifests should exist
-    Assert.assertEquals(1, metadataVersionFiles(TABLE_NAME).size());
-    Assert.assertEquals(0, manifestFiles(TABLE_NAME).size());
+    Assert.assertEquals(1, metadataVersionFiles(tableName).size());
+    Assert.assertEquals(0, manifestFiles(tableName).size());
 
-    final Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+    final Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
     // Iceberg schema should match the loaded table
     Assert.assertEquals(schema.asStruct(), icebergTable.schema().asStruct());
   }
 
   @Test
+  public void testRename() {
+    String renamedTableName = "rename_table_name";
+    TableIdentifier renameTableIdentifier = TableIdentifier.of(TABLE_IDENTIFIER.namespace(), renamedTableName);
+    Table original = catalog.loadTable(TABLE_IDENTIFIER);
+
+    catalog.renameTable(TABLE_IDENTIFIER, renameTableIdentifier);
+    Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
+    Assert.assertTrue(catalog.tableExists(renameTableIdentifier));
+
+    Table renamed = catalog.loadTable(renameTableIdentifier);
+
+    Assert.assertEquals(original.schema().asStruct(), renamed.schema().asStruct());
+    Assert.assertEquals(original.spec(), renamed.spec());
+    Assert.assertEquals(original.location(), renamed.location());
+    Assert.assertEquals(original.currentSnapshot(), renamed.currentSnapshot());
+
+    Assert.assertTrue(catalog.dropTable(renameTableIdentifier));
+  }
+
+  @Test
+  public void testDrop() {
+    Assert.assertTrue("Table should exist", catalog.tableExists(TABLE_IDENTIFIER));
+    Assert.assertTrue("Drop should return true and drop the table", catalog.dropTable(TABLE_IDENTIFIER));
+    Assert.assertFalse("Table should not exist", catalog.tableExists(TABLE_IDENTIFIER));
+  }
+
+  @Test
+  public void testDropLeavesTableData() throws IOException {
+    Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
+    List<GenericData.Record> records = Lists.newArrayList(
+        recordBuilder.set("id", 1L).build(),
+        recordBuilder.set("id", 2L).build(),
+        recordBuilder.set("id", 3L).build()
+    );
+
+    String fileLocation = table.location().replace("file:", "") + "/data/file.avro";
+    try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
+        .schema(schema)
+        .named("test")
+        .build()) {
+      for (GenericData.Record rec : records) {
+        writer.add(rec);
+      }
+    }
+
+    DataFile file = DataFiles.builder(table.spec())
+        .withRecordCount(3)
+        .withPath(fileLocation)
+        .withFileSizeInBytes(Files.localInput(fileLocation).getLength())
+        .build();
+
+    table.newAppend().appendFile(file).commit();
+
+    String manifestListLocation = table.currentSnapshot().manifestListLocation().replace("file:", "");
+
+    Assert.assertTrue("Drop should return true and drop the table", catalog.dropTable(TABLE_IDENTIFIER));
+    Assert.assertFalse("Table should not exist", catalog.tableExists(TABLE_IDENTIFIER));
+
+    Assert.assertTrue("Table data files should exist",
+        new File(fileLocation).exists());
+    Assert.assertTrue("Table metadata files should exist",
+        new File(manifestListLocation).exists());
+  }
+
+  @Test
   public void testExistingTableUpdate() throws TException {
-    Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+    Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
     // add a column
     icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit();
 
-    icebergTable = tables.load(DB_NAME, TABLE_NAME);
+    icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
 
     // Only 2 snapshotFile Should exist and no manifests should exist
     Assert.assertEquals(2, metadataVersionFiles(TABLE_NAME).size());
@@ -96,7 +176,7 @@ public class HiveTablesTest extends HiveTableBaseTest {
 
   @Test(expected = CommitFailedException.class)
   public void testFailure() throws TException {
-    Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+    Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
     org.apache.hadoop.hive.metastore.api.Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
     String dummyLocation = "dummylocation";
     table.getParameters().put(METADATA_LOCATION_PROP, dummyLocation);
@@ -108,8 +188,8 @@ public class HiveTablesTest extends HiveTableBaseTest {
 
   @Test
   public void testConcurrentFastAppends() {
-    Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
-    Table anotherIcebergTable = tables.load(DB_NAME, TABLE_NAME);
+    Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
+    Table anotherIcebergTable = catalog.loadTable(TABLE_IDENTIFIER);
 
     String fileName = UUID.randomUUID().toString();
     DataFile file = DataFiles.builder(icebergTable.spec())