You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/05/04 16:01:01 UTC
[incubator-iceberg] branch master updated: Fix Hadoop commit race condition (#990)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b8037d2 Fix Hadoop commit race condition (#990)
b8037d2 is described below
commit b8037d24256cd7133d535cce0c8399acb138e30c
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Mon May 4 09:00:49 2020 -0700
Fix Hadoop commit race condition (#990)
---
.../iceberg/hadoop/HadoopTableOperations.java | 34 ++++++++++++++--------
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 8bad148..431fc89 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,11 +57,12 @@ public class HadoopTableOperations implements TableOperations {
private final Configuration conf;
private final Path location;
- private TableMetadata currentMetadata = null;
- private Integer version = null;
- private boolean shouldRefresh = true;
private HadoopFileIO defaultFileIo = null;
+ private volatile TableMetadata currentMetadata = null;
+ private volatile Integer version = null;
+ private volatile boolean shouldRefresh = true;
+
protected HadoopTableOperations(Path location, Configuration conf) {
this.conf = conf;
this.location = location;
@@ -74,6 +76,18 @@ public class HadoopTableOperations implements TableOperations {
return currentMetadata;
}
+ private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
+ return Pair.of(version, currentMetadata);
+ }
+
+ private synchronized void updateVersionAndMetadata(int newVersion, String metadataFile) {
+ // update if the current version is out of date
+ if (version == null || version != newVersion) {
+ this.version = newVersion;
+ this.currentMetadata = checkUUID(currentMetadata, TableMetadataParser.read(io(), metadataFile));
+ }
+ }
+
@Override
public TableMetadata refresh() {
int ver = version != null ? version : readVersionHint();
@@ -93,12 +107,7 @@ public class HadoopTableOperations implements TableOperations {
nextMetadataFile = getMetadataFile(ver + 1);
}
- // only load if the current version is out of date
- if (version == null || version != ver) {
- this.version = ver;
-
- this.currentMetadata = checkUUID(currentMetadata, TableMetadataParser.read(io(), metadataFile.toString()));
- }
+ updateVersionAndMetadata(ver, metadataFile.toString());
this.shouldRefresh = false;
return currentMetadata;
@@ -108,8 +117,9 @@ public class HadoopTableOperations implements TableOperations {
}
@Override
- public synchronized void commit(TableMetadata base, TableMetadata metadata) {
- if (base != current()) {
+ public void commit(TableMetadata base, TableMetadata metadata) {
+ Pair<Integer, TableMetadata> current = versionAndMetadata();
+ if (base != current.second()) {
throw new CommitFailedException("Cannot commit changes based on stale table metadata");
}
@@ -131,7 +141,7 @@ public class HadoopTableOperations implements TableOperations {
Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
- int nextVersion = (version != null ? version : 0) + 1;
+ int nextVersion = (current.first() != null ? current.first() : 0) + 1;
Path finalMetadataFile = metadataFilePath(nextVersion, codec);
FileSystem fs = getFileSystem(tempMetadataFile, conf);