Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/06 00:43:43 UTC

[incubator-iceberg] branch master updated: Add snapshot summary and operation. (#74)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e8c5e18  Add snapshot summary and operation. (#74)
e8c5e18 is described below

commit e8c5e1812f44fecfa1f8706f942a240a1b42abe5
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Mar 5 16:43:39 2019 -0800

    Add snapshot summary and operation. (#74)
---
 .../java/com/netflix/iceberg/DataOperations.java   | 60 +++++++++++++
 .../main/java/com/netflix/iceberg/Snapshot.java    | 16 ++++
 .../java/com/netflix/iceberg/BaseSnapshot.java     | 25 +++++-
 .../main/java/com/netflix/iceberg/FastAppend.java  | 13 +++
 .../main/java/com/netflix/iceberg/MergeAppend.java |  5 ++
 .../com/netflix/iceberg/MergingSnapshotUpdate.java | 34 ++++++--
 .../java/com/netflix/iceberg/OverwriteData.java    |  5 ++
 .../java/com/netflix/iceberg/ReplaceFiles.java     |  5 ++
 .../iceberg/ReplacePartitionsOperation.java        |  5 ++
 .../java/com/netflix/iceberg/SnapshotParser.java   | 47 ++++++++++-
 .../java/com/netflix/iceberg/SnapshotSummary.java  | 98 ++++++++++++++++++++++
 .../java/com/netflix/iceberg/SnapshotUpdate.java   | 89 +++++++++++++++++++-
 .../java/com/netflix/iceberg/StreamingDelete.java  |  5 ++
 .../java/com/netflix/iceberg/TestSnapshotJson.java | 40 ++++++++-
 .../com/netflix/iceberg/TestTableMetadataJson.java | 13 +--
 .../com/netflix/iceberg/avro/AvroTestHelpers.java  |  8 --
 16 files changed, 441 insertions(+), 27 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/DataOperations.java b/api/src/main/java/com/netflix/iceberg/DataOperations.java
new file mode 100644
index 0000000..c273e66
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/DataOperations.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+/**
+ * Data operations that produce snapshots.
+ * <p>
+ * A snapshot can return the operation that created the snapshot to help other components ignore
+ * snapshots that are not needed for some tasks. For example, snapshot expiration does not need to
+ * clean up deleted files for appends, which have no deleted files.
+ */
+public class DataOperations {
+  private DataOperations() {
+  }
+
+  /**
+   * New data is appended to the table and no data is removed or deleted.
+   * <p>
+   * This operation is implemented by {@link AppendFiles}.
+   */
+  public static final String APPEND = "append";
+
+  /**
+   * Files are removed and replaced, without changing the data in the table.
+   * <p>
+   * This operation is implemented by {@link RewriteFiles}.
+   */
+  public static final String REPLACE = "replace";
+
+  /**
+   * New data is added to overwrite existing data.
+   * <p>
+   * This operation is implemented by {@link OverwriteFiles} and {@link ReplacePartitions}.
+   */
+  public static final String OVERWRITE = "overwrite";
+
+  /**
+   * Data is deleted from the table and no data is added.
+   * <p>
+   * This operation is implemented by {@link DeleteFiles}.
+   */
+  public static final String DELETE = "delete";
+}
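
The constants above let maintenance code branch on how a snapshot was produced. A minimal, hedged sketch of the expiration case called out in the class Javadoc (the helper class and method names are illustrative, not part of this commit):

import com.netflix.iceberg.DataOperations;
import com.netflix.iceberg.Snapshot;

class ExpireSnapshotsHelper {
  // Appends never remove files, so there are no deleted files to clean up.
  // operation() may be null for snapshots written before this commit; in that
  // case the safe answer is to do the cleanup.
  static boolean needsDeletedFileCleanup(Snapshot snapshot) {
    return !DataOperations.APPEND.equals(snapshot.operation());
  }
}
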
diff --git a/api/src/main/java/com/netflix/iceberg/Snapshot.java b/api/src/main/java/com/netflix/iceberg/Snapshot.java
index 89542dc..c2071d6 100644
--- a/api/src/main/java/com/netflix/iceberg/Snapshot.java
+++ b/api/src/main/java/com/netflix/iceberg/Snapshot.java
@@ -20,6 +20,7 @@
 package com.netflix.iceberg;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * A snapshot of the data in a table at a point in time.
@@ -63,6 +64,21 @@ public interface Snapshot {
   List<ManifestFile> manifests();
 
   /**
+   * Return the name of the {@link DataOperations data operation} that produced this snapshot.
+   *
+   * @return the operation that produced this snapshot, or null if the operation is unknown
+   * @see DataOperations
+   */
+  String operation();
+
+  /**
+   * Return a string map of summary data for the operation that produced this snapshot.
+   *
+   * @return a string map of summary data.
+   */
+  Map<String, String> summary();
+
+  /**
    * Return all files added to the table in this snapshot.
    * <p>
    * The files returned include the following columns: file_path, file_format, partition,
diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
index 554f24f..8289ea2 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -29,6 +29,7 @@ import com.netflix.iceberg.io.InputFile;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 class BaseSnapshot implements Snapshot {
   private final TableOperations ops;
@@ -36,6 +37,8 @@ class BaseSnapshot implements Snapshot {
   private final Long parentId;
   private final long timestampMillis;
   private final InputFile manifestList;
+  private final String operation;
+  private final Map<String, String> summary;
 
   // lazily initialized
   private List<ManifestFile> manifests = null;
@@ -48,7 +51,7 @@ class BaseSnapshot implements Snapshot {
   BaseSnapshot(TableOperations ops,
                long snapshotId,
                String... manifestFiles) {
-    this(ops, snapshotId, null, System.currentTimeMillis(),
+    this(ops, snapshotId, null, System.currentTimeMillis(), null, null,
         Lists.transform(Arrays.asList(manifestFiles),
             path -> new GenericManifestFile(ops.io().newInputFile(path), 0)));
   }
@@ -57,11 +60,15 @@ class BaseSnapshot implements Snapshot {
                long snapshotId,
                Long parentId,
                long timestampMillis,
+               String operation,
+               Map<String, String> summary,
                InputFile manifestList) {
     this.ops = ops;
     this.snapshotId = snapshotId;
     this.parentId = parentId;
     this.timestampMillis = timestampMillis;
+    this.operation = operation;
+    this.summary = summary;
     this.manifestList = manifestList;
   }
 
@@ -69,8 +76,10 @@ class BaseSnapshot implements Snapshot {
                long snapshotId,
                Long parentId,
                long timestampMillis,
+               String operation,
+               Map<String, String> summary,
                List<ManifestFile> manifests) {
-    this(ops, snapshotId, parentId, timestampMillis, (InputFile) null);
+    this(ops, snapshotId, parentId, timestampMillis, operation, summary, (InputFile) null);
     this.manifests = manifests;
   }
 
@@ -90,6 +99,16 @@ class BaseSnapshot implements Snapshot {
   }
 
   @Override
+  public String operation() {
+    return operation;
+  }
+
+  @Override
+  public Map<String, String> summary() {
+    return summary;
+  }
+
+  @Override
   public List<ManifestFile> manifests() {
     if (manifests == null) {
       // if manifests isn't set, then the snapshotFile is set and should be read to get the list
@@ -164,6 +183,8 @@ class BaseSnapshot implements Snapshot {
     return Objects.toStringHelper(this)
         .add("id", snapshotId)
         .add("timestamp_ms", timestampMillis)
+        .add("operation", operation)
+        .add("summary", summary)
         .add("manifests", manifests())
         .toString();
   }
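
With the interface addition and the BaseSnapshot plumbing in place, callers can ask the current snapshot what produced it. A hedged sketch of reading both new fields (Table.currentSnapshot() is assumed from the existing API; the printer class is illustrative):

import com.netflix.iceberg.Snapshot;
import com.netflix.iceberg.Table;
import java.util.Map;

class SnapshotSummaryPrinter {
  static void print(Table table) {
    Snapshot current = table.currentSnapshot();  // null for an empty table
    if (current == null) {
      return;
    }
    // Both fields may be null for snapshots written before this commit.
    System.out.println("operation: " + current.operation());
    Map<String, String> summary = current.summary();
    if (summary != null) {
      summary.forEach((key, value) -> System.out.println(key + " = " + value));
    }
  }
}
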
diff --git a/core/src/main/java/com/netflix/iceberg/FastAppend.java b/core/src/main/java/com/netflix/iceberg/FastAppend.java
index 278f059..63bfcfc 100644
--- a/core/src/main/java/com/netflix/iceberg/FastAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/FastAppend.java
@@ -25,6 +25,7 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -34,6 +35,7 @@ import java.util.Set;
  */
 class FastAppend extends SnapshotUpdate implements AppendFiles {
   private final PartitionSpec spec;
+  private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
   private final List<DataFile> newFiles = Lists.newArrayList();
   private ManifestFile newManifest = null;
   private boolean hasNewFiles = false;
@@ -44,9 +46,20 @@ class FastAppend extends SnapshotUpdate implements AppendFiles {
   }
 
   @Override
+  protected String operation() {
+    return DataOperations.APPEND;
+  }
+
+  @Override
+  protected Map<String, String> summary() {
+    return summaryBuilder.build();
+  }
+
+  @Override
   public FastAppend appendFile(DataFile file) {
     this.hasNewFiles = true;
     newFiles.add(file);
+    summaryBuilder.addedFile(spec, file);
     return this;
   }
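
FastAppend now feeds each appended file into a SnapshotSummary.Builder, so the committed snapshot carries counts without another pass over the manifests. A hedged usage sketch (the table and data file are assumed to exist; newAppend() may resolve to either append implementation, and both populate the same summary keys):

import com.netflix.iceberg.DataFile;
import com.netflix.iceberg.Snapshot;
import com.netflix.iceberg.Table;

class AppendWithSummary {
  static Snapshot appendAndDescribe(Table table, DataFile file) {
    table.newAppend()
        .appendFile(file)
        .commit();
    // After the commit, the new snapshot's summary reports what changed,
    // e.g. added-data-files, added-records, and changed-partition-count.
    return table.currentSnapshot();
  }
}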
 
diff --git a/core/src/main/java/com/netflix/iceberg/MergeAppend.java b/core/src/main/java/com/netflix/iceberg/MergeAppend.java
index 6f67c56..8268afa 100644
--- a/core/src/main/java/com/netflix/iceberg/MergeAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/MergeAppend.java
@@ -32,6 +32,11 @@ class MergeAppend extends MergingSnapshotUpdate implements AppendFiles {
   }
 
   @Override
+  protected String operation() {
+    return DataOperations.APPEND;
+  }
+
+  @Override
   public MergeAppend appendFile(DataFile file) {
     add(file);
     return this;
diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
index 156f9ea..eb37466 100644
--- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -21,6 +21,7 @@ package com.netflix.iceberg;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -78,6 +79,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   private final PartitionSpec spec;
   private final long manifestTargetSizeBytes;
   private final int minManifestsCountToMerge;
+  private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
 
   // update data
   private final AtomicInteger manifestCount = new AtomicInteger(0);
@@ -99,7 +101,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
 
   // tracking where files were deleted to validate retries quickly
-  private final Map<ManifestFile, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
+  private final Map<ManifestFile, Iterable<DataFile>> filteredManifestToDeletedFiles =
       Maps.newConcurrentMap();
 
   private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
@@ -124,7 +126,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   }
 
   protected List<DataFile> addedFiles() {
-    return newFiles;
+    return ImmutableList.copyOf(newFiles);
   }
 
   protected void failAnyDelete() {
@@ -172,7 +174,14 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   }
 
   @Override
+  protected Map<String, String> summary() {
+    return summaryBuilder.build();
+  }
+
+  @Override
   public List<ManifestFile> apply(TableMetadata base) {
+    summaryBuilder.clear();
+
     if (filterUpdated) {
       cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
       this.filterUpdated = false;
@@ -188,6 +197,11 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
     // add the current spec as the first group. files are added to the beginning.
     try {
       if (newFiles.size() > 0) {
+        // add all of the new files to the summary builder
+        for (DataFile file : newFiles) {
+          summaryBuilder.addedFile(spec, file);
+        }
+
         ManifestFile newManifest = newFilesAsManifest();
         List<ManifestFile> manifestGroup = Lists.newArrayList();
         manifestGroup.add(newManifest);
@@ -212,9 +226,13 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
             }, IOException.class);
 
         for (ManifestFile manifest : filtered) {
-          Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+          PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
+          Iterable<DataFile> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
           if (manifestDeletes != null) {
-            deletedFiles.addAll(manifestDeletes);
+            for (DataFile file : manifestDeletes) {
+              summaryBuilder.deletedFile(spec, file);
+              deletedFiles.add(CharSequenceWrapper.wrap(file.path()));
+            }
           }
 
           List<ManifestFile> group = groups.get(manifest.partitionSpecId());
@@ -362,6 +380,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
 
       // when this point is reached, there is at least one file that will be deleted in the
       // manifest. produce a copy of the manifest with all deleted files removed.
+      List<DataFile> deletedFiles = Lists.newArrayList();
       Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
       OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
       ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
@@ -383,6 +402,11 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
               if (deletedPaths.contains(wrapper)) {
                 LOG.warn("Deleting a duplicate path from manifest {}: {}",
                     manifest.path(), wrapper.get());
+                summaryBuilder.incrementDuplicateDeletes();
+              } else {
+                // only add the file to deletes if it is a new delete
+                // this keeps the snapshot summary accurate for non-duplicate data
+                deletedFiles.add(entry.file().copy());
               }
               deletedPaths.add(wrapper);
 
@@ -400,7 +424,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
 
       // update caches
       filteredManifests.put(manifest, filtered);
-      filteredManifestToDeletedFiles.put(filtered, deletedPaths);
+      filteredManifestToDeletedFiles.put(filtered, deletedFiles);
 
       return filtered;
     }
diff --git a/core/src/main/java/com/netflix/iceberg/OverwriteData.java b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
index 3ebb725..f9a6abe 100644
--- a/core/src/main/java/com/netflix/iceberg/OverwriteData.java
+++ b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
@@ -34,6 +34,11 @@ public class OverwriteData extends MergingSnapshotUpdate implements OverwriteFil
   }
 
   @Override
+  protected String operation() {
+    return DataOperations.OVERWRITE;
+  }
+
+  @Override
   public OverwriteFiles overwriteByRowFilter(Expression expr) {
     deleteByRowFilter(expr);
     return this;
diff --git a/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java b/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java
index 73b47c7..71ac066 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplaceFiles.java
@@ -31,6 +31,11 @@ class ReplaceFiles extends MergingSnapshotUpdate implements RewriteFiles {
   }
 
   @Override
+  protected String operation() {
+    return DataOperations.REPLACE;
+  }
+
+  @Override
   public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
     Preconditions.checkArgument(filesToDelete != null && !filesToDelete.isEmpty(),
         "Files to delete cannot be null or empty");
diff --git a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
index 4fb6aa8..360487e 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
@@ -29,6 +29,11 @@ public class ReplacePartitionsOperation extends MergingSnapshotUpdate implements
   }
 
   @Override
+  protected String operation() {
+    return DataOperations.OVERWRITE;
+  }
+
+  @Override
   public ReplacePartitions addFile(DataFile file) {
     dropPartition(file.partition());
     add(file);
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
index d73da8a..647f7f0 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
@@ -22,18 +22,23 @@ package com.netflix.iceberg;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.util.JsonUtil;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 public class SnapshotParser {
 
   private static final String SNAPSHOT_ID = "snapshot-id";
   private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
   private static final String TIMESTAMP_MS = "timestamp-ms";
+  private static final String SUMMARY = "summary";
+  private static final String OPERATION = "operation";
   private static final String MANIFESTS = "manifests";
   private static final String MANIFEST_LIST = "manifest-list";
 
@@ -46,6 +51,22 @@ public class SnapshotParser {
     }
     generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
 
+    // if there is an operation, write the summary map
+    if (snapshot.operation() != null) {
+      generator.writeObjectFieldStart(SUMMARY);
+      generator.writeStringField(OPERATION, snapshot.operation());
+      if (snapshot.summary() != null) {
+        for (Map.Entry<String, String> entry : snapshot.summary().entrySet()) {
+          // only write operation once
+          if (OPERATION.equals(entry.getKey())) {
+            continue;
+          }
+          generator.writeStringField(entry.getKey(), entry.getValue());
+        }
+      }
+      generator.writeEndObject();
+    }
+
     String manifestList = snapshot.manifestListLocation();
     if (manifestList != null) {
       // write just the location. manifests should not be embedded in JSON along with a list
@@ -86,17 +107,39 @@ public class SnapshotParser {
     }
     long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);
 
+    Map<String, String> summary = null;
+    String operation = null;
+    if (node.has(SUMMARY)) {
+      JsonNode sNode = node.get(SUMMARY);
+      Preconditions.checkArgument(sNode != null && !sNode.isNull() && sNode.isObject(),
+          "Cannot parse summary from non-object value: %s", sNode);
+
+      ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+      Iterator<String> fields = sNode.fieldNames();
+      while (fields.hasNext()) {
+        String field = fields.next();
+        if (field.equals(OPERATION)) {
+          operation = JsonUtil.getString(OPERATION, sNode);
+        } else {
+          builder.put(field, JsonUtil.getString(field, sNode));
+        }
+      }
+      summary = builder.build();
+    }
+
     if (node.has(MANIFEST_LIST)) {
       // the manifest list is stored in a manifest list file
       String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
-      return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.io().newInputFile(manifestList));
+      return new BaseSnapshot(
+          ops, versionId, parentId, timestamp, operation, summary,
+          ops.io().newInputFile(manifestList));
 
     } else {
       // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
       // loaded lazily, if it is needed
       List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
           location -> new GenericManifestFile(ops.io().newInputFile(location), 0));
-      return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+      return new BaseSnapshot(ops, versionId, parentId, timestamp, operation, summary, manifests);
     }
   }
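
The writer folds the operation into the summary object and the reader splits it back out, so the map returned by Snapshot.summary() never carries an "operation" key. A hedged sketch of the round trip, using the static methods exactly as the parser test below does (the TableOperations instance is assumed to come from the caller):

import com.netflix.iceberg.Snapshot;
import com.netflix.iceberg.SnapshotParser;
import com.netflix.iceberg.TableOperations;

class SnapshotJsonRoundTrip {
  static Snapshot roundTrip(TableOperations ops, Snapshot snapshot) {
    // The summary is serialized as one JSON object with the operation folded in, e.g.
    //   "summary": { "operation": "append", "added-data-files": "2", "added-records": "100" }
    String json = SnapshotParser.toJson(snapshot);
    // fromJson pulls "operation" back out, so summary() holds only the remaining keys.
    return SnapshotParser.fromJson(ops, json);
  }
}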
 
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotSummary.java b/core/src/main/java/com/netflix/iceberg/SnapshotSummary.java
new file mode 100644
index 0000000..6dd223f
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotSummary.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+public class SnapshotSummary {
+  public static final String ADDED_FILES_PROP = "added-data-files";
+  public static final String DELETED_FILES_PROP = "deleted-data-files";
+  public static final String TOTAL_FILES_PROP = "total-data-files";
+  public static final String ADDED_RECORDS_PROP = "added-records";
+  public static final String DELETED_RECORDS_PROP = "deleted-records";
+  public static final String TOTAL_RECORDS_PROP = "total-records";
+  public static final String DELETED_DUPLICATE_FILES = "deleted-duplicate-files";
+  public static final String CHANGED_PARTITION_COUNT_PROP = "changed-partition-count";
+
+  private SnapshotSummary() {
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    // commit summary tracking
+    private Set<String> changedPartitions = Sets.newHashSet();
+    private long addedFiles = 0L;
+    private long deletedFiles = 0L;
+    private long deletedDuplicateFiles = 0L;
+    private long addedRecords = 0L;
+    private long deletedRecords = 0L;
+
+    public void clear() {
+      changedPartitions.clear();
+      this.addedFiles = 0L;
+      this.deletedFiles = 0L;
+      this.deletedDuplicateFiles = 0L;
+      this.addedRecords = 0L;
+      this.deletedRecords = 0L;
+    }
+
+    public void incrementDuplicateDeletes() {
+      this.deletedDuplicateFiles += 1;
+    }
+
+    public void deletedFile(PartitionSpec spec, DataFile file) {
+      changedPartitions.add(spec.partitionToPath(file.partition()));
+      this.deletedFiles += 1;
+      this.deletedRecords += file.recordCount();
+    }
+
+    public void addedFile(PartitionSpec spec, DataFile file) {
+      changedPartitions.add(spec.partitionToPath(file.partition()));
+      this.addedFiles += 1;
+      this.addedRecords += file.recordCount();
+    }
+
+    public Map<String, String> build() {
+      ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+
+      setIf(addedFiles > 0, builder, ADDED_FILES_PROP, addedFiles);
+      setIf(deletedFiles > 0, builder, DELETED_FILES_PROP, deletedFiles);
+      setIf(deletedDuplicateFiles > 0, builder, DELETED_DUPLICATE_FILES, deletedDuplicateFiles);
+      setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords);
+      setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords);
+      setIf(true, builder, CHANGED_PARTITION_COUNT_PROP, changedPartitions.size());
+
+      return builder.build();
+    }
+
+    private static void setIf(boolean expression, ImmutableMap.Builder<String, String> builder,
+                              String property, Object value) {
+      if (expression) {
+        builder.put(property, String.valueOf(value));
+      }
+    }
+  }
+}
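
SnapshotSummary.Builder is how each operation accumulates counts before build() turns them into the string map stored on the snapshot. A hedged sketch of the builder in isolation (the partition spec and data files are assumed to exist):

import com.netflix.iceberg.DataFile;
import com.netflix.iceberg.PartitionSpec;
import com.netflix.iceberg.SnapshotSummary;
import java.util.Map;

class SummaryBuilderExample {
  static Map<String, String> describe(PartitionSpec spec, DataFile added, DataFile deleted) {
    SnapshotSummary.Builder builder = SnapshotSummary.builder();
    builder.addedFile(spec, added);      // bumps added-data-files and added-records
    builder.deletedFile(spec, deleted);  // bumps deleted-data-files and deleted-records
    // build() emits only non-zero counters, plus changed-partition-count.
    return builder.build();
  }
}
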
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
index 0cc98f9..39e7a58 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -22,6 +22,7 @@ package com.netflix.iceberg;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -102,6 +104,61 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
    */
   protected abstract void cleanUncommitted(Set<ManifestFile> committed);
 
+  /**
+   * A string that describes the action that produced the new snapshot.
+   *
+   * @return a string operation
+   */
+  protected abstract String operation();
+
+  /**
+   * A string map with a summary of the changes in this snapshot update.
+   *
+   * @return a string map that summarizes the update
+   */
+  protected Map<String, String> summary() {
+    return ImmutableMap.of();
+  }
+
+  /**
+   * Returns the snapshot summary from the implementation and updates totals.
+   */
+  private Map<String, String> summary(TableMetadata base) {
+    Map<String, String> summary = summary();
+    if (summary == null) {
+      return ImmutableMap.of();
+    }
+
+    Map<String, String> previousSummary;
+    if (base.currentSnapshot() != null) {
+      if (base.currentSnapshot().summary() != null) {
+        previousSummary = base.currentSnapshot().summary();
+      } else {
+        // previous snapshot had no summary, use an empty summary
+        previousSummary = ImmutableMap.of();
+      }
+    } else {
+      // if there was no previous snapshot, default the summary to start totals at 0
+      previousSummary = ImmutableMap.of(
+          SnapshotSummary.TOTAL_RECORDS_PROP, "0",
+          SnapshotSummary.TOTAL_FILES_PROP, "0");
+    }
+
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+
+    // copy all summary properties from the implementation
+    builder.putAll(summary);
+
+    updateTotal(
+        builder, previousSummary, SnapshotSummary.TOTAL_RECORDS_PROP,
+        summary, SnapshotSummary.ADDED_RECORDS_PROP, SnapshotSummary.DELETED_RECORDS_PROP);
+    updateTotal(
+        builder, previousSummary, SnapshotSummary.TOTAL_FILES_PROP,
+        summary, SnapshotSummary.ADDED_FILES_PROP, SnapshotSummary.DELETED_FILES_PROP);
+
+    return builder.build();
+  }
+
   @Override
   public Snapshot apply() {
     this.base = ops.refresh();
@@ -134,12 +191,13 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
       }
 
       return new BaseSnapshot(ops,
-          snapshotId(), parentSnapshotId, System.currentTimeMillis(),
+          snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
           ops.io().newInputFile(manifestList.location()));
 
     } else {
       return new BaseSnapshot(ops,
-          snapshotId(), parentSnapshotId, System.currentTimeMillis(), manifests);
+          snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
+          manifests);
     }
   }
 
@@ -268,4 +326,31 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
       throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
     }
   }
+
+  private static void updateTotal(ImmutableMap.Builder<String, String> summaryBuilder,
+                                  Map<String, String> previousSummary, String totalProperty,
+                                  Map<String, String> currentSummary,
+                                  String addedProperty, String deletedProperty) {
+    String totalStr = previousSummary.get(totalProperty);
+    if (totalStr != null) {
+      try {
+        long newTotal = Long.parseLong(totalStr);
+
+        String addedStr = currentSummary.get(addedProperty);
+        if (addedStr != null) {
+          newTotal += Long.parseLong(addedStr);
+        }
+
+        String deletedStr = currentSummary.get(deletedProperty);
+        if (deletedStr != null) {
+          newTotal -= Long.parseLong(deletedStr);
+        }
+
+        summaryBuilder.put(totalProperty, String.valueOf(newTotal));
+
+      } catch (NumberFormatException e) {
+        // ignore and do not add total
+      }
+    }
+  }
 }
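
The private summary(TableMetadata) wrapper keeps running totals across snapshots: it starts from the previous snapshot's total-records and total-data-files (or "0" for the first snapshot), adds this update's added counts, and subtracts its deleted counts; if a previous total is missing or unparsable, that total is dropped rather than guessed. For example, if the previous summary had total-records=1000 and total-data-files=10, and this update reports added-records=250, deleted-records=100, added-data-files=3, and deleted-data-files=1, the new snapshot is written with total-records=1150 (1000 + 250 - 100) and total-data-files=12 (10 + 3 - 1).
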
diff --git a/core/src/main/java/com/netflix/iceberg/StreamingDelete.java b/core/src/main/java/com/netflix/iceberg/StreamingDelete.java
index 4b3756a..8e3fab9 100644
--- a/core/src/main/java/com/netflix/iceberg/StreamingDelete.java
+++ b/core/src/main/java/com/netflix/iceberg/StreamingDelete.java
@@ -33,6 +33,11 @@ class StreamingDelete extends MergingSnapshotUpdate implements DeleteFiles {
   }
 
   @Override
+  protected String operation() {
+    return DataOperations.DELETE;
+  }
+
+  @Override
   public StreamingDelete deleteFile(CharSequence path) {
     delete(path);
     return this;
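
StreamingDelete marks its snapshots with the delete operation, so a path-based delete ends up with operation "delete" and deleted-data-files / deleted-records entries in its summary. A hedged usage sketch (Table.newDelete() is assumed from the existing DeleteFiles API; the path is illustrative):

import com.netflix.iceberg.Table;

class DeleteWithSummary {
  static void deleteOnePath(Table table) {
    table.newDelete()
        .deleteFile("s3://bucket/table/data/part-00000.parquet")  // illustrative path
        .commit();
    // The committed snapshot reports operation() == "delete"; deleted-data-files and
    // deleted-records are counted as the affected manifests are rewritten.
  }
}
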
diff --git a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
index dbcc811..d60247b 100644
--- a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
@@ -20,6 +20,7 @@
 package com.netflix.iceberg;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,6 +48,39 @@ public class TestSnapshotJson {
         expected.snapshotId(), snapshot.snapshotId());
     Assert.assertEquals("Files should match",
         expected.manifests(), snapshot.manifests());
+    Assert.assertNull("Operation should be null", snapshot.operation());
+    Assert.assertNull("Summary should be null", snapshot.summary());
+  }
+
+  @Test
+  public void testJsonConversionWithOperation() {
+    long parentId = 1;
+    long id = 2;
+    List<ManifestFile> manifests = ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
+        new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
+
+    Snapshot expected = new BaseSnapshot(ops, id, parentId, System.currentTimeMillis(),
+        DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"),
+        manifests);
+
+    String json = SnapshotParser.toJson(expected);
+    Snapshot snapshot = SnapshotParser.fromJson(ops, json);
+
+    Assert.assertEquals("Snapshot ID should match",
+        expected.snapshotId(), snapshot.snapshotId());
+    Assert.assertEquals("Timestamp should match",
+        expected.timestampMillis(), snapshot.timestampMillis());
+    Assert.assertEquals("Parent ID should match",
+        expected.parentId(), snapshot.parentId());
+    Assert.assertEquals("Manifest list should match",
+        expected.manifestListLocation(), snapshot.manifestListLocation());
+    Assert.assertEquals("Files should match",
+        expected.manifests(), snapshot.manifests());
+    Assert.assertEquals("Operation should match",
+        expected.operation(), snapshot.operation());
+    Assert.assertEquals("Summary should match",
+        expected.summary(), snapshot.summary());
   }
 
   @Test
@@ -67,9 +101,9 @@ public class TestSnapshotJson {
     }
 
     Snapshot expected = new BaseSnapshot(
-        ops, id, parentId, System.currentTimeMillis(), localInput(manifestList));
+        ops, id, parentId, System.currentTimeMillis(), null, null, localInput(manifestList));
     Snapshot inMemory = new BaseSnapshot(
-        ops, id, parentId, expected.timestampMillis(), manifests);
+        ops, id, parentId, expected.timestampMillis(), null, null, manifests);
 
     Assert.assertEquals("Files should match in memory list",
         inMemory.manifests(), expected.manifests());
@@ -87,5 +121,7 @@ public class TestSnapshotJson {
         expected.manifestListLocation(), snapshot.manifestListLocation());
     Assert.assertEquals("Files should match",
         expected.manifests(), snapshot.manifests());
+    Assert.assertNull("Operation should be null", snapshot.operation());
+    Assert.assertNull("Summary should be null", snapshot.summary());
   }
 }
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index e21b4b6..1b54390 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -68,11 +68,11 @@ public class TestTableMetadataJson {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        ops, previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
     List<SnapshotLogEntry> snapshotLog = ImmutableList.<SnapshotLogEntry>builder()
@@ -130,11 +130,11 @@ public class TestTableMetadataJson {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        ops, previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
     List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
@@ -175,13 +175,14 @@ public class TestTableMetadataJson {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        ops, previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
+
     TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
diff --git a/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java
index e988d51..c12912f 100644
--- a/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java
+++ b/core/src/test/java/com/netflix/iceberg/avro/AvroTestHelpers.java
@@ -21,19 +21,11 @@ package com.netflix.iceberg.avro;
 
 import com.netflix.iceberg.types.Type;
 import com.netflix.iceberg.types.Types;
-import com.netflix.iceberg.util.CharSequenceWrapper;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.junit.Assert;
-
-import java.time.LocalDate;
-import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;