Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/29 15:00:48 UTC
[incubator-iceberg] branch master updated: Add HiveCatalog implementation (#240)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e133f53 Add HiveCatalog implementation (#240)
e133f53 is described below
commit e133f537c309270dc251f8095a141fdeee30045c
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Sat Jun 29 08:00:44 2019 -0700
Add HiveCatalog implementation (#240)
---
.../java/org/apache/iceberg/catalog/Catalog.java | 113 +++++++++++++----
.../java/org/apache/iceberg/catalog/Namespace.java | 37 ++++--
.../apache/iceberg/catalog/TableIdentifier.java | 36 ++++--
.../org/apache/iceberg/BaseMetastoreCatalog.java | 82 ++++++++++++
.../java/org/apache/iceberg/hive/ClientPool.java | 4 +-
.../java/org/apache/iceberg/hive/HiveCatalog.java | 138 +++++++++++++++++++++
.../org/apache/iceberg/hive/HiveClientPool.java | 4 +-
.../apache/iceberg/hive/HiveTableOperations.java | 4 +-
.../org/apache/iceberg/hive/HiveTableBaseTest.java | 22 ++--
.../{HiveTablesTest.java => HiveTableTest.java} | 106 ++++++++++++++--
10 files changed, 476 insertions(+), 70 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
index 1ff31c7..dfe2c30 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Catalog.java
@@ -27,49 +27,114 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
/**
- * Top level Catalog APIs that supports table DDLs and namespace listing.
+ * A Catalog API for table create, drop, and load operations.
*/
public interface Catalog {
+
/**
- * creates the table or throws {@link AlreadyExistsException}.
+ * Create a table.
*
- * @param tableIdentifier an identifier to identify this table in a namespace.
- * @param schema the schema for this table, can not be null.
- * @param spec the partition spec for this table, can not be null.
- * @param tableProperties can be null or empty
- * @return Table instance that was created
+ * @param identifier a table identifier
+ * @param schema a schema
+ * @param spec a partition spec
+ * @param location a location for the table; leave null if unspecified
+ * @param properties a string map of table properties
+ * @return a Table instance
+ * @throws AlreadyExistsException if the table already exists
*/
Table createTable(
- TableIdentifier tableIdentifier,
+ TableIdentifier identifier,
+ Schema schema,
+ PartitionSpec spec,
+ String location,
+ Map<String, String> properties);
+
+ /**
+ * Create a table.
+ *
+ * @param identifier a table identifier
+ * @param schema a schema
+ * @param spec a partition spec
+ * @param properties a string map of table properties
+ * @return a Table instance
+ * @throws AlreadyExistsException if the table already exists
+ */
+ default Table createTable(
+ TableIdentifier identifier,
Schema schema,
PartitionSpec spec,
- Map<String, String> tableProperties);
+ Map<String, String> properties) {
+ return createTable(identifier, schema, spec, null, properties);
+ }
+
+ /**
+ * Create a table.
+ *
+ * @param identifier a table identifier
+ * @param schema a schema
+ * @param spec a partition spec
+ * @return a Table instance
+ * @throws AlreadyExistsException if the table already exists
+ */
+ default Table createTable(
+ TableIdentifier identifier,
+ Schema schema,
+ PartitionSpec spec) {
+ return createTable(identifier, schema, spec, null, null);
+ }
+
+ /**
+ * Create an unpartitioned table.
+ *
+ * @param identifier a table identifier
+ * @param schema a schema
+ * @return a Table instance
+ * @throws AlreadyExistsException if the table already exists
+ */
+ default Table createTable(
+ TableIdentifier identifier,
+ Schema schema) {
+ return createTable(identifier, schema, PartitionSpec.unpartitioned(), null, null);
+ }
/**
- * Check if table exists or not.
+ * Check whether a table exists.
*
- * @param tableIdentifier an identifier to identify this table in a namespace.
- * @return true if table exists, false if it doesn't.
+ * @param identifier a table identifier
+ * @return true if the table exists, false otherwise
*/
- boolean tableExists(TableIdentifier tableIdentifier);
+ default boolean tableExists(TableIdentifier identifier) {
+ try {
+ loadTable(identifier);
+ return true;
+ } catch (NoSuchTableException e) {
+ return false;
+ }
+ }
/**
- * Drops the table if it exists, otherwise throws {@link NoSuchTableException}
- * The implementation should not delete the underlying data but ensure that a
- * subsequent call to {@link Catalog#tableExists(TableIdentifier)} returns false.
- * <p>
- * If the table does not exists it will throw {@link NoSuchTableException}
+ * Drop a table.
*
- * @param tableIdentifier an identifier to identify this table in a namespace.
+ * @param identifier a table identifier
+ * @return true if the table was dropped, false if the table did not exist
*/
- void dropTable(TableIdentifier tableIdentifier);
+ boolean dropTable(TableIdentifier identifier);
/**
- * Renames a table. If {@code from} does not exists throws {@link NoSuchTableException}
- * If {@code to} exists than throws {@link AlreadyExistsException}.
+ * Rename a table.
*
- * @param from original name of the table.
- * @param to expected new name of the table.
+ * @param from identifier of the table to rename
+ * @param to new table name
+ * @throws NoSuchTableException if the table does not exist
*/
void renameTable(TableIdentifier from, TableIdentifier to);
+
+ /**
+ * Load a table.
+ *
+ * @param identifier a table identifier
+ * @return instance of {@link Table} implementation referred by {@code identifier}
+ * @throws NoSuchTableException if the table does not exist
+ */
+ Table loadTable(TableIdentifier identifier);
}
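For orientation, a minimal usage sketch of the revised Catalog API (illustration only, not part of this commit; the schema and identifiers below are made up):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

class CatalogUsageSketch {
  // catalog can be any implementation, for example the HiveCatalog added in this commit
  static void demo(Catalog catalog) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()));
    TableIdentifier id = TableIdentifier.of("db", "events");

    Table created = catalog.createTable(id, schema, PartitionSpec.unpartitioned());
    boolean exists = catalog.tableExists(id);  // default method: loadTable + catch NoSuchTableException
    Table loaded = catalog.loadTable(id);      // throws NoSuchTableException if the table is missing
    catalog.renameTable(id, TableIdentifier.of("db", "events_renamed"));
    boolean dropped = catalog.dropTable(TableIdentifier.of("db", "events_renamed"));
  }
}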
diff --git a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
index abb3464..739ede3 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java
@@ -19,12 +19,28 @@
package org.apache.iceberg.catalog;
+import com.google.common.base.Joiner;
+
/**
- * Identifies a namespace in iceberg catalog
+ * A namespace in a {@link Catalog}.
*/
public class Namespace {
+ private static final Namespace EMPTY_NAMESPACE = new Namespace(new String[] {});
+ private static final Joiner DOT = Joiner.on('.');
+
+ public static Namespace empty() {
+ return EMPTY_NAMESPACE;
+ }
+
+ public static Namespace of(String... levels) {
+ if (levels.length == 0) {
+ return empty();
+ }
+
+ return new Namespace(levels);
+ }
+
private final String[] levels;
- private static final Namespace EMPTY = new Namespace(new String[] {});
private Namespace(String[] levels) {
this.levels = levels;
@@ -34,19 +50,16 @@ public class Namespace {
return levels;
}
- public boolean isEmpty() {
- return this.equals(Namespace.EMPTY);
+ public String level(int pos) {
+ return levels[pos];
}
- public static Namespace namespace(String[] levels) {
- if (levels == null || levels.length == 0) {
- return Namespace.EMPTY;
- }
-
- return new Namespace(levels);
+ public boolean isEmpty() {
+ return levels.length == 0;
}
- public static Namespace empty() {
- return EMPTY;
+ @Override
+ public String toString() {
+ return DOT.join(levels);
}
}
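A short sketch of the new Namespace factory methods (illustration only; the level names are made up):

import org.apache.iceberg.catalog.Namespace;

class NamespaceSketch {
  static void demo() {
    Namespace db = Namespace.of("hivedb");
    Namespace nested = Namespace.of("prod", "analytics");

    assert Namespace.empty().isEmpty();
    assert db.level(0).equals("hivedb");
    assert nested.toString().equals("prod.analytics");  // levels are joined with '.'
  }
}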
diff --git a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
index 2a05f26..2a539e1 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java
@@ -19,36 +19,54 @@
package org.apache.iceberg.catalog;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+
/**
- * Identifies a table in iceberg catalog, the namespace is optional
- * so callers can use {@link #hasNamespace()} to check if namespace is present or not.
+ * Identifies a table in an Iceberg catalog.
*/
public class TableIdentifier {
private final Namespace namespace;
private final String name;
- public TableIdentifier(String name) {
- this(Namespace.empty(), name);
+ public static TableIdentifier of(String... names) {
+ Preconditions.checkArgument(names.length > 0, "Cannot create table identifier without a table name");
+ return new TableIdentifier(Namespace.of(Arrays.copyOf(names, names.length - 1)), names[names.length - 1]);
}
- public TableIdentifier(Namespace namespace, String name) {
- if (name == null || name.isEmpty()) {
- throw new IllegalArgumentException("name can not be null or empty");
- }
+ public static TableIdentifier of(Namespace namespace, String name) {
+ return new TableIdentifier(namespace, name);
+ }
- this.namespace = namespace == null ? Namespace.empty() : namespace;
+ private TableIdentifier(Namespace namespace, String name) {
+ Preconditions.checkArgument(name != null && !name.isEmpty(), "Invalid table name %s", name);
+ this.namespace = namespace;
this.name = name;
}
+ /**
+ * Whether this identifier has a namespace.
+ * @return true if the identifier has a namespace, false otherwise
+ */
public boolean hasNamespace() {
return !namespace.isEmpty();
}
+ /**
+ * @return the identifier namespace
+ */
public Namespace namespace() {
return namespace;
}
+ /**
+ * @return the identifier name
+ */
public String name() {
return name;
}
+
+ public String toString() {
+ return namespace.toString() + "." + name;
+ }
}
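A short sketch of the new TableIdentifier factory methods (illustration only; the names are made up):

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

class TableIdentifierSketch {
  static void demo() {
    // the last name is the table name, everything before it becomes the namespace
    TableIdentifier byNames = TableIdentifier.of("hivedb", "tbl");
    TableIdentifier byNamespace = TableIdentifier.of(Namespace.of("hivedb"), "tbl");

    assert byNames.hasNamespace();
    assert byNames.namespace().level(0).equals("hivedb");
    assert byNames.name().equals("tbl");
    assert byNames.toString().equals("hivedb.tbl");
    assert byNamespace.toString().equals("hivedb.tbl");
  }
}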
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
new file mode 100644
index 0000000..9e2dcdd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+
+public abstract class BaseMetastoreCatalog implements Catalog {
+ private final Configuration conf;
+
+ protected BaseMetastoreCatalog(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Table createTable(
+ TableIdentifier identifier,
+ Schema schema,
+ PartitionSpec spec,
+ String location,
+ Map<String, String> properties) {
+ TableOperations ops = newTableOps(conf, identifier);
+ if (ops.current() != null) {
+ throw new AlreadyExistsException("Table already exists: " + identifier);
+ }
+
+ String baseLocation;
+ if (location != null) {
+ baseLocation = location;
+ } else {
+ baseLocation = defaultWarehouseLocation(conf, identifier);
+ }
+
+ TableMetadata metadata = TableMetadata.newTableMetadata(
+ ops, schema, spec, baseLocation, properties == null ? Maps.newHashMap() : properties);
+
+ try {
+ ops.commit(null, metadata);
+ } catch (CommitFailedException ignored) {
+ throw new AlreadyExistsException("Table was created concurrently: " + identifier);
+ }
+
+ return new BaseTable(ops, identifier.toString());
+ }
+
+ @Override
+ public Table loadTable(TableIdentifier identifier) {
+ TableOperations ops = newTableOps(conf, identifier);
+ if (ops.current() == null) {
+ throw new NoSuchTableException("Table does not exist: " + identifier);
+ }
+
+ return new BaseTable(ops, identifier.toString());
+ }
+
+ protected abstract TableOperations newTableOps(Configuration newConf, TableIdentifier tableIdentifier);
+
+ protected abstract String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier);
+}
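The two abstract methods are the only pieces a concrete catalog needs to supply; HiveCatalog below is the real implementation in this commit, but the general shape of a subclass is roughly (hypothetical example, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;

// hypothetical subclass for illustration only
public class ExampleMetastoreCatalog extends BaseMetastoreCatalog {
  public ExampleMetastoreCatalog(Configuration conf) {
    super(conf);
  }

  @Override
  protected TableOperations newTableOps(Configuration conf, TableIdentifier identifier) {
    // return TableOperations backed by whatever metastore tracks this identifier's metadata location
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected String defaultWarehouseLocation(Configuration conf, TableIdentifier identifier) {
    // used by createTable when the caller does not pass an explicit location
    return "/tmp/warehouse/" + identifier.toString();
  }
}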
diff --git a/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
index c26e14b..b86188a 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/ClientPool.java
@@ -31,13 +31,13 @@ abstract class ClientPool<C, E extends Exception> implements Closeable {
private final int poolSize;
private final Deque<C> clients;
- private final Class<E> reconnectExc;
+ private final Class<? extends E> reconnectExc;
private final Object signal = new Object();
private volatile int currentSize;
private boolean closed;
private int runs = 0;
- ClientPool(int poolSize, Class<E> reconnectExc) {
+ ClientPool(int poolSize, Class<? extends E> reconnectExc) {
this.poolSize = poolSize;
this.reconnectExc = reconnectExc;
this.clients = new ArrayDeque<>(poolSize);
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
new file mode 100644
index 0000000..837c589
--- /dev/null
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.thrift.TException;
+
+public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
+
+ private final HiveClientPool clients;
+
+ public HiveCatalog(Configuration conf) {
+ super(conf);
+ this.clients = new HiveClientPool(2, conf);
+ }
+
+ @Override
+ public org.apache.iceberg.Table createTable(
+ TableIdentifier identifier, Schema schema, PartitionSpec spec, String location, Map<String, String> properties) {
+ Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+ "Missing database in table identifier: %s", identifier);
+ return super.createTable(identifier, schema, spec, location, properties);
+ }
+
+ @Override
+ public org.apache.iceberg.Table loadTable(TableIdentifier identifier) {
+ Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+ "Missing database in table identifier: %s", identifier);
+ return super.loadTable(identifier);
+ }
+
+ @Override
+ public boolean dropTable(TableIdentifier identifier) {
+ Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+ "Missing database in table identifier: %s", identifier);
+ String database = identifier.namespace().level(0);
+
+ try {
+ clients.run(client -> {
+ client.dropTable(database, identifier.name());
+ return null;
+ });
+
+ return true;
+
+ } catch (NoSuchObjectException e) {
+ return false;
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to drop " + identifier.toString(), e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to dropTable", e);
+ }
+ }
+
+ @Override
+ public void renameTable(TableIdentifier from, TableIdentifier to) {
+ Preconditions.checkArgument(from.namespace().levels().length == 1,
+ "Missing database in table identifier: %s", from);
+ Preconditions.checkArgument(to.namespace().levels().length == 1,
+ "Missing database in table identifier: %s", to);
+
+ String toDatabase = to.namespace().level(0);
+ String fromDatabase = from.namespace().level(0);
+ String fromName = from.name();
+
+ try {
+ Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
+ table.setDbName(toDatabase);
+ table.setTableName(to.name());
+
+ clients.run(client -> {
+ client.alter_table(fromDatabase, fromName, table);
+ return null;
+ });
+
+ } catch (TException e) {
+ throw new RuntimeException("Failed to rename " + from.toString() + " to " + to.toString(), e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted in call to rename", e);
+ }
+ }
+
+ @Override
+ public TableOperations newTableOps(Configuration configuration, TableIdentifier tableIdentifier) {
+ String dbName = tableIdentifier.namespace().level(0);
+ String tableName = tableIdentifier.name();
+ return new HiveTableOperations(configuration, clients, dbName, tableName);
+ }
+
+ protected String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier) {
+ String warehouseLocation = hadoopConf.get("hive.metastore.warehouse.dir");
+ Preconditions.checkNotNull(
+ warehouseLocation,
+ "Warehouse location is not set: hive.metastore.warehouse.dir=null");
+ return String.format(
+ "%s/%s.db/%s",
+ warehouseLocation,
+ tableIdentifier.namespace().levels()[0],
+ tableIdentifier.name());
+ }
+
+ @Override
+ public void close() {
+ clients.close();
+ }
+}
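Putting the pieces together, an end-to-end sketch of the new HiveCatalog (illustration only, not part of this commit; the Configuration is assumed to already carry hive.metastore.uris and hive.metastore.warehouse.dir, and the database/table names are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Types;

class HiveCatalogSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();  // must point at a running Hive metastore

    try (HiveCatalog catalog = new HiveCatalog(conf)) {
      Schema schema = new Schema(
          Types.NestedField.required(1, "id", Types.LongType.get()));
      // identifiers must have exactly one namespace level: the Hive database
      TableIdentifier id = TableIdentifier.of("hivedb", "tbl");

      Table table = catalog.createTable(id, schema, PartitionSpec.unpartitioned());
      // ... append data files, read scans, etc. ...
      catalog.dropTable(id);  // removes the metastore entry; data files are left in place
    }
  }
}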
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
index 8527555..2e055e3 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveClientPool.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
private final HiveConf hiveConf;
@@ -33,7 +34,7 @@ class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
}
HiveClientPool(int poolSize, Configuration conf) {
- super(poolSize, TException.class);
+ super(poolSize, TTransportException.class);
this.hiveConf = new HiveConf(conf, HiveClientPool.class);
}
@@ -49,6 +50,7 @@ class HiveClientPool extends ClientPool<HiveMetaStoreClient, TException> {
@Override
protected HiveMetaStoreClient reconnect(HiveMetaStoreClient client) {
try {
+ client.close();
client.reconnect();
} catch (MetaException e) {
throw new RuntimeMetaException(e, "Failed to reconnect to Hive Metastore");
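The widened bound in ClientPool above is what allows this change: HiveClientPool extends ClientPool&lt;HiveMetaStoreClient, TException&gt;, so its reconnect trigger can now be the narrower TTransportException, limiting reconnects to transport failures. A standalone sketch of why the bound matters (hypothetical helper methods, not part of this commit):

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

class ReconnectTokenSketch {
  // old shape: only the exact exception class E is accepted
  static <E extends Exception> void exactToken(Class<E> reconnectExc) { }

  // new shape: any subtype of E is accepted
  static <E extends Exception> void subtypeToken(Class<? extends E> reconnectExc) { }

  static void demo() {
    // ReconnectTokenSketch.<TException>exactToken(TTransportException.class);  // does not compile
    ReconnectTokenSketch.<TException>subtypeToken(TTransportException.class);   // compiles
  }
}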
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index ab8a790..1d13024 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
@@ -141,7 +142,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
new HashMap<>(),
null,
null,
- ICEBERG_TABLE_TYPE_VALUE);
+ TableType.EXTERNAL_TABLE.toString());
+ tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
}
tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
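For context on the change above: as the added comment notes, setting the external table type also requires the "EXTERNAL" parameter before Hive treats the table as external. A minimal sketch of the resulting metastore object (values assumed; the real Table is built elsewhere in this class):

import java.util.HashMap;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Table;

class ExternalTableSketch {
  static Table demo() {
    Table tbl = new Table();
    tbl.setDbName("hivedb");
    tbl.setTableName("tbl");
    tbl.setParameters(new HashMap<>());
    tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
    // the table type alone is not enough; Hive also requires this parameter
    tbl.getParameters().put("EXTERNAL", "TRUE");
    return tbl;
  }
}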
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
index 0072a7d..e3e2bf2 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
@@ -70,6 +71,7 @@ public class HiveTableBaseTest {
static final String DB_NAME = "hivedb";
static final String TABLE_NAME = "tbl";
+ static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_NAME, TABLE_NAME);
static final Schema schema = new Schema(Types.StructType.of(
required(1, "id", Types.LongType.get())).fields());
@@ -86,7 +88,8 @@ public class HiveTableBaseTest {
private static ExecutorService executorService;
private static TServer server;
- static HiveMetaStoreClient metastoreClient;
+ protected static HiveMetaStoreClient metastoreClient;
+ protected static HiveCatalog catalog;
@BeforeClass
public static void startMetastore() throws Exception {
@@ -101,10 +104,14 @@ public class HiveTableBaseTest {
HiveTableBaseTest.metastoreClient = new HiveMetaStoreClient(hiveConf);
metastoreClient.createDatabase(new Database(DB_NAME, "description", getDBPath(), new HashMap<>()));
+
+ HiveTableBaseTest.catalog = new HiveCatalog(hiveConf);
}
@AfterClass
public static void stopMetastore() {
+ HiveTableBaseTest.catalog.close();
+
metastoreClient.close();
HiveTableBaseTest.metastoreClient = null;
@@ -119,19 +126,18 @@ public class HiveTableBaseTest {
}
}
- HiveTables tables;
+ private Path tableLocation;
@Before
- public void createTestTable() throws Exception {
- this.tables = new HiveTables(hiveConf);
- tables.create(schema, partitionSpec, DB_NAME, TABLE_NAME);
+ public void createTestTable() {
+ this.tableLocation = new Path(catalog.createTable(TABLE_IDENTIFIER, schema, partitionSpec).location());
}
@After
public void dropTestTable() throws Exception {
- metastoreClient.dropTable(DB_NAME, TABLE_NAME);
- tables.close();
- this.tables = null;
+ // drop the table data
+ tableLocation.getFileSystem(hiveConf).delete(tableLocation, true);
+ catalog.dropTable(TABLE_IDENTIFIER);
}
private static HiveConf hiveConf(Configuration conf, int port) {
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
similarity index 55%
rename from hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
rename to hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index dc8426c..3459e99 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveTablesTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -19,7 +19,10 @@
package org.apache.iceberg.hive;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -28,12 +31,19 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.apache.thrift.TException;
@@ -44,40 +54,110 @@ import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
-public class HiveTablesTest extends HiveTableBaseTest {
+public class HiveTableTest extends HiveTableBaseTest {
@Test
public void testCreate() throws TException {
// Table should be created in hive metastore
- final org.apache.hadoop.hive.metastore.api.Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ String tableName = TABLE_IDENTIFIER.name();
+ org.apache.hadoop.hive.metastore.api.Table table =
+ metastoreClient.getTable(TABLE_IDENTIFIER.namespace().level(0), tableName);
// check parameters are in expected state
- final Map<String, String> parameters = table.getParameters();
+ Map<String, String> parameters = table.getParameters();
Assert.assertNotNull(parameters);
Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(parameters.get(TABLE_TYPE_PROP)));
- Assert.assertTrue(ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getTableType()));
+ Assert.assertTrue("EXTERNAL_TABLE".equalsIgnoreCase(table.getTableType()));
// Ensure the table is pointing to empty location
- Assert.assertEquals(getTableLocation(TABLE_NAME), table.getSd().getLocation());
+ Assert.assertEquals(getTableLocation(tableName), table.getSd().getLocation());
// Ensure it is stored as unpartitioned table in hive.
Assert.assertEquals(0, table.getPartitionKeysSize());
// Only 1 snapshotFile Should exist and no manifests should exist
- Assert.assertEquals(1, metadataVersionFiles(TABLE_NAME).size());
- Assert.assertEquals(0, manifestFiles(TABLE_NAME).size());
+ Assert.assertEquals(1, metadataVersionFiles(tableName).size());
+ Assert.assertEquals(0, manifestFiles(tableName).size());
- final Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+ final Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
// Iceberg schema should match the loaded table
Assert.assertEquals(schema.asStruct(), icebergTable.schema().asStruct());
}
@Test
+ public void testRename() {
+ String renamedTableName = "rename_table_name";
+ TableIdentifier renameTableIdentifier = TableIdentifier.of(TABLE_IDENTIFIER.namespace(), renamedTableName);
+ Table original = catalog.loadTable(TABLE_IDENTIFIER);
+
+ catalog.renameTable(TABLE_IDENTIFIER, renameTableIdentifier);
+ Assert.assertFalse(catalog.tableExists(TABLE_IDENTIFIER));
+ Assert.assertTrue(catalog.tableExists(renameTableIdentifier));
+
+ Table renamed = catalog.loadTable(renameTableIdentifier);
+
+ Assert.assertEquals(original.schema().asStruct(), renamed.schema().asStruct());
+ Assert.assertEquals(original.spec(), renamed.spec());
+ Assert.assertEquals(original.location(), renamed.location());
+ Assert.assertEquals(original.currentSnapshot(), renamed.currentSnapshot());
+
+ Assert.assertTrue(catalog.dropTable(renameTableIdentifier));
+ }
+
+ @Test
+ public void testDrop() {
+ Assert.assertTrue("Table should exist", catalog.tableExists(TABLE_IDENTIFIER));
+ Assert.assertTrue("Drop should return true and drop the table", catalog.dropTable(TABLE_IDENTIFIER));
+ Assert.assertFalse("Table should not exist", catalog.tableExists(TABLE_IDENTIFIER));
+ }
+
+ @Test
+ public void testDropLeavesTableData() throws IOException {
+ Table table = catalog.loadTable(TABLE_IDENTIFIER);
+
+ GenericRecordBuilder recordBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(schema, "test"));
+ List<GenericData.Record> records = Lists.newArrayList(
+ recordBuilder.set("id", 1L).build(),
+ recordBuilder.set("id", 2L).build(),
+ recordBuilder.set("id", 3L).build()
+ );
+
+ String fileLocation = table.location().replace("file:", "") + "/data/file.avro";
+ try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(fileLocation))
+ .schema(schema)
+ .named("test")
+ .build()) {
+ for (GenericData.Record rec : records) {
+ writer.add(rec);
+ }
+ }
+
+ DataFile file = DataFiles.builder(table.spec())
+ .withRecordCount(3)
+ .withPath(fileLocation)
+ .withFileSizeInBytes(Files.localInput(fileLocation).getLength())
+ .build();
+
+ table.newAppend().appendFile(file).commit();
+
+ String manifestListLocation = table.currentSnapshot().manifestListLocation().replace("file:", "");
+
+ Assert.assertTrue("Drop should return true and drop the table", catalog.dropTable(TABLE_IDENTIFIER));
+ Assert.assertFalse("Table should not exist", catalog.tableExists(TABLE_IDENTIFIER));
+
+ Assert.assertTrue("Table data files should exist",
+ new File(fileLocation).exists());
+ Assert.assertTrue("Table metadata files should exist",
+ new File(manifestListLocation).exists());
+ }
+
+ @Test
public void testExistingTableUpdate() throws TException {
- Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+ Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
// add a column
icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit();
- icebergTable = tables.load(DB_NAME, TABLE_NAME);
+ icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
// Only 2 snapshotFile Should exist and no manifests should exist
Assert.assertEquals(2, metadataVersionFiles(TABLE_NAME).size());
@@ -96,7 +176,7 @@ public class HiveTablesTest extends HiveTableBaseTest {
@Test(expected = CommitFailedException.class)
public void testFailure() throws TException {
- Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
+ Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
org.apache.hadoop.hive.metastore.api.Table table = metastoreClient.getTable(DB_NAME, TABLE_NAME);
String dummyLocation = "dummylocation";
table.getParameters().put(METADATA_LOCATION_PROP, dummyLocation);
@@ -108,8 +188,8 @@ public class HiveTablesTest extends HiveTableBaseTest {
@Test
public void testConcurrentFastAppends() {
- Table icebergTable = tables.load(DB_NAME, TABLE_NAME);
- Table anotherIcebergTable = tables.load(DB_NAME, TABLE_NAME);
+ Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER);
+ Table anotherIcebergTable = catalog.loadTable(TABLE_IDENTIFIER);
String fileName = UUID.randomUUID().toString();
DataFile file = DataFiles.builder(icebergTable.spec())