You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/04/09 19:14:41 UTC
[incubator-iceberg] branch master updated: Prepare metadata writers for format v2 (#903)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c094da5 Prepare metadata writers for format v2 (#903)
c094da5 is described below
commit c094da57db072187b2d930ae6c5590218012ec3f
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Apr 9 12:14:34 2020 -0700
Prepare metadata writers for format v2 (#903)
* Track format version in TableMetadata.
* Pass formatVersion when creating a manifest writer.
* Pass formatVersion when creating a manifest list writer.
* Add v2 change section to spec.
---
.../org/apache/iceberg/BaseRewriteManifests.java | 12 +-
.../main/java/org/apache/iceberg/FastAppend.java | 8 +-
.../org/apache/iceberg/ManifestListWriter.java | 18 +-
.../java/org/apache/iceberg/ManifestWriter.java | 22 ++-
.../apache/iceberg/MergingSnapshotProducer.java | 15 +-
.../java/org/apache/iceberg/SnapshotProducer.java | 13 +-
.../java/org/apache/iceberg/TableMetadata.java | 68 ++++---
.../org/apache/iceberg/TableMetadataParser.java | 12 +-
.../org/apache/iceberg/TestFormatVersions.java | 65 ++++++
.../java/org/apache/iceberg/TestSnapshotJson.java | 3 +-
.../java/org/apache/iceberg/TestTableMetadata.java | 220 +++++++++++++--------
.../test/java/org/apache/iceberg/TestTables.java | 4 +-
site/docs/spec.md | 14 ++
.../apache/iceberg/spark/source/TestTables.java | 3 +-
14 files changed, 319 insertions(+), 158 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 5a9469c..0eee04b 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -74,7 +73,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
private final Set<ManifestFile> rewrittenManifests = Sets.newConcurrentHashSet();
private final Map<Object, WriterWrapper> writers = Maps.newConcurrentMap();
- private final AtomicInteger manifestSuffix = new AtomicInteger(0);
private final AtomicLong entryCount = new AtomicLong(0);
private Function<DataFile, Object> clusterByFunc;
@@ -157,7 +155,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) {
- OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement());
+ OutputFile newFile = newManifestOutput();
return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -336,18 +334,14 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
synchronized void addEntry(ManifestEntry entry) {
if (writer == null) {
- writer = newWriter();
+ writer = newManifestWriter(spec);
} else if (writer.length() >= getManifestTargetSizeBytes()) {
close();
- writer = newWriter();
+ writer = newManifestWriter(spec);
}
writer.existing(entry);
}
- private ManifestWriter newWriter() {
- return new ManifestWriter(spec, manifestPath(manifestSuffix.getAndIncrement()), snapshotId());
- }
-
synchronized void close() {
if (writer != null) {
try {
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 246e10f..3338831 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
@@ -48,7 +47,6 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private ManifestFile newManifest = null;
- private final AtomicInteger manifestCount = new AtomicInteger(0);
private boolean hasNewFiles = false;
FastAppend(TableOperations ops) {
@@ -110,7 +108,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
- OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+ OutputFile newManifestPath = newManifestOutput();
return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -165,9 +163,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
}
if (newManifest == null && newFiles.size() > 0) {
- OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
- ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+ ManifestWriter writer = newManifestWriter(spec);
try {
writer.addAll(newFiles);
} finally {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 0271695..e81d496 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -28,10 +28,18 @@ import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
-class ManifestListWriter implements FileAppender<ManifestFile> {
+abstract class ManifestListWriter implements FileAppender<ManifestFile> {
+ static ManifestListWriter write(int formatVersion, OutputFile manifestListFile,
+ long snapshotId, Long parentSnapshotId) {
+ if (formatVersion == 1) {
+ return new V1Writer(manifestListFile, snapshotId, parentSnapshotId);
+ }
+ throw new UnsupportedOperationException("Cannot write manifest list for table version: " + formatVersion);
+ }
+
private final FileAppender<ManifestFile> writer;
- ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+ private ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
this.writer = newAppender(snapshotFile, ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId)));
@@ -80,4 +88,10 @@ class ManifestListWriter implements FileAppender<ManifestFile> {
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file);
}
}
+
+ static class V1Writer extends ManifestListWriter {
+ private V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+ super(snapshotFile, snapshotId, parentSnapshotId);
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index b7ed80d..d1a9fa1 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
/**
* Writer for manifest files.
*/
-public class ManifestWriter implements FileAppender<DataFile> {
+public abstract class ManifestWriter implements FileAppender<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
static ManifestFile copyAppendManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
@@ -44,7 +44,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, long snapshotId,
SnapshotSummary.Builder summaryBuilder,
Set<ManifestEntry.Status> allowedEntryStatuses) {
- ManifestWriter writer = new ManifestWriter(reader.spec(), outputFile, snapshotId);
+ ManifestWriter writer = new V1Writer(reader.spec(), outputFile, snapshotId);
boolean threw = true;
try {
for (ManifestEntry entry : reader.entries()) {
@@ -93,7 +93,15 @@ public class ManifestWriter implements FileAppender<DataFile> {
* @return a manifest writer
*/
public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
- return new ManifestWriter(spec, outputFile, null);
+ // always use a v1 writer for appended manifests because sequence number must be inherited
+ return write(1, spec, outputFile, null);
+ }
+
+ static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+ if (formatVersion == 1) {
+ return new V1Writer(spec, outputFile, snapshotId);
+ }
+ throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
}
private final OutputFile file;
@@ -111,7 +119,7 @@ public class ManifestWriter implements FileAppender<DataFile> {
private int deletedFiles = 0;
private long deletedRows = 0L;
- ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
this.file = file;
this.specId = spec.specId();
this.writer = newAppender(FileFormat.AVRO, spec, file);
@@ -229,4 +237,10 @@ public class ManifestWriter implements FileAppender<DataFile> {
throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
}
}
+
+ private static class V1Writer extends ManifestWriter {
+ V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+ super(spec, file, snapshotId);
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e08ae49..4537c23 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -33,7 +33,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
@@ -87,7 +86,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final boolean snapshotIdInheritanceEnabled;
// update data
- private final AtomicInteger manifestCount = new AtomicInteger(0);
private final List<DataFile> newFiles = Lists.newArrayList();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
@@ -230,7 +228,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private ManifestFile copyManifest(ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
- OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement());
+ OutputFile newManifestPath = newManifestOutput();
return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), appendedManifestsSummary);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
@@ -542,8 +540,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// manifest. produce a copy of the manifest with all deleted files removed.
List<DataFile> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
- OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
- ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
+ ManifestWriter writer = newManifestWriter(reader.spec());
try {
reader.entries().forEach(entry -> {
DataFile file = entry.file();
@@ -655,9 +652,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
return mergeManifests.get(bin);
}
- OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
- ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
+ ManifestWriter writer = newManifestWriter(ops.current().spec(specId)); // write with the bin's spec, not the default
try {
for (ManifestFile manifest : bin) {
try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) {
@@ -697,9 +692,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
if (cachedNewManifest == null) {
- OutputFile out = manifestPath(manifestCount.getAndIncrement());
-
- ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+ ManifestWriter writer = newManifestWriter(spec);
try {
writer.addAll(newFiles);
} finally {
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 4cf4600..0d3239f 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -75,6 +75,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
private final TableOperations ops;
private final String commitUUID = UUID.randomUUID().toString();
+ private final AtomicInteger manifestCount = new AtomicInteger(0);
private final AtomicInteger attempt = new AtomicInteger(0);
private final List<String> manifestLists = Lists.newArrayList();
private volatile Long snapshotId = null;
@@ -148,8 +149,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
OutputFile manifestList = manifestListPath();
- try (ManifestListWriter writer = new ManifestListWriter(
- manifestList, snapshotId(), parentSnapshotId)) {
+ try (ManifestListWriter writer = ManifestListWriter.write(
+ ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId)) {
// keep track of the manifest lists created
manifestLists.add(manifestList.location());
@@ -310,9 +311,13 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
String.format("snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID))));
}
- protected OutputFile manifestPath(int manifestNumber) {
+ protected OutputFile newManifestOutput() {
return ops.io().newOutputFile(
- ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestNumber)));
+ ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
+ }
+
+ protected ManifestWriter newManifestWriter(PartitionSpec spec) {
+ return ManifestWriter.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
}
protected long snapshotId() {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index e44e002..4d12cf5 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -43,17 +43,12 @@ import org.apache.iceberg.util.PropertyUtil;
* Metadata for a table.
*/
public class TableMetadata {
- static final int TABLE_FORMAT_VERSION = 1;
+ static final int DEFAULT_TABLE_FORMAT_VERSION = 1;
+ static final int SUPPORTED_TABLE_FORMAT_VERSION = 2;
static final int INITIAL_SPEC_ID = 0;
public static TableMetadata newTableMetadata(Schema schema,
PartitionSpec spec,
- String location) {
- return newTableMetadata(schema, spec, location, ImmutableMap.of());
- }
-
- public static TableMetadata newTableMetadata(Schema schema,
- PartitionSpec spec,
String location,
Map<String, String> properties) {
// reassign all column ids to ensure consistency
@@ -73,7 +68,7 @@ public class TableMetadata {
}
PartitionSpec freshSpec = specBuilder.build();
- return new TableMetadata(null, UUID.randomUUID().toString(), location,
+ return new TableMetadata(null, DEFAULT_TABLE_FORMAT_VERSION, UUID.randomUUID().toString(), location,
System.currentTimeMillis(),
lastColumnId.get(), freshSchema, INITIAL_SPEC_ID, ImmutableList.of(freshSpec),
ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
@@ -170,6 +165,7 @@ public class TableMetadata {
private final InputFile file;
// stored metadata
+ private final int formatVersion;
private final String uuid;
private final String location;
private final long lastUpdatedMillis;
@@ -186,6 +182,7 @@ public class TableMetadata {
private final List<MetadataLogEntry> previousFiles;
TableMetadata(InputFile file,
+ int formatVersion,
String uuid,
String location,
long lastUpdatedMillis,
@@ -198,6 +195,13 @@ public class TableMetadata {
List<Snapshot> snapshots,
List<HistoryEntry> snapshotLog,
List<MetadataLogEntry> previousFiles) {
+ Preconditions.checkArgument(formatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+ "Unsupported format version: v%s", formatVersion);
+ if (formatVersion > 1) {
+ Preconditions.checkArgument(uuid != null, "UUID is required in format v%s", formatVersion);
+ }
+
+ this.formatVersion = formatVersion;
this.file = file;
this.uuid = uuid;
this.location = location;
@@ -240,6 +244,10 @@ public class TableMetadata {
"Invalid table metadata: Cannot find current version");
}
+ public int formatVersion() {
+ return formatVersion;
+ }
+
public InputFile file() {
return file;
}
@@ -328,24 +336,18 @@ public class TableMetadata {
if (uuid != null) {
return this;
} else {
- return new TableMetadata(null, UUID.randomUUID().toString(), location,
+ return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
lastUpdatedMillis, lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
}
- public TableMetadata updateTableLocation(String newLocation) {
- return new TableMetadata(null, uuid, newLocation,
- System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
- currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
- }
-
public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
PartitionSpec.checkCompatibility(spec(), newSchema);
// rebuild all of the partition specs for the new current schema
List<PartitionSpec> updatedSpecs = Lists.transform(specs,
spec -> updateSpecSchema(newSchema, spec));
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
System.currentTimeMillis(), newLastColumnId, newSchema, defaultSpecId, updatedSpecs, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -374,7 +376,7 @@ public class TableMetadata {
builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec));
}
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, newDefaultSpecId,
builder.build(), properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
@@ -385,7 +387,7 @@ public class TableMetadata {
.addAll(snapshots)
.add(snapshot)
.build();
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, newSnapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -404,7 +406,7 @@ public class TableMetadata {
.addAll(snapshotLog)
.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
.build();
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
snapshot.timestampMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -435,7 +437,7 @@ public class TableMetadata {
}
}
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, filtered, ImmutableList.copyOf(newSnapshotLog),
addPreviousFile(file, lastUpdatedMillis));
@@ -456,14 +458,14 @@ public class TableMetadata {
.add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
.build();
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
nowMillis, lastColumnId, schema, defaultSpecId, specs, properties,
snapshot.snapshotId(), snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
public TableMetadata replaceProperties(Map<String, String> newProperties) {
ValidationException.check(newProperties != null, "Cannot set properties to null");
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, newProperties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis, newProperties));
}
@@ -480,7 +482,7 @@ public class TableMetadata {
ValidationException.check(currentSnapshotId < 0 || // not set
Iterables.getLast(newSnapshotLog).snapshotId() == currentSnapshotId,
"Cannot set invalid snapshot log: latest entry is not the current snapshot");
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
@@ -519,14 +521,30 @@ public class TableMetadata {
newProperties.putAll(this.properties);
newProperties.putAll(updatedProperties);
- return new TableMetadata(null, uuid, location,
+ return new TableMetadata(null, formatVersion, uuid, location,
System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
specId, builder.build(), ImmutableMap.copyOf(newProperties),
-1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
}
public TableMetadata updateLocation(String newLocation) {
- return new TableMetadata(null, uuid, newLocation,
+ return new TableMetadata(null, formatVersion, uuid, newLocation,
+ System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+ currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
+ }
+
+ public TableMetadata upgradeToFormatVersion(int newFormatVersion) {
+ Preconditions.checkArgument(newFormatVersion <= SUPPORTED_TABLE_FORMAT_VERSION,
+ "Cannot upgrade table to unsupported format version: v%s (supported: v%s)",
+ newFormatVersion, SUPPORTED_TABLE_FORMAT_VERSION);
+ Preconditions.checkArgument(newFormatVersion >= formatVersion,
+ "Cannot downgrade v%s table to v%s", formatVersion, newFormatVersion);
+
+ if (newFormatVersion == formatVersion) {
+ return this;
+ }
+
+ return new TableMetadata(null, newFormatVersion, uuid, location,
System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
currentSnapshotId, snapshots, snapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index fd68338..6fca73b 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -151,7 +151,7 @@ public class TableMetadataParser {
private static void toJson(TableMetadata metadata, JsonGenerator generator) throws IOException {
generator.writeStartObject();
- generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+ generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion());
generator.writeStringField(TABLE_UUID, metadata.uuid());
generator.writeStringField(LOCATION, metadata.location());
generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
@@ -161,8 +161,10 @@ public class TableMetadataParser {
SchemaParser.toJson(metadata.schema(), generator);
// for older readers, continue writing the default spec as "partition-spec"
- generator.writeFieldName(PARTITION_SPEC);
- PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+ if (metadata.formatVersion() == 1) {
+ generator.writeFieldName(PARTITION_SPEC);
+ PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+ }
// write the default spec ID and spec list
generator.writeNumberField(DEFAULT_SPEC_ID, metadata.defaultSpecId());
@@ -226,7 +228,7 @@ public class TableMetadataParser {
"Cannot parse metadata from a non-object: %s", node);
int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node);
- Preconditions.checkArgument(formatVersion == TableMetadata.TABLE_FORMAT_VERSION,
+ Preconditions.checkArgument(formatVersion <= TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION,
"Cannot read unsupported version %s", formatVersion);
String uuid = JsonUtil.getStringOrNull(TABLE_UUID, node);
@@ -295,7 +297,7 @@ public class TableMetadataParser {
}
}
- return new TableMetadata(file, uuid, location,
+ return new TableMetadata(file, formatVersion, uuid, location,
lastUpdatedMillis, lastAssignedColumnId, schema, defaultSpecId, specs, properties,
currentVersionId, snapshots, ImmutableList.copyOf(entries.iterator()),
ImmutableList.copyOf(metadataEntries.iterator()));
diff --git a/core/src/test/java/org/apache/iceberg/TestFormatVersions.java b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
new file mode 100644
index 0000000..f58d70b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFormatVersions extends TableTestBase {
+ @Test
+ public void testDefaultFormatVersion() {
+ Assert.assertEquals("Should default to v1", 1, table.ops().current().formatVersion());
+ }
+
+ @Test
+ public void testFormatVersionUpgrade() {
+ TableOperations ops = table.ops();
+ TableMetadata base = ops.current();
+ ops.commit(base, base.upgradeToFormatVersion(2));
+
+ Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+ }
+
+ @Test
+ public void testFormatVersionDowngrade() {
+ TableOperations ops = table.ops();
+ TableMetadata base = ops.current();
+ ops.commit(base, base.upgradeToFormatVersion(2));
+
+ Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+
+ AssertHelpers.assertThrows("Should reject a version downgrade",
+ IllegalArgumentException.class, "Cannot downgrade",
+ () -> ops.current().upgradeToFormatVersion(1));
+
+ Assert.assertEquals("Should report v2", 2, ops.current().formatVersion());
+ }
+
+ @Test
+ public void testFormatVersionUpgradeNotSupported() {
+ TableOperations ops = table.ops();
+ TableMetadata base = ops.current();
+ AssertHelpers.assertThrows("Should reject an unsupported version upgrade",
+ IllegalArgumentException.class, "Cannot upgrade table to unsupported format version",
+ () -> ops.commit(base, base.upgradeToFormatVersion(TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1)));
+
+ Assert.assertEquals("Should report v1", 1, ops.current().formatVersion());
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 16ad4ee..5e55d66 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -95,8 +95,7 @@ public class TestSnapshotJson {
Assert.assertTrue(manifestList.delete());
manifestList.deleteOnExit();
- try (ManifestListWriter writer = new ManifestListWriter(
- Files.localOutput(manifestList), id, parentId)) {
+ try (ManifestListWriter writer = ManifestListWriter.write(1, Files.localOutput(manifestList), id, parentId)) {
writer.addAll(manifests);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index c2603f2..a77530b 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -55,8 +55,21 @@ import static org.apache.iceberg.TableMetadataParser.PARTITION_SPEC;
import static org.apache.iceberg.TableMetadataParser.PROPERTIES;
import static org.apache.iceberg.TableMetadataParser.SCHEMA;
import static org.apache.iceberg.TableMetadataParser.SNAPSHOTS;
+import static org.apache.iceberg.TableMetadataParser.TABLE_UUID;
public class TestTableMetadata {
+ private static final String TEST_LOCATION = "s3://bucket/test/location";
+
+ private static final Schema TEST_SCHEMA = new Schema(
+ Types.NestedField.required(1, "x", Types.LongType.get()),
+ Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
+ Types.NestedField.required(3, "z", Types.LongType.get())
+ );
+
+ private static final int LAST_ASSIGNED_COLUMN_ID = 3;
+
+ private static final PartitionSpec SPEC_5 = PartitionSpec.builderFor(TEST_SCHEMA).withSpecId(5).build();
+
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@@ -64,30 +77,22 @@ public class TestTableMetadata {
@Test
public void testJsonConversion() throws Exception {
- Schema schema = new Schema(
- Types.NestedField.required(1, "x", Types.LongType.get()),
- Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
- Types.NestedField.required(3, "z", Types.LongType.get())
- );
-
- PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> snapshotLog = ImmutableList.<HistoryEntry>builder()
.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
.build();
- TableMetadata expected = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
- System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+ TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+ System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of());
@@ -95,6 +100,8 @@ public class TestTableMetadata {
TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), null,
JsonUtil.mapper().readValue(asJson, JsonNode.class));
+ Assert.assertEquals("Format version should match",
+ expected.formatVersion(), metadata.formatVersion());
Assert.assertEquals("Table UUID should match",
expected.uuid(), metadata.uuid());
Assert.assertEquals("Table location should match",
@@ -128,27 +135,19 @@ public class TestTableMetadata {
@Test
public void testFromJsonSortsSnapshotLog() throws Exception {
- Schema schema = new Schema(
- Types.NestedField.required(1, "x", Types.LongType.get()),
- Types.NestedField.required(2, "y", Types.LongType.get()),
- Types.NestedField.required(3, "z", Types.LongType.get())
- );
-
- PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
- TableMetadata expected = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
- System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+ TableMetadata expected = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+ System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog, ImmutableList.of());
@@ -173,13 +172,7 @@ public class TestTableMetadata {
@Test
public void testBackwardCompat() throws Exception {
- Schema schema = new Schema(
- Types.NestedField.required(1, "x", Types.LongType.get()),
- Types.NestedField.required(2, "y", Types.LongType.get()),
- Types.NestedField.required(3, "z", Types.LongType.get())
- );
-
- PartitionSpec spec = PartitionSpec.builderFor(schema).identity("x").withSpecId(6).build();
+ PartitionSpec spec = PartitionSpec.builderFor(TEST_SCHEMA).identity("x").withSpecId(6).build();
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
@@ -190,8 +183,8 @@ public class TestTableMetadata {
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
- TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
- System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
+ TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION,
+ System.currentTimeMillis(), 3, TEST_SCHEMA, 6, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of(), ImmutableList.of());
@@ -199,6 +192,8 @@ public class TestTableMetadata {
TableMetadata metadata = TableMetadataParser
.fromJson(ops.io(), null, JsonUtil.mapper().readValue(asJson, JsonNode.class));
+ Assert.assertEquals("Format version should match",
+ expected.formatVersion(), metadata.formatVersion());
Assert.assertNull("Table UUID should not be assigned", metadata.uuid());
Assert.assertEquals("Table location should match",
expected.location(), metadata.location());
@@ -242,7 +237,7 @@ public class TestTableMetadata {
generator.writeStartObject(); // start table metadata object
- generator.writeNumberField(FORMAT_VERSION, TableMetadata.TABLE_FORMAT_VERSION);
+ generator.writeNumberField(FORMAT_VERSION, 1);
generator.writeStringField(LOCATION, metadata.location());
generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
@@ -281,22 +276,14 @@ public class TestTableMetadata {
@Test
public void testJsonWithPreviousMetadataLog() throws Exception {
- Schema schema = new Schema(
- Types.NestedField.required(1, "x", Types.LongType.get()),
- Types.NestedField.required(2, "y", Types.LongType.get()),
- Types.NestedField.required(3, "z", Types.LongType.get())
- );
-
- PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
@@ -304,8 +291,8 @@ public class TestTableMetadata {
previousMetadataLog.add(new MetadataLogEntry(currentTimestamp,
"/tmp/000001-" + UUID.randomUUID().toString() + ".metadata.json"));
- TableMetadata base = new TableMetadata(null, UUID.randomUUID().toString(), "s3://bucket/test/location",
- System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
+ TableMetadata base = new TableMetadata(null, 1, UUID.randomUUID().toString(), TEST_LOCATION,
+ System.currentTimeMillis(), 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -318,23 +305,15 @@ public class TestTableMetadata {
}
@Test
- public void testAddPreviousMetadataRemoveNone() throws Exception {
- Schema schema = new Schema(
- Types.NestedField.required(1, "x", Types.LongType.get()),
- Types.NestedField.required(2, "y", Types.LongType.get()),
- Types.NestedField.required(3, "z", Types.LongType.get())
- );
-
- PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+ public void testAddPreviousMetadataRemoveNone() {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
@@ -347,8 +326,8 @@ public class TestTableMetadata {
MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 80,
"/tmp/000003-" + UUID.randomUUID().toString() + ".metadata.json");
- TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
- "s3://bucket/test/location", currentTimestamp - 80, 3, schema, 5, ImmutableList.of(spec),
+ TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+ TEST_LOCATION, currentTimestamp - 80, 3, TEST_SCHEMA, 5, ImmutableList.of(SPEC_5),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -365,23 +344,15 @@ public class TestTableMetadata {
}
@Test
- public void testAddPreviousMetadataRemoveOne() throws Exception {
- Schema schema = new Schema(
- Types.NestedField.required(1, "x", Types.LongType.get()),
- Types.NestedField.required(2, "y", Types.LongType.get()),
- Types.NestedField.required(3, "z", Types.LongType.get())
- );
-
- PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+ public void testAddPreviousMetadataRemoveOne() {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
@@ -400,9 +371,9 @@ public class TestTableMetadata {
MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 50,
"/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
- TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
- "s3://bucket/test/location", currentTimestamp - 50, 3, schema, 5,
- ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId,
+ TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+ TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 5,
+ ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -423,23 +394,15 @@ public class TestTableMetadata {
}
@Test
- public void testAddPreviousMetadataRemoveMultiple() throws Exception {
- Schema schema = new Schema(
- Types.NestedField.required(1, "x", Types.LongType.get()),
- Types.NestedField.required(2, "y", Types.LongType.get()),
- Types.NestedField.required(3, "z", Types.LongType.get())
- );
-
- PartitionSpec spec = PartitionSpec.builderFor(schema).withSpecId(5).build();
-
+ public void testAddPreviousMetadataRemoveMultiple() {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
- new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId())));
List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
long currentTimestamp = System.currentTimeMillis();
@@ -458,9 +421,9 @@ public class TestTableMetadata {
MetadataLogEntry latestPreviousMetadata = new MetadataLogEntry(currentTimestamp - 50,
"/tmp/000006-" + UUID.randomUUID().toString() + ".metadata.json");
- TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), UUID.randomUUID().toString(),
- "s3://bucket/test/location", currentTimestamp - 50, 3, schema, 2,
- ImmutableList.of(spec), ImmutableMap.of("property", "value"), currentSnapshotId,
+ TableMetadata base = new TableMetadata(localInput(latestPreviousMetadata.file()), 1, UUID.randomUUID().toString(),
+ TEST_LOCATION, currentTimestamp - 50, 3, TEST_SCHEMA, 2,
+ ImmutableList.of(SPEC_5), ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog,
ImmutableList.copyOf(previousMetadataLog));
@@ -480,4 +443,87 @@ public class TestTableMetadata {
ImmutableList.copyOf(removedPreviousMetadata));
}
+ @Test
+ public void testV2UUIDValidation() { // constructing format v2 metadata with a null UUID must be rejected
+ AssertHelpers.assertThrows("Should reject v2 metadata without a UUID",
+ IllegalArgumentException.class, "UUID is required in format v2",
+ () -> new TableMetadata(null, 2, null, TEST_LOCATION, System.currentTimeMillis(), LAST_ASSIGNED_COLUMN_ID, // 2nd arg: format version 2; 3rd arg: null UUID
+ TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L, ImmutableList.of(),
+ ImmutableList.of(), ImmutableList.of())
+ );
+ }
+
+ @Test
+ public void testVersionValidation() { // the TableMetadata constructor must reject a format version it does not support
+ int unsupportedVersion = TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1; // one past the supported version constant
+ AssertHelpers.assertThrows("Should reject unsupported metadata",
+ IllegalArgumentException.class, "Unsupported format version: v" + unsupportedVersion,
+ () -> new TableMetadata(null, unsupportedVersion, null, TEST_LOCATION, System.currentTimeMillis(), // UUID is also null here; presumably the version check fires first - confirm in TableMetadata
+ LAST_ASSIGNED_COLUMN_ID, TEST_SCHEMA, SPEC_5.specId(), ImmutableList.of(SPEC_5), ImmutableMap.of(), -1L,
+ ImmutableList.of(), ImmutableList.of(), ImmutableList.of())
+ );
+ }
+
+ @Test
+ public void testParserVersionValidation() throws Exception { // the JSON parser accepts the supported version and rejects the next one
+ String supportedVersion = toJsonWithVersion( // JSON text stamped with the supported format version
+ TableMetadata.newTableMetadata(TEST_SCHEMA, SPEC_5, TEST_LOCATION, ImmutableMap.of()),
+ TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION);
+ TableMetadata parsed = TableMetadataParser.fromJson(
+ ops.io(), null, JsonUtil.mapper().readValue(supportedVersion, JsonNode.class));
+ Assert.assertNotNull("Should successfully read supported metadata version", parsed);
+
+ String unsupportedVersion = toJsonWithVersion( // same metadata, but stamped with supported version + 1
+ TableMetadata.newTableMetadata(TEST_SCHEMA, SPEC_5, TEST_LOCATION, ImmutableMap.of()),
+ TableMetadata.SUPPORTED_TABLE_FORMAT_VERSION + 1);
+ AssertHelpers.assertThrows("Should not read unsupported metadata",
+ IllegalArgumentException.class, "Cannot read unsupported version",
+ () -> TableMetadataParser.fromJson(
+ ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)));
+ }
+
+ public static String toJsonWithVersion(TableMetadata metadata, int version) { // test helper: hand-writes metadata JSON so any format-version value can be emitted
+ StringWriter writer = new StringWriter();
+ try {
+ JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+
+ generator.writeStartObject(); // start table metadata object
+
+ generator.writeNumberField(FORMAT_VERSION, version); // caller-chosen version, not taken from the metadata object
+ generator.writeStringField(TABLE_UUID, metadata.uuid());
+ generator.writeStringField(LOCATION, metadata.location());
+ generator.writeNumberField(LAST_UPDATED_MILLIS, metadata.lastUpdatedMillis());
+ generator.writeNumberField(LAST_COLUMN_ID, metadata.lastColumnId());
+
+ generator.writeFieldName(SCHEMA);
+ SchemaParser.toJson(metadata.schema(), generator);
+
+ // mimic an old writer by writing only partition-spec and not the default ID or spec list
+ generator.writeFieldName(PARTITION_SPEC);
+ PartitionSpecParser.toJsonFields(metadata.spec(), generator);
+
+ generator.writeObjectFieldStart(PROPERTIES);
+ for (Map.Entry<String, String> keyValue : metadata.properties().entrySet()) {
+ generator.writeStringField(keyValue.getKey(), keyValue.getValue());
+ }
+ generator.writeEndObject();
+
+ generator.writeNumberField(CURRENT_SNAPSHOT_ID,
+ metadata.currentSnapshot() != null ? metadata.currentSnapshot().snapshotId() : -1); // -1 is written when there is no current snapshot
+
+ generator.writeArrayFieldStart(SNAPSHOTS);
+ for (Snapshot snapshot : metadata.snapshots()) {
+ SnapshotParser.toJson(snapshot, generator);
+ }
+ generator.writeEndArray();
+ // skip the snapshot log
+
+ generator.writeEndObject(); // end table metadata object
+
+ generator.flush(); // push generator output into the StringWriter before it is read below
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write json for: %s", metadata);
+ }
+ return writer.toString(); // the JSON document as text
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 20d2f3e..41974e3 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -43,7 +43,7 @@ public class TestTables {
if (ops.current() != null) {
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
}
- ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString()));
+ ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of()));
return new TestTable(ops, name);
}
@@ -53,7 +53,7 @@ public class TestTables {
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
}
- TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, temp.toString());
+ TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of());
return Transactions.createTableTransaction(ops, metadata);
}
diff --git a/site/docs/spec.md b/site/docs/spec.md
index 921c100..332e247 100644
--- a/site/docs/spec.md
+++ b/site/docs/spec.md
@@ -677,3 +677,17 @@ This serialization scheme is for storing single values as individual binary valu
| **`map`** | Not supported |
+## Format version changes
+
+### Version 2
+
+Writing metadata:
+* Table metadata field `sequence-number` is required.
+* Table metadata field `table-uuid` is required.
+* Table metadata field `partition-specs` is required.
+* Table metadata field `default-spec-id` is required.
+* Table metadata field `partition-spec` is no longer required and may be omitted.
+* Snapshot field `manifest-list` is required.
+* Snapshot field `manifests` is not allowed.
+
+Note that these requirements apply when writing data to a v2 table. Tables that are upgraded from v1 may contain metadata that does not follow these requirements. Implementations should remain backward-compatible with v1 metadata requirements.
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
index 879aa16..b9d6c9c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.spark.source;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.File;
import java.util.Map;
@@ -49,7 +50,7 @@ class TestTables {
if (ops.current() != null) {
throw new AlreadyExistsException("Table %s already exists at location: %s", name, temp);
}
- ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString()));
+ ops.commit(null, TableMetadata.newTableMetadata(schema, spec, temp.toString(), ImmutableMap.of()));
return new TestTable(ops, name);
}