You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/08/22 19:13:03 UTC

[incubator-iceberg] branch master updated: Minor updates to BaseMetastoreTableOperations (#390)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 92b5b45  Minor updates to BaseMetastoreTableOperations (#390)
92b5b45 is described below

commit 92b5b450bbf3c392785b45b97446f88ed2d5b6c5
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Aug 22 12:12:56 2019 -0700

    Minor updates to BaseMetastoreTableOperations (#390)
    
    This includes 2 updates to BaseMetastoreTableOperations:
    
    * Use overwrite to create metadata JSON files to avoid S3 negative caching
    * Add an optional predicate to determine if loading metadata should be retried after an exception
---
 .../org/apache/iceberg/BaseMetastoreTableOperations.java | 15 ++++++++++++---
 .../java/org/apache/iceberg/TableMetadataParser.java     | 16 +++++++++++++---
 core/src/main/java/org/apache/iceberg/util/Tasks.java    | 14 +++++++++++++-
 3 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index 9013f39..55f3305 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -23,6 +23,7 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
@@ -74,16 +75,23 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
     OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
 
     // write the new metadata
-    TableMetadataParser.write(metadata, newMetadataLocation);
+    // use overwrite to avoid negative caching in S3. this is safe because the metadata location
+    // includes a UUID and is therefore always unique.
+    TableMetadataParser.overwrite(metadata, newMetadataLocation);
 
     return newTableMetadataFilePath;
   }
 
   protected void refreshFromMetadataLocation(String newLocation) {
-    refreshFromMetadataLocation(newLocation, 20);
+    refreshFromMetadataLocation(newLocation, null, 20);
   }
 
   protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
+    refreshFromMetadataLocation(newLocation, null, numRetries);
+  }
+
+  protected void refreshFromMetadataLocation(String newLocation, Predicate<Exception> shouldRetry,
+                                             int numRetries) {
     // use null-safe equality check because new tables have a null metadata location
     if (!Objects.equal(currentMetadataLocation, newLocation)) {
       LOG.info("Refreshing table metadata from new version: {}", newLocation);
@@ -91,7 +99,8 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
       AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
       Tasks.foreach(newLocation)
           .retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
-          .suppressFailureWhenFinished()
+          .throwFailureWhenFinished()
+          .shouldRetryTest(shouldRetry)
           .run(metadataLocation -> newMetadata.set(
               TableMetadataParser.read(this, io().newInputFile(metadataLocation))));
 
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 03f947d..8feaa8a 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.StringWriter;
 import java.util.Comparator;
@@ -95,10 +96,19 @@ public class TableMetadataParser {
   static final String TIMESTAMP_MS = "timestamp-ms";
   static final String SNAPSHOT_LOG = "snapshot-log";
 
+  public static void overwrite(TableMetadata metadata, OutputFile outputFile) {
+    internalWrite(metadata, outputFile, true);
+  }
+
   public static void write(TableMetadata metadata, OutputFile outputFile) {
-    Codec codec = Codec.fromFileName(outputFile.location());
-    try (OutputStreamWriter writer = new OutputStreamWriter(
-        codec == Codec.GZIP ? new GZIPOutputStream(outputFile.create()) : outputFile.create())) {
+    internalWrite(metadata, outputFile, false);
+  }
+
+  public static void internalWrite(
+      TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
+    boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
+    OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
+    try (OutputStreamWriter writer = new OutputStreamWriter(isGzip ? new GZIPOutputStream(stream) : stream)) {
       JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
       generator.useDefaultPrettyPrinter();
       toJson(metadata, generator);
diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java
index a353f2b..239044a 100644
--- a/core/src/main/java/org/apache/iceberg/util/Tasks.java
+++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +82,7 @@ public class Tasks {
     private List<Class<? extends Exception>> stopRetryExceptions = Lists.newArrayList(
         UnrecoverableException.class);
     private List<Class<? extends Exception>> onlyRetryExceptions = null;
+    private Predicate<Exception> shouldRetryPredicate = null;
     private int maxAttempts = 1;          // not all operations can be retried
     private long minSleepTimeMs = 1000;   // 1 second
     private long maxSleepTimeMs = 600000; // 10 minutes
@@ -146,6 +148,11 @@ public class Tasks {
       return this;
     }
 
+    public Builder<I> shouldRetryTest(Predicate<Exception> shouldRetry) {
+      this.shouldRetryPredicate = shouldRetry;
+      return this;
+    }
+
     public Builder<I> noRetry() {
       this.maxAttempts = 1;
       return this;
@@ -405,7 +412,12 @@ public class Tasks {
             throw e;
           }
 
-          if (onlyRetryExceptions != null) {
+          if (shouldRetryPredicate != null) {
+            if (!shouldRetryPredicate.test(e)) {
+              throw e;
+            }
+
+          } else if (onlyRetryExceptions != null) {
             // if onlyRetryExceptions are present, then this retries if one is found
             boolean matchedRetryException = false;
             for (Class<? extends Exception> exClass : onlyRetryExceptions) {