Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/05/04 16:01:01 UTC

[incubator-iceberg] branch master updated: Fix Hadoop commit race condition (#990)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b8037d2  Fix Hadoop commit race condition (#990)
b8037d2 is described below

commit b8037d24256cd7133d535cce0c8399acb138e30c
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Mon May 4 09:00:49 2020 -0700

    Fix Hadoop commit race condition (#990)
---
 .../iceberg/hadoop/HadoopTableOperations.java      | 34 ++++++++++++++--------
 1 file changed, 22 insertions(+), 12 deletions(-)
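
Note on the pattern the patch applies (not part of the original notification): commit() previously held the object lock while refresh() did not, so the version and currentMetadata fields could be read at different points relative to a concurrent refresh. The fix makes the fields volatile and funnels reads and writes through small synchronized helpers so commit() validates against one consistent (version, metadata) snapshot. The following is a minimal, self-contained sketch of that structure; the class, field types, and method bodies are simplified stand-ins invented for illustration, not the actual Iceberg code, and Map.Entry stands in for org.apache.iceberg.util.Pair.

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Map;

    class VersionedState {
      // Written only under the lock; volatile so unsynchronized readers still
      // see up-to-date values.
      private volatile Integer version = null;
      private volatile String metadata = null;

      // Readers take both values under one lock, so a concurrent writer cannot
      // slip in between the two field reads and hand back a mismatched pair.
      synchronized Map.Entry<Integer, String> versionAndMetadata() {
        return new SimpleImmutableEntry<>(version, metadata);
      }

      // Writers update both fields under the same lock, and only when the
      // version actually advanced, mirroring updateVersionAndMetadata() in the
      // diff below.
      synchronized void update(int newVersion, String newMetadata) {
        if (version == null || version != newVersion) {
          this.version = newVersion;
          this.metadata = newMetadata;
        }
      }

      // A commit-style caller validates against the snapshot, not the live
      // fields, so both checks refer to the same point in time. The identity
      // comparison (!=) is deliberate, as in the patched commit().
      void commit(String base, String newMetadata) {
        Map.Entry<Integer, String> current = versionAndMetadata();
        if (base != current.getValue()) {
          throw new IllegalStateException("Cannot commit changes based on stale metadata");
        }
        int nextVersion = (current.getKey() != null ? current.getKey() : 0) + 1;
        update(nextVersion, newMetadata);
      }
    }

Volatile alone would not close the race: commit() needs the version and the metadata to come from the same refresh, which is why the snapshot is taken under a lock rather than by reading the two fields separately.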

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 8bad148..431fc89 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,11 +57,12 @@ public class HadoopTableOperations implements TableOperations {
 
   private final Configuration conf;
   private final Path location;
-  private TableMetadata currentMetadata = null;
-  private Integer version = null;
-  private boolean shouldRefresh = true;
   private HadoopFileIO defaultFileIo = null;
 
+  private volatile TableMetadata currentMetadata = null;
+  private volatile Integer version = null;
+  private volatile boolean shouldRefresh = true;
+
   protected HadoopTableOperations(Path location, Configuration conf) {
     this.conf = conf;
     this.location = location;
@@ -74,6 +76,18 @@ public class HadoopTableOperations implements TableOperations {
     return currentMetadata;
   }
 
+  private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
+    return Pair.of(version, currentMetadata);
+  }
+
+  private synchronized void updateVersionAndMetadata(int newVersion, String metadataFile) {
+    // update if the current version is out of date
+    if (version == null || version != newVersion) {
+      this.version = newVersion;
+      this.currentMetadata = checkUUID(currentMetadata, TableMetadataParser.read(io(), metadataFile));
+    }
+  }
+
   @Override
   public TableMetadata refresh() {
     int ver = version != null ? version : readVersionHint();
@@ -93,12 +107,7 @@ public class HadoopTableOperations implements TableOperations {
         nextMetadataFile = getMetadataFile(ver + 1);
       }
 
-      // only load if the current version is out of date
-      if (version == null || version != ver) {
-        this.version = ver;
-
-        this.currentMetadata = checkUUID(currentMetadata, TableMetadataParser.read(io(), metadataFile.toString()));
-      }
+      updateVersionAndMetadata(ver, metadataFile.toString());
 
       this.shouldRefresh = false;
       return currentMetadata;
@@ -108,8 +117,9 @@ public class HadoopTableOperations implements TableOperations {
   }
 
   @Override
-  public synchronized void commit(TableMetadata base, TableMetadata metadata) {
-    if (base != current()) {
+  public void commit(TableMetadata base, TableMetadata metadata) {
+    Pair<Integer, TableMetadata> current = versionAndMetadata();
+    if (base != current.second()) {
       throw new CommitFailedException("Cannot commit changes based on stale table metadata");
     }
 
@@ -131,7 +141,7 @@ public class HadoopTableOperations implements TableOperations {
     Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
     TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
 
-    int nextVersion = (version != null ? version : 0) + 1;
+    int nextVersion = (current.first() != null ? current.first() : 0) + 1;
     Path finalMetadataFile = metadataFilePath(nextVersion, codec);
     FileSystem fs = getFileSystem(tempMetadataFile, conf);