You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/23 00:43:24 UTC

[gobblin] branch master updated: [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b50cf1f  [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)
b50cf1f is described below

commit b50cf1fcaf459717263a432286a569e0393a06ef
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Nov 22 16:43:17 2021 -0800

    [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists (#3432)
    
    * [hotfix] workaround to catch the exception thrown when iceberg does not support getting metrics for a non-union type
    
    * address comments
    
    * [GOBBLIN-1580] Check table exists instead of call create table directly to make sure table exists
---
 .../hive/metastore/HiveMetaStoreBasedRegister.java | 78 ++++++++++++----------
 1 file changed, 41 insertions(+), 37 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 020eba7..56e8276 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -273,53 +273,56 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
    * Or will create the table thru. RPC and return retVal from remote MetaStore.
    */
   private boolean ensureHiveTableExistenceBeforeAlternation(String tableName, String dbName, IMetaStoreClient client,
-      Table table, HiveSpec spec) throws TException, IOException{
+      Table table) throws TException, IOException{
     try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
       try {
-        try (Timer.Context context = this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
-          client.createTable(getTableWithCreateTimeNow(table));
-          log.info(String.format("Created Hive table %s in db %s", tableName, dbName));
-          return true;
-        } catch (AlreadyExistsException e) {
-          log.debug("Table {}.{} already existed", table.getDbName(), table.getTableName());
+        if(!existsTable(dbName, tableName)) {
+          try (Timer.Context context = this.metricContext.timer(CREATE_HIVE_TABLE).time()) {
+            client.createTable(getTableWithCreateTimeNow(table));
+            log.info(String.format("Created Hive table %s in db %s", tableName, dbName));
+            return true;
+          }
         }
       }catch (TException e) {
         log.error(
             String.format("Unable to create Hive table %s in db %s: " + e.getMessage(), tableName, dbName), e);
         throw e;
       }
-
       log.info("Table {} already exists in db {}.", tableName, dbName);
-      try {
-        HiveTable existingTable;
-        try (Timer.Context context = this.metricContext.timer(GET_HIVE_TABLE).time()) {
-          existingTable = HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
-        }
-        HiveTable schemaSourceTable = existingTable;
-        if (state.contains(SCHEMA_SOURCE_DB)) {
-          try (Timer.Context context = this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
-            // We assume the schema source table has the same table name as the origin table, so only the db name can be configured
-            schemaSourceTable = HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB, dbName),
-                tableName));
-          }
-        }
-        if(shouldUpdateLatestSchema) {
-          updateSchema(spec, table, schemaSourceTable);
+      // Reaching this point means the table already existed in the db, so return false.
+      return false;
+    }
+  }
+
+  private void alterTableIfNeeded (String tableName, String dbName, IMetaStoreClient client,
+      Table table, HiveSpec spec) throws TException, IOException {
+    try {
+      HiveTable existingTable;
+      try (Timer.Context context = this.metricContext.timer(GET_HIVE_TABLE).time()) {
+        existingTable = HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
+      }
+      HiveTable schemaSourceTable = existingTable;
+      if (state.contains(SCHEMA_SOURCE_DB)) {
+        try (Timer.Context context = this.metricContext.timer(GET_SCHEMA_SOURCE_HIVE_TABLE).time()) {
+          // We assume the schema source table has the same table name as the origin table, so only the db name can be configured
+          schemaSourceTable = HiveMetaStoreUtils.getHiveTable(client.getTable(state.getProp(SCHEMA_SOURCE_DB, dbName),
+              tableName));
         }
-        if (needToUpdateTable(existingTable, HiveMetaStoreUtils.getHiveTable(table))) {
-          try (Timer.Context context = this.metricContext.timer(ALTER_TABLE).time()) {
-            client.alter_table(dbName, tableName, getNewTblByMergingExistingTblProps(table, existingTable));
-          }
-          log.info(String.format("updated Hive table %s in db %s", tableName, dbName));
+      }
+      if(shouldUpdateLatestSchema) {
+        updateSchema(spec, table, schemaSourceTable);
+      }
+      if (needToUpdateTable(existingTable, HiveMetaStoreUtils.getHiveTable(table))) {
+        try (Timer.Context context = this.metricContext.timer(ALTER_TABLE).time()) {
+          client.alter_table(dbName, tableName, getNewTblByMergingExistingTblProps(table, existingTable));
         }
-      } catch (TException e2) {
-        log.error(
-            String.format("Unable to create or alter Hive table %s in db %s: " + e2.getMessage(), tableName, dbName),
-            e2);
-        throw e2;
+        log.info(String.format("updated Hive table %s in db %s", tableName, dbName));
       }
-      // When the logic up to here it means table already existed in db and alteration happen. Return false.
-      return false;
+    } catch (TException e2) {
+      log.error(
+          String.format("Unable to alter Hive table %s in db %s: " + e2.getMessage(), tableName, dbName),
+          e2);
+      throw e2;
     }
   }
 
@@ -475,15 +478,16 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
         this.tableAndDbExistenceCache.get(dbName + ":" + tableName, new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
-            return ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table, spec);
+            return ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table);
           }
         });
       } catch (ExecutionException ee) {
         throw new IOException("Table existence checking throwing execution exception.", ee);
       }
     } else {
-      this.ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table, spec);
+      this.ensureHiveTableExistenceBeforeAlternation(tableName, dbName, client, table);
     }
+    alterTableIfNeeded(tableName, dbName, client, table, spec);
   }
 
   @Override