You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/22 19:13:03 UTC
[incubator-iceberg] branch master updated: Minor updates to BaseMetastoreTableOperations (#390)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 92b5b45 Minor updates to BaseMetastoreTableOperations (#390)
92b5b45 is described below
commit 92b5b450bbf3c392785b45b97446f88ed2d5b6c5
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Aug 22 12:12:56 2019 -0700
Minor updates to BaseMetastoreTableOperations (#390)
This includes 2 updates to BaseMetastoreTableOperations:
* Use overwrite to create metadata JSON files to avoid S3 negative caching
* Add an optional predicate to determine if loading metadata should be retried after an exception
---
.../org/apache/iceberg/BaseMetastoreTableOperations.java | 15 ++++++++++++---
.../java/org/apache/iceberg/TableMetadataParser.java | 16 +++++++++++++---
core/src/main/java/org/apache/iceberg/util/Tasks.java | 14 +++++++++++++-
3 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 9013f39..55f3305 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -23,6 +23,7 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
@@ -74,16 +75,23 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
// write the new metadata
- TableMetadataParser.write(metadata, newMetadataLocation);
+ // use overwrite to avoid negative caching in S3. this is safe because the metadata location is
+ // always unique because it includes a UUID.
+ TableMetadataParser.overwrite(metadata, newMetadataLocation);
return newTableMetadataFilePath;
}
protected void refreshFromMetadataLocation(String newLocation) {
- refreshFromMetadataLocation(newLocation, 20);
+ refreshFromMetadataLocation(newLocation, null, 20);
}
protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
+ refreshFromMetadataLocation(newLocation, null, numRetries);
+ }
+
+ protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
+ int numRetries) {
// use null-safe equality check because new tables have a null metadata location
if (!Objects.equal(currentMetadataLocation, newLocation)) {
LOG.info("Refreshing table metadata from new version: {}", newLocation);
@@ -91,7 +99,8 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
Tasks.foreach(newLocation)
.retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
- .suppressFailureWhenFinished()
+ .throwFailureWhenFinished()
+ .shouldRetryTest(shouldRetry)
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(this, io().newInputFile(metadataLocation))));
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 03f947d..8feaa8a 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.util.Comparator;
@@ -95,10 +96,19 @@ public class TableMetadataParser {
static final String TIMESTAMP_MS = "timestamp-ms";
static final String SNAPSHOT_LOG = "snapshot-log";
+ public static void overwrite(TableMetadata metadata, OutputFile outputFile) {
+ internalWrite(metadata, outputFile, true);
+ }
+
public static void write(TableMetadata metadata, OutputFile outputFile) {
- Codec codec = Codec.fromFileName(outputFile.location());
- try (OutputStreamWriter writer = new OutputStreamWriter(
- codec == Codec.GZIP ? new GZIPOutputStream(outputFile.create()) : outputFile.create())) {
+ internalWrite(metadata, outputFile, false);
+ }
+
+ public static void internalWrite(
+ TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
+ boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
+ OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
+ try (OutputStreamWriter writer = new OutputStreamWriter(isGzip ? new GZIPOutputStream(stream) : stream)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java
index a353f2b..239044a 100644
--- a/core/src/main/java/org/apache/iceberg/util/Tasks.java
+++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,7 @@ public class Tasks {
private List<Class<? extends Exception>> stopRetryExceptions = Lists.newArrayList(
UnrecoverableException.class);
private List<Class<? extends Exception>> onlyRetryExceptions = null;
+ private Predicate<Exception> shouldRetryPredicate = null;
private int maxAttempts = 1; // not all operations can be retried
private long minSleepTimeMs = 1000; // 1 second
private long maxSleepTimeMs = 600000; // 10 minutes
@@ -146,6 +148,11 @@ public class Tasks {
return this;
}
+ public Builder<I> shouldRetryTest(Predicate<Exception> shouldRetry) {
+ this.shouldRetryPredicate = shouldRetry;
+ return this;
+ }
+
public Builder<I> noRetry() {
this.maxAttempts = 1;
return this;
@@ -405,7 +412,12 @@ public class Tasks {
throw e;
}
- if (onlyRetryExceptions != null) {
+ if (shouldRetryPredicate != null) {
+ if (!shouldRetryPredicate.test(e)) {
+ throw e;
+ }
+
+ } else if (onlyRetryExceptions != null) {
// if onlyRetryExceptions are present, then this retries if one is found
boolean matchedRetryException = false;
for (Class<? extends Exception> exClass : onlyRetryExceptions) {