You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/19 17:00:57 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-861] Skip getPartition() call to Hive Metastore when a partition already e…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 396fc33 [GOBBLIN-861] Skip getPartition() call to Hive Metastore when a partition already e…
396fc33 is described below
commit 396fc33c59b1b91b449cd0eec82531b902b59b4d
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Mon Aug 19 10:00:49 2019 -0700
[GOBBLIN-861] Skip getPartition() call to Hive Metastore when a partition already e…
Closes #2716 from ZihanLi58/861
---
.../hive/metastore/HiveMetaStoreBasedRegister.java | 62 +++++++++++++++-------
1 file changed, 43 insertions(+), 19 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index b676737..9c07337 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -95,6 +95,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
public static final String GET_HIVE_TABLE = HIVE_REGISTER_METRICS_PREFIX + "getTableTimer";
public static final String DROP_TABLE = HIVE_REGISTER_METRICS_PREFIX + "dropTableTimer";
public static final String PATH_REGISTER_TIMER = HIVE_REGISTER_METRICS_PREFIX + "pathRegisterTimer";
+ public static final String SKIP_PARTITION_DIFF_COMPUTATION = HIVE_REGISTER_METRICS_PREFIX + "skip.partition.diff.computation";
/**
* To reduce lock aquisition and RPC to metaStoreClient, we cache the result of query regarding to
* the existence of databases and tables in {@link #tableAndDbExistenceCache},
@@ -128,11 +129,16 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
private final boolean optimizedChecks;
+ // If this is true, once we know the partition already exists we skip it instead of fetching the existing
+ // partition and computing the diff to see whether it needs to be updated. Use this only when you can be sure
+ // the metadata for a partition is immutable.
+ private final boolean skipDiffComputation;
public HiveMetaStoreBasedRegister(State state, Optional<String> metastoreURI) throws IOException {
super(state);
this.optimizedChecks = state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
+ this.skipDiffComputation = state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(this.props.getNumThreads());
@@ -472,25 +478,10 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
table.getTableName(), nativePartition.getSd().getLocation()));
} catch (TException e) {
try {
- HivePartition existingPartition;
- try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
- existingPartition = HiveMetaStoreUtils.getHivePartition(
- client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues()));
- }
-
- if (needToUpdatePartition(existingPartition, partition)) {
- log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s",
- stringifyPartition(existingPartition), stringifyPartition(partition)));
- Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition);
- log.info(String.format("Altering partition %s", newPartition));
- try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
- client.alter_partition(table.getDbName(), table.getTableName(), newPartition);
- }
- log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition),
- table.getTableName(), nativePartition.getSd().getLocation()));
+ if (this.skipDiffComputation) {
+ onPartitionExistWithoutComputingDiff(table, nativePartition, e);
} else {
- log.info(String.format("Partition %s in table %s with location %s already exists and no need to update",
- stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
+ onPartitionExist(client, table, partition, nativePartition);
}
} catch (Throwable e2) {
log.error(String.format(
@@ -502,6 +493,39 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
}
}
+ private void onPartitionExist(IMetaStoreClient client, Table table, HivePartition partition, Partition nativePartition) throws TException { // Diff path: fetch the partition already in the metastore, compare it with the new one, and alter it only when they differ.
+ HivePartition existingPartition;
+ try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) { // the getPartition() round trip is timed under GET_HIVE_PARTITION
+ existingPartition = HiveMetaStoreUtils.getHivePartition(
+ client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues()));
+ }
+
+ if (needToUpdatePartition(existingPartition, partition)) { // update branch: existing and new partition metadata differ
+ log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s",
+ stringifyPartition(existingPartition), stringifyPartition(partition)));
+ Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition); // NOTE(review): presumably carries the existing partition's create time over to the new one; confirm in the helper
+ log.info(String.format("Altering partition %s", newPartition));
+ try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) { // the alter_partition() call is timed under ALTER_PARTITION
+ client.alter_partition(table.getDbName(), table.getTableName(), newPartition);
+ }
+ log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition),
+ table.getTableName(), nativePartition.getSd().getLocation()));
+ } else { // no-op branch: nothing is sent to the metastore; the outcome is only logged, at debug level
+ log.debug(String.format("Partition %s in table %s with location %s already exists and no need to update",
+ stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
+ }
+ }
+
+ private void onPartitionExistWithoutComputingDiff(Table table, Partition nativePartition, TException e) throws TException { // Skip-diff path: makes no metastore call and decides solely from the exception the caller caught.
+ if (e instanceof AlreadyExistsException) { // the partition is already registered: log at debug level and return normally, leaving it untouched
+ log.debug(String.format("Partition %s in table %s with location %s already exists and no need to update",
+ stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
+ }
+ else { // any other TException does not mean "already exists", so it is rethrown unchanged to the caller
+ throw e;
+ }
+ }
+
private static String stringifyPartition(Partition partition) {
if (log.isDebugEnabled()) {
return stringifyPartitionVerbose(partition);
@@ -541,7 +565,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
hivePartition = client.get().getPartition(dbName, tableName, partitionValues);
}
- return Optional.of(HiveMetaStoreUtils.getHivePartition(hivePartition));
+ return Optional.of(HiveMetaStoreUtils.getHivePartition(hivePartition));
} catch (NoSuchObjectException e) {
return Optional.<HivePartition> absent();
} catch (TException e) {