You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/23 00:43:24 UTC
[gobblin] branch master updated: [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b50cf1f [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)
b50cf1f is described below
commit b50cf1fcaf459717263a432286a569e0393a06ef
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Nov 22 16:43:17 2021 -0800
[GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)
* [hotfix] workaround to catch exception when iceberg does not support get metrics for non-union type
* address comments
* [GOBBLIN-1580]Check table exists instead of call create table directly to make sure table exists
---
.../hive/metastore/HiveMetaStoreBasedRegister.java | 78 ++++++++++++----------
1 file changed, 41 insertions(+), 37 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 020eba7..56e8276 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -273,53 +273,56 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
* Or will create the table thru. RPC and return retVal from remote MetaStore.
*/
private boolean ensureHiveTableExistenceBeforeAlternation(String tableName, String dbName, IMetaStoreClient client,
- Table table, HiveSpec spec) throws TException, IOException{
+ Table table) throws TException, IOException{
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
try {
- try (Timer.Context context = this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
- client.createTable(getTableWithCreateTimeNow(table));
- log.info(String.format("Created Hive table %s in db %s", tableName, dbName));
- return true;
- } catch (AlreadyExistsException e) {
- log.debug("Table {}.{} already existed", table.getDbName(), table.getTableName());
+ if(!existsTable(dbName, tableName)) {
+ try (Timer.Context context = this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
+ client.createTable(getTableWithCreateTimeNow(table));
+ log.info(String.format("Created Hive table %s in db %s", tableName, dbName));
+ return true;
+ }
}
}catch (TException e) {
log.error(
String.format("Unable to create Hive table %s in db %s: " + e.getMessage(), tableName, dbName), e);
throw e;
}
-
log.info("Table {} already exists in db {}.", tableName, dbName);
- try {
- HiveTable existingTable;
- try (Timer.Context context = this.metricContext.timer(GET_HIVE_TABLE).time()) {
- existingTable = HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
- }
- HiveTable schemaSourceTable = existingTable;
- if (state.contains(SCHEMA_SOURCE_DB)) {
- try (Timer.Context context = this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
- // We assume the schema source table has the same table name as the origin table, so only the db name can be configured
- schemaSourceTable = HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB, dbName),
- tableName));
- }
- }
- if(shouldUpdateLatestSchema) {
- updateSchema(spec, table, schemaSourceTable);
+ // When the logic up to here it means table already existed in db. Return false.
+ return false;
+ }
+ }
+
+ private void alterTableIfNeeded (String tableName, String dbName, IMetaStoreClient client,
+ Table table, HiveSpec spec) throws TException, IOException {
+ try {
+ HiveTable existingTable;
+ try (Timer.Context context = this.metricContext.timer(GET_HIVE_TABLE).time()) {
+ existingTable = HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
+ }
+ HiveTable schemaSourceTable = existingTable;
+ if (state.contains(SCHEMA_SOURCE_DB)) {
+ try (Timer.Context context = this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
+ // We assume the schema source table has the same table name as the origin table, so only the db name can be configured
+ schemaSourceTable = HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB, dbName),
+ tableName));
}
- if (needToUpdateTable(existingTable, HiveMetaStoreUtils.getHiveTable(table))) {
- try (Timer.Context context = this.metricContext.timer(ALTER_TABLE).time()) {
- client.alter_table(dbName, tableName, getNewTblByMergingExistingTblProps(table, existingTable));
- }
- log.info(String.format("updated Hive table %s in db %s", tableName, dbName));
+ }
+ if(shouldUpdateLatestSchema) {
+ updateSchema(spec, table, schemaSourceTable);
+ }
+ if (needToUpdateTable(existingTable, HiveMetaStoreUtils.getHiveTable(table))) {
+ try (Timer.Context context = this.metricContext.timer(ALTER_TABLE).time()) {
+ client.alter_table(dbName, tableName, getNewTblByMergingExistingTblProps(table, existingTable));
}
- } catch (TException e2) {
- log.error(
- String.format("Unable to create or alter Hive table %s in db %s: " + e2.getMessage(), tableName, dbName),
- e2);
- throw e2;
+ log.info(String.format("updated Hive table %s in db %s", tableName, dbName));
}
- // When the logic up to here it means table already existed in db and alteration happen. Return false.
- return false;
+ } catch (TException e2) {
+ log.error(
+ String.format("Unable to alter Hive table %s in db %s: " + e2.getMessage(), tableName, dbName),
+ e2);
+ throw e2;
}
}
@@ -475,15 +478,16 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
this.tableAndDbExistenceCache.get(dbName + ":" + tableName, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- return ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table, spec);
+ return ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table);
}
});
} catch (ExecutionException ee) {
throw new IOException("Table existence checking throwing execution exception.", ee);
}
} else {
- this.ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table, spec);
+ this.ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table);
}
+ alterTableIfNeeded(tableName, dbName, client, table, spec);
}
@Override