You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/23 22:32:53 UTC
incubator-gobblin git commit: [GOBBLIN-566] Fix bug in
HiveMetaStoreBasedRegister where it would try to alter a partition that
did not exist.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master fcd57541a -> 27a54f05e
[GOBBLIN-566] Fix bug in HiveMetaStoreBasedRegister where it would try to alter a partition that did not exist.
Closes #2429 from ibuenros/hive-register-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/27a54f05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/27a54f05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/27a54f05
Branch: refs/heads/master
Commit: 27a54f05ef2c084972db23d200512e6873e954d1
Parents: fcd5754
Author: ibuenros <is...@gmail.com>
Authored: Thu Aug 23 15:32:48 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Aug 23 15:32:48 2018 -0700
----------------------------------------------------------------------
.../metastore/HiveMetaStoreBasedRegister.java | 48 ++++++++++++--------
1 file changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/27a54f05/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 2d074d9..0ee5445 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -156,7 +156,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
Optional<HivePartition> partition = spec.getPartition();
if (partition.isPresent()) {
- addOrAlterPartition(client.get(), table, HiveMetaStoreUtils.getPartition(partition.get()), spec);
+ addOrAlterPartition(client.get(), table, partition.get());
}
HiveMetaStoreEventHelper.submitSuccessfulPathRegistration(eventSubmitter, spec);
} catch (TException e) {
@@ -299,9 +299,8 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
}
return false;
} catch (NoSuchObjectException e) {
- try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
- client.get().alter_partition(table.getDbName(), table.getTableName(),
- getPartitionWithCreateTimeNow(HiveMetaStoreUtils.getPartition(partition)));
+ try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
+ client.get().add_partition(getPartitionWithCreateTimeNow(HiveMetaStoreUtils.getPartition(partition)));
}
HiveMetaStoreEventHelper.submitSuccessfulPartitionAdd(this.eventSubmitter, table, partition);
return true;
@@ -432,47 +431,60 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
}
}
- private void addOrAlterPartition(IMetaStoreClient client, Table table, Partition partition, HiveSpec spec)
+ @Override
+ public void addOrAlterPartition(HiveTable table, HivePartition partition) throws IOException {
+ try (AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
+ addOrAlterPartition(client.get(), HiveMetaStoreUtils.getTable(table), partition);
+ } catch (TException te) {
+ throw new IOException(
+ String.format("Failed to add/alter partition %s.%s@%s", table.getDbName(), table.getTableName(), partition.getValues()),
+ te);
+ }
+ }
+
+ private void addOrAlterPartition(IMetaStoreClient client, Table table, HivePartition partition)
throws TException {
- Preconditions.checkArgument(table.getPartitionKeysSize() == partition.getValues().size(),
+ Partition nativePartition = HiveMetaStoreUtils.getPartition(partition);
+
+ Preconditions.checkArgument(table.getPartitionKeysSize() == nativePartition.getValues().size(),
String.format("Partition key size is %s but partition value size is %s", table.getPartitionKeys().size(),
- partition.getValues().size()));
+ nativePartition.getValues().size()));
try (AutoCloseableLock lock =
- this.locks.getPartitionLock(table.getDbName(), table.getTableName(), partition.getValues())) {
+ this.locks.getPartitionLock(table.getDbName(), table.getTableName(), nativePartition.getValues())) {
try {
try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
- client.add_partition(getPartitionWithCreateTimeNow(partition));
+ client.add_partition(getPartitionWithCreateTimeNow(nativePartition));
}
- log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(partition),
- table.getTableName(), partition.getSd().getLocation()));
+ log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(nativePartition),
+ table.getTableName(), nativePartition.getSd().getLocation()));
} catch (TException e) {
try {
HivePartition existingPartition;
try (Timer.Context context = this.metricContext.timer(GET_HIVE_PARTITION).time()) {
existingPartition = HiveMetaStoreUtils.getHivePartition(
- client.getPartition(table.getDbName(), table.getTableName(), partition.getValues()));
+ client.getPartition(table.getDbName(), table.getTableName(), nativePartition.getValues()));
}
- if (needToUpdatePartition(existingPartition, spec.getPartition().get())) {
+ if (needToUpdatePartition(existingPartition, partition)) {
log.info(String.format("Partition update required. ExistingPartition %s, newPartition %s",
- stringifyPartition(existingPartition), stringifyPartition(spec.getPartition().get())));
- Partition newPartition = getPartitionWithCreateTime(partition, existingPartition);
+ stringifyPartition(existingPartition), stringifyPartition(partition)));
+ Partition newPartition = getPartitionWithCreateTime(nativePartition, existingPartition);
log.info(String.format("Altering partition %s", newPartition));
try (Timer.Context context = this.metricContext.timer(ALTER_PARTITION).time()) {
client.alter_partition(table.getDbName(), table.getTableName(), newPartition);
}
log.info(String.format("Updated partition %s in table %s with location %s", stringifyPartition(newPartition),
- table.getTableName(), partition.getSd().getLocation()));
+ table.getTableName(), nativePartition.getSd().getLocation()));
} else {
log.info(String.format("Partition %s in table %s with location %s already exists and no need to update",
- stringifyPartition(partition), table.getTableName(), partition.getSd().getLocation()));
+ stringifyPartition(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()));
}
} catch (Throwable e2) {
log.error(String.format(
"Unable to add or alter partition %s in table %s with location %s: " + e2.getMessage(),
- stringifyPartitionVerbose(partition), table.getTableName(), partition.getSd().getLocation()), e2);
+ stringifyPartitionVerbose(nativePartition), table.getTableName(), nativePartition.getSd().getLocation()), e2);
throw e2;
}
}