You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/23 22:32:53 UTC

incubator-gobblin git commit: [GOBBLIN-566] Fix bug in HiveMetastoreBasedRegister where it would try to alter a partition when it didn't exist.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fcd57541a -> 27a54f05e


[GOBBLIN-566] Fix bug in HiveMetastoreBasedRegister where it would try to alter a partition when it didn't exist.

Closes #2429 from ibuenros/hive-register-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/27a54f05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/27a54f05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/27a54f05

Branch: refs/heads/master
Commit: 27a54f05ef2c084972db23d200512e6873e954d1
Parents: fcd5754
Author: ibuenros <is...@gmail.com>
Authored: Thu Aug 23 15:32:48 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Aug 23 15:32:48 2018 -0700

----------------------------------------------------------------------
 .../metastore/HiveMetaStoreBasedRegister.java   | 48 ++++++++++++--------
 1 file changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/27a54f05/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 2d074d9..0ee5445 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -156,7 +156,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
 
       Optional<HivePartition> partition = spec.getPartition();
       if (partition.isPresent()) {
-        addOrAlterPartition(client.get(), table, HiveMetaStoreUtils.getPartition(partition.get()), spec);
+        addOrAlterPartition(client.get(), table, partition.get());
       }
       HiveMetaStoreEventHelper.submitSuccessfulPathRegistration(eventSubmitter, spec);
     } catch (TException e) {
@@ -299,9 +299,8 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
         }
         return false;
       } catch (NoSuchObjectException e) {
-        try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
-          client.get().alter_partition(table.getDbName(), table.getTableName(),
-              getPartitionWithCreateTimeNow(HiveMetaStoreUtils.getPartition(partition)));
+        try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
+          client.get().add_partition(getPartitionWithCreateTimeNow(HiveMetaStoreUtils.getPartition(partition)));
         }
         HiveMetaStoreEventHelper.submitSuccessfulPartitionAdd(this.eventSubmitter, table, partition);
         return true;
@@ -432,47 +431,60 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
     }
   }
 
-  private void addOrAlterPartition(IMetaStoreClient client, Table table, Partition partition, HiveSpec spec)
+  @Override
+  public void addOrAlterPartition(HiveTable table, HivePartition partition) throws IOException {
+    try (AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
+      addOrAlterPartition(client.get(), HiveMetaStoreUtils.getTable(table), partition);
+    } catch (TException te) {
+      throw new IOException(
+          String.format("Failed to add/alter partition %s.%s@%s", table.getDbName(), table.getTableName(), partition.getValues()),
+          te);
+    }
+  }
+
+  private void addOrAlterPartition(IMetaStoreClient client, Table table, HivePartition partition)
       throws TException {
-    Preconditions.checkArgument(table.getPartitionKeysSize() == partition.getValues().size(),
+    Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
+
+    Preconditions.checkArgument(table.getPartitionKeysSize() == nativePartition.getValues().size(),
         String.format("Partition key size is %s but partition value size is %s", table.getPartitionKeys().size(),
-            partition.getValues().size()));
+            nativePartition.getValues().size()));
 
     try (AutoCloseableLock lock =
-        this.locks.getPartitionLock(table.getDbName(), table.getTableName(), partition.getValues())) {
+        this.locks.getPartitionLock(table.getDbName(), table.getTableName(), nativePartition.getValues())) {
 
       try {
         try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
-          client.add_partition(getPartitionWithCreateTimeNow(partition));
+          client.add_partition(getPartitionWithCreateTimeNow(nativePartition));
         }
-        log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(partition),
-            table.getTableName(), partition.getSd().getLocation()));
+        log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(nativePartition),
+            table.getTableName(), nativePartition.getSd().getLocation()));
       } catch (TException e) {
         try {
           HivePartition existingPartition;
           try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
             existingPartition = HiveMetaStoreUtils.getHivePartition(
-                client.getPartition(table.getDbName(), table.getTableName(), partition.getValues()));
+                client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues()));
           }
 
-          if (needToUpdatePartition(existingPartition, spec.getPartition().get())) {
+          if (needToUpdatePartition(existingPartition, partition)) {
             log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s",
-                stringifyPartition(existingPartition), stringifyPartition(spec.getPartition().get())));
-            Partition newPartition = getPartitionWithCreateTime(partition, existingPartition);
+                stringifyPartition(existingPartition), stringifyPartition(partition)));
+            Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition);
             log.info(String.format("Altering partition %s", newPartition));
             try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
               client.alter_partition(table.getDbName(), table.getTableName(), newPartition);
             }
             log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition),
-                table.getTableName(), partition.getSd().getLocation()));
+                table.getTableName(), nativePartition.getSd().getLocation()));
           } else {
             log.info(String.format("Partition %s in table %s with location %s already exists and no need to update",
-                stringifyPartition(partition), table.getTableName(), partition.getSd().getLocation()));
+                stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
           }
         } catch (Throwable e2) {
           log.error(String.format(
               "Unable to add or alter partition %s in table %s with location %s: " + e2.getMessage(),
-              stringifyPartitionVerbose(partition), table.getTableName(), partition.getSd().getLocation()), e2);
+              stringifyPartitionVerbose(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()), e2);
           throw e2;
         }
       }