You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/06 00:43:43 UTC
[incubator-iceberg] branch master updated: Add snapshot summary and
operation. (#74)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e8c5e18 Add snapshot summary and operation. (#74)
e8c5e18 is described below
commit e8c5e1812f44fecfa1f8706f942a240a1b42abe5
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Mar 5 16:43:39 2019 -0800
Add snapshot summary and operation. (#74)
---
.../java/com/netflix/iceberg/DataOperations.java | 60 +++++++++++++
.../main/java/com/netflix/iceberg/Snapshot.java | 16 ++++
.../java/com/netflix/iceberg/BaseSnapshot.java | 25 +++++-
.../main/java/com/netflix/iceberg/FastAppend.java | 13 +++
.../main/java/com/netflix/iceberg/MergeAppend.java | 5 ++
.../com/netflix/iceberg/MergingSnapshotUpdate.java | 34 ++++++--
.../java/com/netflix/iceberg/OverwriteData.java | 5 ++
.../java/com/netflix/iceberg/ReplaceFiles.java | 5 ++
.../iceberg/ReplacePartitionsOperation.java | 5 ++
.../java/com/netflix/iceberg/SnapshotParser.java | 47 ++++++++++-
.../java/com/netflix/iceberg/SnapshotSummary.java | 98 ++++++++++++++++++++++
.../java/com/netflix/iceberg/SnapshotUpdate.java | 89 +++++++++++++++++++-
.../java/com/netflix/iceberg/StreamingDelete.java | 5 ++
.../java/com/netflix/iceberg/TestSnapshotJson.java | 40 ++++++++-
.../com/netflix/iceberg/TestTableMetadataJson.java | 13 +--
.../com/netflix/iceberg/avro/AvroTestHelpers.java | 8 --
16 files changed, 441 insertions(+), 27 deletions(-)
diff --git a/api/src/main/java/com/netflix/iceberg/DataOperations.java b/api/src/main/java/com/netflix/iceberg/DataOperations.java
new file mode 100644
index 0000000..c273e66
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/DataOperations.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+/**
+ * Data operations that produce snapshots.
+ * <p>
+ * A snapshot can return the operation that created the snapshot to help other components ignore
+ * snapshots that are not needed for some tasks. For example, snapshot expiration does not need to
+ * clean up deleted files for appends, which have no deleted files. This class only holds constants.
+ */
+public final class DataOperations {
+  private DataOperations() {
+  }
+
+  /**
+   * New data is appended to the table and no data is removed or deleted.
+   * <p>
+   * This operation is implemented by {@link AppendFiles}.
+   */
+  public static final String APPEND = "append";
+
+  /**
+   * Files are removed and replaced, without changing the data in the table.
+   * <p>
+   * This operation is implemented by {@link RewriteFiles}.
+   */
+  public static final String REPLACE = "replace";
+
+  /**
+   * New data is added to overwrite existing data; files may be both added and removed.
+   * <p>
+   * This operation is implemented by {@link OverwriteFiles} and {@link ReplacePartitions}.
+   */
+  public static final String OVERWRITE = "overwrite";
+
+  /**
+   * Data is deleted from the table and no data is added.
+   * <p>
+   * This operation is implemented by {@link DeleteFiles}.
+   */
+  public static final String DELETE = "delete";
+}
diff --git a/api/src/main/java/com/netflix/iceberg/Snapshot.java b/api/src/main/java/com/netflix/iceberg/Snapshot.java
index 89542dc..c2071d6 100644
--- a/api/src/main/java/com/netflix/iceberg/Snapshot.java
+++ b/api/src/main/java/com/netflix/iceberg/Snapshot.java
@@ -20,6 +20,7 @@
package com.netflix.iceberg;
import java.util.List;
+import java.util.Map;
/**
* A snapshot of the data in a table at a point in time.
@@ -63,6 +64,21 @@ public interface Snapshot {
List<ManifestFile> manifests();
/**
+ * Return the name of the {@link DataOperations data operation} that produced this snapshot.
+ *
+ * @return the operation that produced this snapshot, or null if the operation is unknown
+ * @see DataOperations
+ */
+ String operation();
+
+ /**
+ * Return a string map of summary data for the operation that produced this snapshot.
+ *
+ * @return a string map of summary data.
+ */
+ Map<String, String> summary();
+
+ /**
* Return all files added to the table in this snapshot.
* <p>
* The files returned include the following columns: file_path, file_format, partition,
diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
index 554f24f..8289ea2 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -29,6 +29,7 @@ import com.netflix.iceberg.io.InputFile;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
class BaseSnapshot implements Snapshot {
private final TableOperations ops;
@@ -36,6 +37,8 @@ class BaseSnapshot implements Snapshot {
private final Long parentId;
private final long timestampMillis;
private final InputFile manifestList;
+ private final String operation;
+ private final Map<String, String> summary;
// lazily initialized
private List<ManifestFile> manifests = null;
@@ -48,7 +51,7 @@ class BaseSnapshot implements Snapshot {
BaseSnapshot(TableOperations ops,
long snapshotId,
String... manifestFiles) {
- this(ops, snapshotId, null, System.currentTimeMillis(),
+ this(ops, snapshotId, null, System.currentTimeMillis(), null, null,
Lists.transform(Arrays.asList(manifestFiles),
path -> new GenericManifestFile(ops.io().newInputFile(path), 0)));
}
@@ -57,11 +60,15 @@ class BaseSnapshot implements Snapshot {
long snapshotId,
Long parentId,
long timestampMillis,
+ String operation,
+ Map<String, String> summary,
InputFile manifestList) {
this.ops = ops;
this.snapshotId = snapshotId;
this.parentId = parentId;
this.timestampMillis = timestampMillis;
+ this.operation = operation;
+ this.summary = summary;
this.manifestList = manifestList;
}
@@ -69,8 +76,10 @@ class BaseSnapshot implements Snapshot {
long snapshotId,
Long parentId,
long timestampMillis,
+ String operation,
+ Map<String, String> summary,
List<ManifestFile> manifests) {
- this(ops, snapshotId, parentId, timestampMillis, (InputFile) null);
+ this(ops, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
this.manifests = manifests;
}
@@ -90,6 +99,16 @@ class BaseSnapshot implements Snapshot {
}
@Override
+ public String operation() {
+ return operation;
+ }
+
+ @Override
+ public Map<String, String> summary() {
+ return summary;
+ }
+
+ @Override
public List<ManifestFile> manifests() {
if (manifests == null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list
@@ -164,6 +183,8 @@ class BaseSnapshot implements Snapshot {
return Objects.toStringHelper(this)
.add("id", snapshotId)
.add("timestamp_ms", timestampMillis)
+ .add("operation", operation)
+ .add("summary", summary)
.add("manifests", manifests())
.toString();
}
diff --git a/core/src/main/java/com/netflix/iceberg/FastAppend.java b/core/src/main/java/com/netflix/iceberg/FastAppend.java
index 278f059..63bfcfc 100644
--- a/core/src/main/java/com/netflix/iceberg/FastAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/FastAppend.java
@@ -25,6 +25,7 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.OutputFile;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -34,6 +35,7 @@ import java.util.Set;
*/
class FastAppend extends SnapshotUpdate implements AppendFiles {
private final PartitionSpec spec;
+ private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private ManifestFile newManifest = null;
private boolean hasNewFiles = false;
@@ -44,9 +46,20 @@ class FastAppend extends SnapshotUpdate implements AppendFiles {
}
@Override
+ protected String operation() {
+ return DataOperations.APPEND;
+ }
+
+ @Override
+ protected Map<String, String> summary() {
+ return summaryBuilder.build();
+ }
+
+ @Override
public FastAppend appendFile(DataFile file) {
this.hasNewFiles = true;
newFiles.add(file);
+ summaryBuilder.addedFile(spec, file);
return this;
}
diff --git a/core/src/main/java/com/netflix/iceberg/MergeAppend.java b/core/src/main/java/com/netflix/iceberg/MergeAppend.java
index 6f67c56..8268afa 100644
--- a/core/src/main/java/com/netflix/iceberg/MergeAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/MergeAppend.java
@@ -32,6 +32,11 @@ class MergeAppend extends MergingSnapshotUpdate implements AppendFiles {
}
@Override
+ protected String operation() {
+ return DataOperations.APPEND;
+ }
+
+ @Override
public MergeAppend appendFile(DataFile file) {
add(file);
return this;
diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
index 156f9ea..eb37466 100644
--- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -21,6 +21,7 @@ package com.netflix.iceberg;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -78,6 +79,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
private final PartitionSpec spec;
private final long manifestTargetSizeBytes;
private final int minManifestsCountToMerge;
+ private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
// update data
private final AtomicInteger manifestCount = new AtomicInteger(0);
@@ -99,7 +101,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
// tracking where files were deleted to validate retries quickly
- private final Map<ManifestFile, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
+ private final Map<ManifestFile, Iterable<DataFile>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
@@ -124,7 +126,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
}
protected List<DataFile> addedFiles() {
- return newFiles;
+ return ImmutableList.copyOf(newFiles);
}
protected void failAnyDelete() {
@@ -172,7 +174,14 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
}
@Override
+ protected Map<String, String> summary() {
+ return summaryBuilder.build();
+ }
+
+ @Override
public List<ManifestFile> apply(TableMetadata base) {
+ summaryBuilder.clear();
+
if (filterUpdated) {
cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
this.filterUpdated = false;
@@ -188,6 +197,11 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
// add the current spec as the first group. files are added to the beginning.
try {
if (newFiles.size() > 0) {
+ // add all of the new files to the summary builder
+ for (DataFile file : newFiles) {
+ summaryBuilder.addedFile(spec, file);
+ }
+
ManifestFile newManifest = newFilesAsManifest();
List<ManifestFile> manifestGroup = Lists.newArrayList();
manifestGroup.add(newManifest);
@@ -212,9 +226,13 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
}, IOException.class);
for (ManifestFile manifest : filtered) {
- Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+ PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
+ Iterable<DataFile> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
- deletedFiles.addAll(manifestDeletes);
+ for (DataFile file : manifestDeletes) {
+ summaryBuilder.deletedFile(spec, file);
+ deletedFiles.add(CharSequenceWrapper.wrap(file.path()));
+ }
}
List<ManifestFile> group = groups.get(manifest.partitionSpecId());
@@ -362,6 +380,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
+ List<DataFile> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
@@ -383,6 +402,11 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
if (deletedPaths.contains(wrapper)) {
LOG.warn("Deleting a duplicate path from manifest {}: {}",
manifest.path(), wrapper.get());
+ summaryBuilder.incrementDuplicateDeletes();
+ } else {
+ // only add the file to deletes if it is a new delete
+ // this keeps the snapshot summary accurate for non-duplicate data
+ deletedFiles.add(entry.file().copy());
}
deletedPaths.add(wrapper);
@@ -400,7 +424,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
// update caches
filteredManifests.put(manifest, filtered);
- filteredManifestToDeletedFiles.put(filtered, deletedPaths);
+ filteredManifestToDeletedFiles.put(filtered, deletedFiles);
return filtered;
}
diff --git a/core/src/main/java/com/netflix/iceberg/OverwriteData.java b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
index 3ebb725..f9a6abe 100644
--- a/core/src/main/java/com/netflix/iceberg/OverwriteData.java
+++ b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
@@ -34,6 +34,11 @@ public class OverwriteData extends MergingSnapshotUpdate implements OverwriteFil
}
@Override
+ protected String operation() {
+ return DataOperations.OVERWRITE;
+ }
+
+ @Override
public OverwriteFiles overwriteByRowFilter(Expression expr) {
deleteByRowFilter(expr);
return this;
diff --git a/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java b/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java
index 73b47c7..71ac066 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java
@@ -31,6 +31,11 @@ class ReplaceFiles extends MergingSnapshotUpdate implements RewriteFiles {
}
@Override
+ protected String operation() {
+ return DataOperations.REPLACE;
+ }
+
+ @Override
public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
Preconditions.checkArgument(filesToDelete != null && !filesToDelete.isEmpty(),
"Files to delete cannot be null or empty");
diff --git a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
index 4fb6aa8..360487e 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
@@ -29,6 +29,11 @@ public class ReplacePartitionsOperation extends MergingSnapshotUpdate implements
}
@Override
+ protected String operation() {
+ return DataOperations.OVERWRITE;
+ }
+
+ @Override
public ReplacePartitions addFile(DataFile file) {
dropPartition(file.partition());
add(file);
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
index d73da8a..647f7f0 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
@@ -22,18 +22,23 @@ package com.netflix.iceberg;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.util.JsonUtil;
import java.io.IOException;
import java.io.StringWriter;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
public class SnapshotParser {
private static final String SNAPSHOT_ID = "snapshot-id";
private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
private static final String TIMESTAMP_MS = "timestamp-ms";
+ private static final String SUMMARY = "summary";
+ private static final String OPERATION = "operation";
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
@@ -46,6 +51,22 @@ public class SnapshotParser {
}
generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
+ // if there is an operation, write the summary map
+ if (snapshot.operation() != null) {
+ generator.writeObjectFieldStart(SUMMARY);
+ generator.writeStringField(OPERATION, snapshot.operation());
+ if (snapshot.summary() != null) {
+ for (Map.Entry<String, String> entry : snapshot.summary().entrySet()) {
+ // only write operation once
+ if (OPERATION.equals(entry.getKey())) {
+ continue;
+ }
+ generator.writeStringField(entry.getKey(), entry.getValue());
+ }
+ }
+ generator.writeEndObject();
+ }
+
String manifestList = snapshot.manifestListLocation();
if (manifestList != null) {
// write just the location. manifests should not be embedded in JSON along with a list
@@ -86,17 +107,39 @@ public class SnapshotParser {
}
long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);
+ Map<String, String> summary = null;
+ String operation = null;
+ if (node.has(SUMMARY)) {
+ JsonNode sNode = node.get(SUMMARY);
+ Preconditions.checkArgument(sNode != null && !sNode.isNull() && sNode.isObject(),
+ "Cannot parse summary from non-object value: %s", sNode);
+
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ Iterator<String> fields = sNode.fieldNames();
+ while (fields.hasNext()) {
+ String field = fields.next();
+ if (field.equals(OPERATION)) {
+ operation = JsonUtil.getString(OPERATION, sNode);
+ } else {
+ builder.put(field, JsonUtil.getString(field, sNode));
+ }
+ }
+ summary = builder.build();
+ }
+
if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
- return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.io().newInputFile(manifestList));
+ return new BaseSnapshot(
+ ops, versionId, parentId, timestamp, operation, summary,
+ ops.io().newInputFile(manifestList));
} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
// loaded lazily, if it is needed
List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
location -> new GenericManifestFile(ops.io().newInputFile(location), 0));
- return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+ return new BaseSnapshot(ops, versionId, parentId, timestamp, operation, summary, manifests);
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotSummary.java b/core/src/main/java/com/netflix/iceberg/SnapshotSummary.java
new file mode 100644
index 0000000..6dd223f
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotSummary.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Property names and a builder for the string map stored as a snapshot's summary.
+ */
+public final class SnapshotSummary {
+  public static final String ADDED_FILES_PROP = "added-data-files";
+  public static final String DELETED_FILES_PROP = "deleted-data-files";
+  public static final String TOTAL_FILES_PROP = "total-data-files";
+  public static final String ADDED_RECORDS_PROP = "added-records";
+  public static final String DELETED_RECORDS_PROP = "deleted-records";
+  public static final String TOTAL_RECORDS_PROP = "total-records";
+  public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files";
+  public static final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count";
+
+  private SnapshotSummary() {
+  }
+
+  /**
+   * Returns a new, empty {@link Builder}.
+   *
+   * @return a builder with all counts at zero
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Accumulates file, record, and partition counts for one commit and produces a summary map.
+   * <p>
+   * Not thread-safe: counters are plain fields and partitions are tracked in a {@code HashSet}.
+   */
+  public static class Builder {
+    // distinct partition paths touched by any added or deleted file
+    private final Set<String> changedPartitions = Sets.newHashSet();
+    private long addedFiles = 0L;
+    private long deletedFiles = 0L;
+    private long deletedDuplicateFiles = 0L;
+    private long addedRecords = 0L;
+    private long deletedRecords = 0L;
+
+    /**
+     * Resets all counts and the set of changed partitions so the builder can be reused.
+     */
+    public void clear() {
+      changedPartitions.clear();
+      this.addedFiles = 0L;
+      this.deletedFiles = 0L;
+      this.deletedDuplicateFiles = 0L;
+      this.addedRecords = 0L;
+      this.deletedRecords = 0L;
+    }
+
+    /**
+     * Counts one duplicate delete. Duplicates are reported separately and do not change the
+     * deleted file or record counts.
+     */
+    public void incrementDuplicateDeletes() {
+      this.deletedDuplicateFiles += 1;
+    }
+
+    /**
+     * Records a deleted data file.
+     *
+     * @param spec the partition spec used to render the file's partition as a path
+     * @param file the deleted file; its record count is added to the deleted record count
+     */
+    public void deletedFile(PartitionSpec spec, DataFile file) {
+      changedPartitions.add(spec.partitionToPath(file.partition()));
+      this.deletedFiles += 1;
+      this.deletedRecords += file.recordCount();
+    }
+
+    /**
+     * Records an added data file.
+     *
+     * @param spec the partition spec used to render the file's partition as a path
+     * @param file the added file; its record count is added to the added record count
+     */
+    public void addedFile(PartitionSpec spec, DataFile file) {
+      changedPartitions.add(spec.partitionToPath(file.partition()));
+      this.addedFiles += 1;
+      this.addedRecords += file.recordCount();
+    }
+
+    /**
+     * Builds an immutable summary map from the current counts.
+     * <p>
+     * Counts that are zero are omitted, except
+     * {@link SnapshotSummary#CHANGED_PARTITION_COUNT_PROP}, which is always present.
+     *
+     * @return an immutable string map of summary properties
+     */
+    public Map<String, String> build() {
+      ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+
+      setIf(addedFiles > 0, builder, ADDED_FILES_PROP, addedFiles);
+      setIf(deletedFiles > 0, builder, DELETED_FILES_PROP, deletedFiles);
+      setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles);
+      setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords);
+      setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords);
+      // the changed partition count is always reported, even when it is 0
+      builder.put(CHANGED_PARTITION_COUNT_PROP, String.valueOf(changedPartitions.size()));
+
+      return builder.build();
+    }
+
+    // adds property=String.valueOf(value) only when expression is true
+    private static void setIf(boolean expression, ImmutableMap.Builder<String, String> builder,
+                              String property, Object value) {
+      if (expression) {
+        builder.put(property, String.valueOf(value));
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
index 0cc98f9..39e7a58 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -22,6 +22,7 @@ package com.netflix.iceberg;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.CommitFailedException;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -102,6 +104,61 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
*/
protected abstract void cleanUncommitted(Set<ManifestFile> committed);
+  /**
+   * Returns the name of the operation that produces the new snapshot.
+   *
+   * @return a string operation, such as one of the {@link DataOperations} constants
+   */
+  protected abstract String operation();
+
+  /**
+   * Returns a string map that summarizes the changes made by this snapshot update.
+   * <p>
+   * This default reports no properties; subclasses override it to report their counts.
+   *
+   * @return a string map that summarizes the update
+   */
+  protected Map<String, String> summary() {
+    return ImmutableMap.<String, String>of();
+  }
+
+  /**
+   * Returns the snapshot summary from the implementation and updates totals.
+   * <p>
+   * The totals {@link SnapshotSummary#TOTAL_RECORDS_PROP} and
+   * {@link SnapshotSummary#TOTAL_FILES_PROP} are carried forward from the current snapshot's
+   * summary and adjusted by this update's added and deleted counts. When there is no current
+   * snapshot, totals start at 0. When the current snapshot has no summary, or a count cannot be
+   * parsed, the total is omitted. A computed total replaces any value for the same property
+   * that the implementation's summary already contains.
+   *
+   * @param base the table metadata this update is applied to
+   * @return an immutable summary map; empty if the implementation's summary is null
+   */
+  private Map<String, String> summary(TableMetadata base) {
+    Map<String, String> summary = summary();
+    if (summary == null) {
+      return ImmutableMap.of();
+    }
+
+    Snapshot current = base.currentSnapshot();
+    Map<String, String> previousSummary;
+    if (current == null) {
+      // if there was no previous snapshot, default the summary to start totals at 0
+      previousSummary = ImmutableMap.of(
+          SnapshotSummary.TOTAL_RECORDS_PROP, "0",
+          SnapshotSummary.TOTAL_FILES_PROP, "0");
+    } else if (current.summary() != null) {
+      previousSummary = current.summary();
+    } else {
+      // previous snapshot had no summary, use an empty summary so totals are omitted
+      previousSummary = ImmutableMap.of();
+    }
+
+    // compute the running totals first so they take precedence over the implementation's values
+    ImmutableMap.Builder<String, String> totalsBuilder = ImmutableMap.builder();
+    updateTotal(
+        totalsBuilder, previousSummary, SnapshotSummary.TOTAL_RECORDS_PROP,
+        summary, SnapshotSummary.ADDED_RECORDS_PROP, SnapshotSummary.DELETED_RECORDS_PROP);
+    updateTotal(
+        totalsBuilder, previousSummary, SnapshotSummary.TOTAL_FILES_PROP,
+        summary, SnapshotSummary.ADDED_FILES_PROP, SnapshotSummary.DELETED_FILES_PROP);
+    Map<String, String> totals = totalsBuilder.build();
+
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+
+    // copy the implementation's properties, skipping any total recomputed above:
+    // ImmutableMap.Builder#build throws IllegalArgumentException on duplicate keys
+    for (Map.Entry<String, String> entry : summary.entrySet()) {
+      if (!totals.containsKey(entry.getKey())) {
+        builder.put(entry.getKey(), entry.getValue());
+      }
+    }
+    builder.putAll(totals);
+
+    return builder.build();
+  }
+
@Override
public Snapshot apply() {
this.base = ops.refresh();
@@ -134,12 +191,13 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
}
return new BaseSnapshot(ops,
- snapshotId(), parentSnapshotId, System.currentTimeMillis(),
+ snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
ops.io().newInputFile(manifestList.location()));
} else {
return new BaseSnapshot(ops,
- snapshotId(), parentSnapshotId, System.currentTimeMillis(), manifests);
+ snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
+ manifests);
}
}
@@ -268,4 +326,31 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
}
}
+
+  /**
+   * Carries a running total forward from the previous summary into the new summary.
+   * <p>
+   * The new total is the previous total plus the added count minus the deleted count, where a
+   * missing added or deleted count counts as 0. Nothing is written when the previous summary has
+   * no total or when any of the values is not a valid long.
+   */
+  private static void updateTotal(ImmutableMap.Builder<String, String> summaryBuilder,
+                                  Map<String, String> previousSummary, String totalProperty,
+                                  Map<String, String> currentSummary,
+                                  String addedProperty, String deletedProperty) {
+    String previousTotal = previousSummary.get(totalProperty);
+    if (previousTotal == null) {
+      // no previous total to carry forward
+      return;
+    }
+
+    String added = currentSummary.get(addedProperty);
+    String deleted = currentSummary.get(deletedProperty);
+
+    long total;
+    try {
+      total = Long.parseLong(previousTotal)
+          + (added == null ? 0L : Long.parseLong(added))
+          - (deleted == null ? 0L : Long.parseLong(deleted));
+    } catch (NumberFormatException ignored) {
+      // best effort: a malformed count makes the total unknown, so it is left out
+      return;
+    }
+
+    summaryBuilder.put(totalProperty, String.valueOf(total));
+  }
}
diff --git a/core/src/main/java/com/netflix/iceberg/StreamingDelete.java b/core/src/main/java/com/netflix/iceberg/StreamingDelete.java
index 4b3756a..8e3fab9 100644
--- a/core/src/main/java/com/netflix/iceberg/StreamingDelete.java
+++ b/core/src/main/java/com/netflix/iceberg/StreamingDelete.java
@@ -33,6 +33,11 @@ class StreamingDelete extends MergingSnapshotUpdate implements DeleteFiles {
}
@Override
+ protected String operation() {
+ return DataOperations.DELETE;
+ }
+
+ @Override
public StreamingDelete deleteFile(CharSequence path) {
delete(path);
return this;
diff --git a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
index dbcc811..d60247b 100644
--- a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
@@ -20,6 +20,7 @@
package com.netflix.iceberg;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -47,6 +48,39 @@ public class TestSnapshotJson {
expected.snapshotId(), snapshot.snapshotId());
Assert.assertEquals("Files should match",
expected.manifests(), snapshot.manifests());
+ Assert.assertNull("Operation should be null", snapshot.operation());
+ Assert.assertNull("Summary should be null", snapshot.summary());
+ }
+
+ @Test
+ public void testJsonConversionWithOperation() {
+ long parentId = 1;
+ long id = 2;
+ List<ManifestFile> manifests = ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
+ new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
+
+ Snapshot expected = new BaseSnapshot(ops, id, parentId, System.currentTimeMillis(),
+ DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"),
+ manifests);
+
+ String json = SnapshotParser.toJson(expected);
+ Snapshot snapshot = SnapshotParser.fromJson(ops, json);
+
+ Assert.assertEquals("Snapshot ID should match",
+ expected.snapshotId(), snapshot.snapshotId());
+ Assert.assertEquals("Timestamp should match",
+ expected.timestampMillis(), snapshot.timestampMillis());
+ Assert.assertEquals("Parent ID should match",
+ expected.parentId(), snapshot.parentId());
+ Assert.assertEquals("Manifest list should match",
+ expected.manifestListLocation(), snapshot.manifestListLocation());
+ Assert.assertEquals("Files should match",
+ expected.manifests(), snapshot.manifests());
+ Assert.assertEquals("Operation should match",
+ expected.operation(), snapshot.operation());
+ Assert.assertEquals("Summary should match",
+ expected.summary(), snapshot.summary());
}
@Test
@@ -67,9 +101,9 @@ public class TestSnapshotJson {
}
Snapshot expected = new BaseSnapshot(
- ops, id, parentId, System.currentTimeMillis(), localInput(manifestList));
+ ops, id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList));
Snapshot inMemory = new BaseSnapshot(
- ops, id, parentId, expected.timestampMillis(), manifests);
+ ops, id, parentId, expected.timestampMillis(), null, null, manifests);
Assert.assertEquals("Files should match in memory list",
inMemory.manifests(), expected.manifests());
@@ -87,5 +121,7 @@ public class TestSnapshotJson {
expected.manifestListLocation(), snapshot.manifestListLocation());
Assert.assertEquals("Files should match",
expected.manifests(), snapshot.manifests());
+ Assert.assertNull("Operation should be null", snapshot.operation());
+ Assert.assertNull("Summary should be null", snapshot.summary());
}
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index e21b4b6..1b54390 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -68,11 +68,11 @@ public class TestTableMetadataJson {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
- null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+ ops, previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
- null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+ ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
List<SnapshotLogEntry> snapshotLog = ImmutableList.<SnapshotLogEntry>builder()
@@ -130,11 +130,11 @@ public class TestTableMetadataJson {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
- ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+ ops, previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
- ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+ ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
@@ -175,13 +175,14 @@ public class TestTableMetadataJson {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
- ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+ ops, previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
- ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+ ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
+
TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
diff --git a/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java
index e988d51..c12912f 100644
--- a/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java
+++ b/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java
@@ -21,19 +21,11 @@ package com.netflix.iceberg.avro;
import com.netflix.iceberg.types.Type;
import com.netflix.iceberg.types.Types;
-import com.netflix.iceberg.util.CharSequenceWrapper;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.junit.Assert;
-
-import java.time.LocalDate;
-import java.time.temporal.ChronoUnit;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;