You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/19 17:00:57 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-861] Skip getPartition() call to Hive Metastore when a partition already e…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 396fc33  [GOBBLIN-861] Skip getPartition() call to Hive Metastore when a partition already e…
396fc33 is described below

commit 396fc33c59b1b91b449cd0eec82531b902b59b4d
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Mon Aug 19 10:00:49 2019 -0700

    [GOBBLIN-861] Skip getPartition() call to Hive Metastore when a partition already e…
    
    Closes #2716 from ZihanLi58/861
---
 .../hive/metastore/HiveMetaStoreBasedRegister.java | 62 +++++++++++++++-------
 1 file changed, 43 insertions(+), 19 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index b676737..9c07337 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -95,6 +95,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
   public static final String GET_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX + "getTableTimer";
   public static final String DROP_TABLE = HIVE_REGISTER_METRICS_PREFIX + "dropTableTimer";
   public static final String PATH_REGISTER_TIMER = HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
+  public static final String SKIP_PARTITION_DIFF_COMPUTATION = HIVE_REGISTER_METRICS_PREFIX + "skip.partition.diff.computation";
   /**
    * To reduce lock aquisition and RPC to metaStoreClient, we cache the result of query regarding to
    * the existence of databases and tables in {@link #tableAndDbExistenceCache},
@@ -128,11 +129,16 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
 
 
   private final boolean optimizedChecks;
+  // If this is true, once we know the partition already exists we skip it instead of fetching the existing
+  // partition and computing the diff to see if it needs to be updated. Use this only when you can be sure the
+  // metadata for a partition is immutable.
+  private final boolean skipDiffComputation;
 
   public HiveMetaStoreBasedRegister(State state, Optional<String> metastoreURI) throws IOException {
     super(state);
 
     this.optimizedChecks = state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
+    this.skipDiffComputation = state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
 
     GenericObjectPoolConfig config = new GenericObjectPoolConfig();
     config.setMaxTotal(this.props.getNumThreads());
@@ -472,25 +478,10 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
             table.getTableName(), nativePartition.getSd().getLocation()));
       } catch (TException e) {
         try {
-          HivePartition existingPartition;
-          try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
-            existingPartition = HiveMetaStoreUtils.getHivePartition(
-                client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues()));
-          }
-
-          if (needToUpdatePartition(existingPartition, partition)) {
-            log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s",
-                stringifyPartition(existingPartition), stringifyPartition(partition)));
-            Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition);
-            log.info(String.format("Altering partition %s", newPartition));
-            try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
-              client.alter_partition(table.getDbName(), table.getTableName(), newPartition);
-            }
-            log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition),
-                table.getTableName(), nativePartition.getSd().getLocation()));
+          if (this.skipDiffComputation) {
+            onPartitionExistWithoutComputingDiff(table, nativePartition, e);
           } else {
-            log.info(String.format("Partition %s in table %s with location %s already exists and no need to update",
-                stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
+            onPartitionExist(client, table, partition, nativePartition);
           }
         } catch (Throwable e2) {
           log.error(String.format(
@@ -502,6 +493,39 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
     }
   }
 
+  private void onPartitionExist(IMetaStoreClient client, Table table, HivePartition partition, Partition nativePartition) throws TException {
+    HivePartition existingPartition;
+    try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
+      existingPartition = HiveMetaStoreUtils.getHivePartition(
+          client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues()));
+    }
+
+    if (needToUpdatePartition(existingPartition, partition)) {
+      log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s",
+          stringifyPartition(existingPartition), stringifyPartition(partition)));
+      Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition);
+      log.info(String.format("Altering partition %s", newPartition));
+      try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
+        client.alter_partition(table.getDbName(), table.getTableName(), newPartition);
+      }
+      log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition),
+          table.getTableName(), nativePartition.getSd().getLocation()));
+    } else {
+      log.debug(String.format("Partition %s in table %s with location %s already exists and no need to update",
+          stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
+    }
+  }
+
+  private void onPartitionExistWithoutComputingDiff(Table table, Partition nativePartition, TException e) throws TException {
+    if (e instanceof AlreadyExistsException) {
+      log.debug(String.format("Partition %s in table %s with location %s already exists and no need to update",
+          stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
+    }
+    else {
+      throw e;
+    }
+  }
+
   private static String stringifyPartition(Partition partition) {
     if (log.isDebugEnabled()) {
       return stringifyPartitionVerbose(partition);
@@ -541,7 +565,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
       try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
         hivePartition = client.get().getPartition(dbName, tableName, partitionValues);
       }
-        return Optional.of(HiveMetaStoreUtils.getHivePartition(hivePartition));
+      return Optional.of(HiveMetaStoreUtils.getHivePartition(hivePartition));
     } catch (NoSuchObjectException e) {
       return Optional.<HivePartition> absent();
     } catch (TException e) {