You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "nastra (via GitHub)" <gi...@apache.org> on 2023/02/07 16:46:02 UTC

[GitHub] [iceberg] nastra commented on a diff in pull request #6706: Refactor table metadata snapshot management

nastra commented on code in PR #6706:
URL: https://github.com/apache/iceberg/pull/6706#discussion_r1098892936


##########
core/src/main/java/org/apache/iceberg/SnapshotOperations.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * SnapshotOperations abstracts access to snapshots for table metadata. This allows a subset of
+ * snapshots/refs to be loaded initially for operations that do not require all to be present (e.g.
+ * table scans of the current snapshot).
+ *
+ * <p>In the event that snapshots/refs need to be accessed, the provided supplier will be invoked,
+ * which must provide all addressable snapshots for the table.
+ */
+class SnapshotOperations implements Serializable {
+  private List<Snapshot> snapshots;
+  private final SerializableSupplier<List<Snapshot>> snapshotsSupplier;
+  private Map<Long, Snapshot> snapshotsById;
+  private Map<String, SnapshotRef> refs;
+  private final SerializableSupplier<Map<String, SnapshotRef>> refsSupplier;
+  private boolean loaded;
+
+  private SnapshotOperations(
+      List<Snapshot> snapshots,
+      SerializableSupplier<List<Snapshot>> snapshotsSupplier,
+      Map<String, SnapshotRef> refs,
+      SerializableSupplier<Map<String, SnapshotRef>> refsSupplier) {
+    this.snapshots = ImmutableList.copyOf(snapshots);
+    this.snapshotsSupplier = snapshotsSupplier;
+    this.refs = SerializableMap.copyOf(refs);
+    this.refsSupplier = refsSupplier;
+
+    this.snapshotsById = indexSnapshotsById(snapshots);
+  }
+
+  List<Snapshot> snapshots() {
+    ensureLoaded();
+
+    return snapshots;
+  }
+
+  Snapshot snapshot(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.get(id);
+  }
+
+  boolean contains(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.containsKey(id);
+  }
+
+  Map<String, SnapshotRef> refs() {
+    return refs;
+  }
+
+  SnapshotRef ref(String name) {
+    return refs.get(name);
+  }
+
+  private void ensureLoaded() {

Review Comment:
   Do we care about parallel access here? Just wondering whether `ensureLoaded()` should be `synchronized`.



##########
core/src/main/java/org/apache/iceberg/SnapshotOperations.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * SnapshotOperations abstracts access to snapshots for table metadata. This allows a subset of
+ * snapshots/refs to be loaded initially for operations that do not require all to be present (e.g.
+ * table scans of the current snapshot).
+ *
+ * <p>In the event that snapshots/refs need to be accessed, the provided supplier will be invoked,
+ * which must provide all addressable snapshots for the table.
+ */
+class SnapshotOperations implements Serializable {
+  private List<Snapshot> snapshots;
+  private final SerializableSupplier<List<Snapshot>> snapshotsSupplier;
+  private Map<Long, Snapshot> snapshotsById;
+  private Map<String, SnapshotRef> refs;
+  private final SerializableSupplier<Map<String, SnapshotRef>> refsSupplier;
+  private boolean loaded;
+
+  private SnapshotOperations(
+      List<Snapshot> snapshots,
+      SerializableSupplier<List<Snapshot>> snapshotsSupplier,
+      Map<String, SnapshotRef> refs,
+      SerializableSupplier<Map<String, SnapshotRef>> refsSupplier) {
+    this.snapshots = ImmutableList.copyOf(snapshots);
+    this.snapshotsSupplier = snapshotsSupplier;
+    this.refs = SerializableMap.copyOf(refs);
+    this.refsSupplier = refsSupplier;
+
+    this.snapshotsById = indexSnapshotsById(snapshots);
+  }
+
+  List<Snapshot> snapshots() {
+    ensureLoaded();
+
+    return snapshots;
+  }
+
+  Snapshot snapshot(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.get(id);
+  }
+
+  boolean contains(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.containsKey(id);
+  }
+
+  Map<String, SnapshotRef> refs() {
+    return refs;
+  }
+
+  SnapshotRef ref(String name) {
+    return refs.get(name);
+  }
+
+  private void ensureLoaded() {
+    if (!loaded && snapshotsSupplier != null) {
+      this.snapshots = ImmutableList.copyOf(snapshotsSupplier.get());
+      this.snapshotsById = indexSnapshotsById(snapshots);
+    }
+
+    if (!loaded && refsSupplier != null) {
+      this.refs = SerializableMap.copyOf(refsSupplier.get());
+    }
+
+    loaded = true;
+  }
+
+  void validate(long currentSnapshotId, long lastSequenceNumber) {
+    validateSnapshots(lastSequenceNumber);
+    validateRefs(currentSnapshotId);
+  }
+
+  private void validateSnapshots(long lastSequenceNumber) {
+    for (Snapshot snap : snapshots) {
+      ValidationException.check(
+          snap.sequenceNumber() <= lastSequenceNumber,
+          "Invalid snapshot with sequence number %s greater than last sequence number %s",
+          snap.sequenceNumber(),
+          lastSequenceNumber);
+    }
+  }
+
+  private void validateRefs(long currentSnapshotId) {
+    for (SnapshotRef ref : refs.values()) {
+      Preconditions.checkArgument(
+          snapshotsById.containsKey(ref.snapshotId()),
+          "Snapshot for reference %s does not exist in the existing snapshots list",
+          ref);
+    }
+
+    SnapshotRef main = refs.get(SnapshotRef.MAIN_BRANCH);
+    if (currentSnapshotId != -1) {
+      Preconditions.checkArgument(
+          main == null || currentSnapshotId == main.snapshotId(),
+          "Current snapshot ID does not match main branch (%s != %s)",
+          currentSnapshotId,
+          main != null ? main.snapshotId() : null);
+    } else {
+      Preconditions.checkArgument(
+          main == null, "Current snapshot is not set, but main branch exists: %s", main);
+    }
+  }
+
+  private static Map<Long, Snapshot> indexSnapshotsById(List<Snapshot> snapshots) {
+    return SerializableMap.copyOf(
+        snapshots.stream().collect(Collectors.toMap(Snapshot::snapshotId, Function.identity())));
+  }
+
+  public static Builder buildFrom(SnapshotOperations base) {
+    return new Builder(base);
+  }
+
+  public static Builder buildFromEmpty() {
+    return new Builder();
+  }
+
+  public static SnapshotOperations empty() {
+    return SnapshotOperations.buildFromEmpty().build();
+  }
+
+  static class Builder {
+    private List<Snapshot> snapshots;
+    private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
+    private final Map<Long, Snapshot> snapshotsById;
+    private Map<String, SnapshotRef> refs;
+    private SerializableSupplier<Map<String, SnapshotRef>> refsSupplier;
+
+    Builder() {
+      this.snapshots = Lists.newArrayList();
+      this.refs = Maps.newHashMap();
+
+      this.snapshotsById = Maps.newHashMap();
+    }
+
+    Builder(SnapshotOperations base) {
+      this.snapshots = Lists.newArrayList(base.snapshots);
+      this.snapshotsSupplier = base.snapshotsSupplier;
+      this.refs = Maps.newHashMap(base.refs);
+      this.refsSupplier = base.refsSupplier;
+
+      this.snapshotsById = indexSnapshotsById(snapshots);

Review Comment:
   wouldn't `this.snapshotsById = base.snapshotsById` give the same result here?



##########
core/src/main/java/org/apache/iceberg/SnapshotOperations.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * SnapshotOperations abstracts access to snapshots for table metadata. This allows a subset of
+ * snapshots/refs to be loaded initially for operations that do not require all to be present (e.g.
+ * table scans of the current snapshot).
+ *
+ * <p>In the event that snapshots/refs need to be accessed, the provided supplier will be invoked,
+ * which must provide all addressable snapshots for the table.
+ */
+class SnapshotOperations implements Serializable {
+  private List<Snapshot> snapshots;
+  private final SerializableSupplier<List<Snapshot>> snapshotsSupplier;
+  private Map<Long, Snapshot> snapshotsById;
+  private Map<String, SnapshotRef> refs;
+  private final SerializableSupplier<Map<String, SnapshotRef>> refsSupplier;
+  private boolean loaded;
+
+  private SnapshotOperations(
+      List<Snapshot> snapshots,
+      SerializableSupplier<List<Snapshot>> snapshotsSupplier,
+      Map<String, SnapshotRef> refs,
+      SerializableSupplier<Map<String, SnapshotRef>> refsSupplier) {
+    this.snapshots = ImmutableList.copyOf(snapshots);
+    this.snapshotsSupplier = snapshotsSupplier;
+    this.refs = SerializableMap.copyOf(refs);
+    this.refsSupplier = refsSupplier;
+
+    this.snapshotsById = indexSnapshotsById(snapshots);
+  }
+
+  List<Snapshot> snapshots() {
+    ensureLoaded();
+
+    return snapshots;
+  }
+
+  Snapshot snapshot(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.get(id);
+  }
+
+  boolean contains(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.containsKey(id);
+  }
+
+  Map<String, SnapshotRef> refs() {
+    return refs;
+  }
+
+  SnapshotRef ref(String name) {
+    return refs.get(name);
+  }
+
+  private void ensureLoaded() {
+    if (!loaded && snapshotsSupplier != null) {
+      this.snapshots = ImmutableList.copyOf(snapshotsSupplier.get());
+      this.snapshotsById = indexSnapshotsById(snapshots);
+    }
+
+    if (!loaded && refsSupplier != null) {
+      this.refs = SerializableMap.copyOf(refsSupplier.get());
+    }
+
+    loaded = true;
+  }
+
+  void validate(long currentSnapshotId, long lastSequenceNumber) {
+    validateSnapshots(lastSequenceNumber);
+    validateRefs(currentSnapshotId);
+  }
+
+  private void validateSnapshots(long lastSequenceNumber) {

Review Comment:
   I think it would be good to add some validation tests as part of `TestSnapshotOperations`, as I can't find anything in the codebase that specifically tests those exception messages around snapshots/refs.



##########
core/src/test/java/org/apache/iceberg/TestSnapshotOperations.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+@RunWith(Parameterized.class)
+public class TestSnapshotOperations extends TableTestBase {
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public TestSnapshotOperations(int formatVersion) {
+    super(formatVersion);
+  }
+
+  private Snapshot currentSnapshot;
+  private List<Snapshot> allSnapshots;
+  private SerializableSupplier<List<Snapshot>> snapshotSupplier;
+
+  @Before
+  public void before() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    this.currentSnapshot = table.currentSnapshot();
+    this.allSnapshots = Lists.newArrayList(table.snapshots());
+
+    // Anonymous class is required for proper mocking as opposed to lambda
+    this.snapshotSupplier =
+        new SerializableSupplier<List<Snapshot>>() {
+          @Override
+          public List<Snapshot> get() {
+            return allSnapshots;
+          }
+        };
+  }
+
+  @Test
+  public void testSnapshotsLoadBehavior() {
+    SerializableSupplier<List<Snapshot>> snapshotsSupplierMock = Mockito.spy(snapshotSupplier);
+
+    List<Snapshot> initialSnapshots =
+        currentSnapshot != null
+            ? Collections.singletonList(currentSnapshot)
+            : Collections.emptyList();
+
+    SnapshotOperations snapshotOperations =
+        SnapshotOperations.buildFromEmpty()
+            .snapshots(initialSnapshots)
+            .snapshotsSupplier(snapshotsSupplierMock)
+            .build();
+
+    snapshotOperations.snapshots();
+    snapshotOperations.snapshots();
+    snapshotOperations.snapshots();

Review Comment:
   I think it would make sense to also add calls to `snapshotOperations.contains(-1L)` and `snapshotOperations.snapshot(-1L)`, just to make sure those calls don't trigger another load.



##########
core/src/test/java/org/apache/iceberg/TestSnapshotOperations.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+@RunWith(Parameterized.class)
+public class TestSnapshotOperations extends TableTestBase {
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public TestSnapshotOperations(int formatVersion) {
+    super(formatVersion);
+  }
+
+  private Snapshot currentSnapshot;
+  private List<Snapshot> allSnapshots;
+  private SerializableSupplier<List<Snapshot>> snapshotSupplier;
+
+  @Before
+  public void before() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    this.currentSnapshot = table.currentSnapshot();
+    this.allSnapshots = Lists.newArrayList(table.snapshots());
+
+    // Anonymous class is required for proper mocking as opposed to lambda
+    this.snapshotSupplier =
+        new SerializableSupplier<List<Snapshot>>() {
+          @Override
+          public List<Snapshot> get() {
+            return allSnapshots;
+          }
+        };
+  }
+
+  @Test
+  public void testSnapshotsLoadBehavior() {
+    SerializableSupplier<List<Snapshot>> snapshotsSupplierMock = Mockito.spy(snapshotSupplier);
+
+    List<Snapshot> initialSnapshots =
+        currentSnapshot != null
+            ? Collections.singletonList(currentSnapshot)
+            : Collections.emptyList();
+
+    SnapshotOperations snapshotOperations =
+        SnapshotOperations.buildFromEmpty()
+            .snapshots(initialSnapshots)
+            .snapshotsSupplier(snapshotsSupplierMock)
+            .build();
+
+    snapshotOperations.snapshots();
+    snapshotOperations.snapshots();
+    snapshotOperations.snapshots();
+
+    // Verify that the snapshot supplier only gets called once even with repeated calls to load
+    // snapshots
+    verify(snapshotsSupplierMock, times(1)).get();
+  }
+
+  @Test
+  public void testUnloadedSnapshotReference() {
+    SerializableSupplier<List<Snapshot>> snapshotsSupplierMock = Mockito.spy(snapshotSupplier);
+
+    List<Snapshot> initialSnapshots =
+        currentSnapshot != null
+            ? Collections.singletonList(currentSnapshot)
+            : Collections.emptyList();
+
+    SnapshotOperations snapshotOperations =
+        SnapshotOperations.buildFromEmpty()
+            .snapshots(initialSnapshots)
+            .snapshotsSupplier(snapshotsSupplierMock)
+            .build();
+
+    // Ensure all snapshots are reachable
+    allSnapshots.forEach(
+        snapshot ->
+            assertThat(snapshotOperations.snapshot(snapshot.snapshotId())).isEqualTo(snapshot));
+
+    // Verify that loading was called while checking snapshots
+    verify(snapshotsSupplierMock, times(1)).get();
+  }
+
+  @Test
+  public void testCurrentSnapshot() {
+    SerializableSupplier<List<Snapshot>> snapshotsSupplierMock = Mockito.spy(snapshotSupplier);
+
+    SnapshotOperations snapshotOperations =
+        SnapshotOperations.buildFromEmpty()
+            .add(currentSnapshot)
+            .snapshotsSupplier(snapshotsSupplierMock)
+            .addRef(
+                SnapshotRef.MAIN_BRANCH,
+                SnapshotRef.branchBuilder(currentSnapshot.snapshotId()).build())
+            .build();
+
+    TableMetadata testMetadata =
+        TableMetadata.buildFrom(table.ops().current())
+            .setSnapshotOperations(snapshotOperations)
+            .build();
+
+    Table t1 = new BaseTable(new MetadataTableOperations(table.io(), testMetadata), "temp");
+
+    // Loading the current snapshot should not invoke the supplier
+    t1.currentSnapshot();
+
+    // Performing a table scan should not invoke supplier
+    t1.newScan().planFiles().forEach(t -> {});
+
+    // Performing a table scan on main should not invoke supplier
+    t1.newScan().useRef(SnapshotRef.MAIN_BRANCH).planFiles().forEach(t -> {});
+
+    verify(snapshotsSupplierMock, times(0)).get();
+  }
+
+  @Test
+  public void testSnapshotOperationsSerialization() throws IOException, ClassNotFoundException {
+    final List<Snapshot> allSnapshots = Lists.newArrayList(table.snapshots());
+
+    SnapshotOperations snapshotOperations =
+        SnapshotOperations.buildFromEmpty()
+            .add(currentSnapshot)
+            .snapshotsSupplier(() -> allSnapshots)
+            .build();
+
+    SnapshotOperations result = TestHelpers.roundTripSerialize(snapshotOperations);
+    SnapshotOperations kryoResult = TestHelpers.KryoHelpers.roundTripSerialize(snapshotOperations);
+
+    assertThat(table.snapshots()).containsExactlyElementsOf(result.snapshots());
+    assertThat(table.snapshots()).containsExactlyElementsOf(kryoResult.snapshots());
+  }
+
+  private static class MetadataTableOperations implements TableOperations {
+    private FileIO io;
+    private TableMetadata currentMetadata;
+
+    public MetadataTableOperations(FileIO io, TableMetadata currentMetadata) {
+      this.io = io;
+      this.currentMetadata = currentMetadata;
+    }
+
+    @Override
+    public TableMetadata current() {
+      return currentMetadata;
+    }
+
+    @Override
+    public TableMetadata refresh() {
+      throw new UnsupportedOperationException("refresh not support for test ops implementation.");

Review Comment:
   nit (here and in all the places further below): "not support" -> "not supported"
    



##########
core/src/main/java/org/apache/iceberg/SnapshotOperations.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * SnapshotOperations abstracts access to snapshots for table metadata. This allows a subset of
+ * snapshots/refs to be loaded initially for operations that do not require all to be present (e.g.
+ * table scans of the current snapshot).
+ *
+ * <p>In the event that snapshots/refs need to be accessed, the provided supplier will be invoked,
+ * which must provide all addressable snapshots for the table.
+ */
+class SnapshotOperations implements Serializable {
+  private List<Snapshot> snapshots;
+  private final SerializableSupplier<List<Snapshot>> snapshotsSupplier;
+  private Map<Long, Snapshot> snapshotsById;
+  private Map<String, SnapshotRef> refs;
+  private final SerializableSupplier<Map<String, SnapshotRef>> refsSupplier;
+  private boolean loaded;
+
+  private SnapshotOperations(
+      List<Snapshot> snapshots,
+      SerializableSupplier<List<Snapshot>> snapshotsSupplier,
+      Map<String, SnapshotRef> refs,
+      SerializableSupplier<Map<String, SnapshotRef>> refsSupplier) {
+    this.snapshots = ImmutableList.copyOf(snapshots);
+    this.snapshotsSupplier = snapshotsSupplier;
+    this.refs = SerializableMap.copyOf(refs);
+    this.refsSupplier = refsSupplier;
+
+    this.snapshotsById = indexSnapshotsById(snapshots);
+  }
+
+  List<Snapshot> snapshots() {
+    ensureLoaded();
+
+    return snapshots;
+  }
+
+  Snapshot snapshot(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.get(id);
+  }
+
+  boolean contains(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.containsKey(id);
+  }
+
+  Map<String, SnapshotRef> refs() {
+    return refs;
+  }
+
+  SnapshotRef ref(String name) {
+    return refs.get(name);
+  }
+
+  private void ensureLoaded() {
+    if (!loaded && snapshotsSupplier != null) {
+      this.snapshots = ImmutableList.copyOf(snapshotsSupplier.get());
+      this.snapshotsById = indexSnapshotsById(snapshots);
+    }
+
+    if (!loaded && refsSupplier != null) {
+      this.refs = SerializableMap.copyOf(refsSupplier.get());
+    }
+
+    loaded = true;
+  }
+
+  void validate(long currentSnapshotId, long lastSequenceNumber) {
+    validateSnapshots(lastSequenceNumber);
+    validateRefs(currentSnapshotId);
+  }
+
+  private void validateSnapshots(long lastSequenceNumber) {
+    for (Snapshot snap : snapshots) {
+      ValidationException.check(
+          snap.sequenceNumber() <= lastSequenceNumber,
+          "Invalid snapshot with sequence number %s greater than last sequence number %s",
+          snap.sequenceNumber(),
+          lastSequenceNumber);
+    }
+  }
+
+  private void validateRefs(long currentSnapshotId) {
+    for (SnapshotRef ref : refs.values()) {
+      Preconditions.checkArgument(
+          snapshotsById.containsKey(ref.snapshotId()),
+          "Snapshot for reference %s does not exist in the existing snapshots list",
+          ref);
+    }
+
+    SnapshotRef main = refs.get(SnapshotRef.MAIN_BRANCH);
+    if (currentSnapshotId != -1) {
+      Preconditions.checkArgument(
+          main == null || currentSnapshotId == main.snapshotId(),
+          "Current snapshot ID does not match main branch (%s != %s)",
+          currentSnapshotId,
+          main != null ? main.snapshotId() : null);
+    } else {
+      Preconditions.checkArgument(
+          main == null, "Current snapshot is not set, but main branch exists: %s", main);
+    }
+  }
+
+  private static Map<Long, Snapshot> indexSnapshotsById(List<Snapshot> snapshots) {
+    return SerializableMap.copyOf(
+        snapshots.stream().collect(Collectors.toMap(Snapshot::snapshotId, Function.identity())));
+  }
+
+  public static Builder buildFrom(SnapshotOperations base) {
+    return new Builder(base);
+  }
+
+  public static Builder buildFromEmpty() {
+    return new Builder();
+  }
+
+  public static SnapshotOperations empty() {
+    return SnapshotOperations.buildFromEmpty().build();
+  }
+
+  static class Builder {
+    private List<Snapshot> snapshots;
+    private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
+    private final Map<Long, Snapshot> snapshotsById;
+    private Map<String, SnapshotRef> refs;
+    private SerializableSupplier<Map<String, SnapshotRef>> refsSupplier;
+
+    Builder() {
+      this.snapshots = Lists.newArrayList();
+      this.refs = Maps.newHashMap();
+
+      this.snapshotsById = Maps.newHashMap();
+    }
+
+    Builder(SnapshotOperations base) {
+      this.snapshots = Lists.newArrayList(base.snapshots);
+      this.snapshotsSupplier = base.snapshotsSupplier;
+      this.refs = Maps.newHashMap(base.refs);
+      this.refsSupplier = base.refsSupplier;
+
+      this.snapshotsById = indexSnapshotsById(snapshots);
+    }
+
+    Builder snapshots(List<Snapshot> snapshots1) {
+      this.snapshots = snapshots1;
+      return this;
+    }
+
+    Builder snapshotsSupplier(SerializableSupplier<List<Snapshot>> snapshotsSupplier1) {
+      this.snapshotsSupplier = snapshotsSupplier1;
+      return this;
+    }
+
+    Snapshot snapshot(Long id) {
+      return snapshotsById.get(id);
+    }
+
+    Builder add(Snapshot snapshot) {
+      ValidationException.check(
+          !snapshotsById.containsKey(snapshot.snapshotId()),

Review Comment:
   Given that there's some substantial logic and validation happening in the builder, I think it would also be good to add a few tests for it.



##########
core/src/main/java/org/apache/iceberg/SnapshotOperations.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * SnapshotOperations abstracts access to snapshots for table metadata. This allows a subset of
+ * snapshots/refs to be loaded initially for operations that do not require all to be present (e.g.
+ * table scans of the current snapshot).
+ *
+ * <p>In the event that snapshots/refs need to be accessed, the provided supplier will be invoked,
+ * which must provide all addressable snapshots for the table.
+ */
+class SnapshotOperations implements Serializable {
+  private List<Snapshot> snapshots;
+  private final SerializableSupplier<List<Snapshot>> snapshotsSupplier;
+  private Map<Long, Snapshot> snapshotsById;
+  private Map<String, SnapshotRef> refs;
+  private final SerializableSupplier<Map<String, SnapshotRef>> refsSupplier;
+  private boolean loaded;
+
+  private SnapshotOperations(
+      List<Snapshot> snapshots,
+      SerializableSupplier<List<Snapshot>> snapshotsSupplier,
+      Map<String, SnapshotRef> refs,
+      SerializableSupplier<Map<String, SnapshotRef>> refsSupplier) {
+    this.snapshots = ImmutableList.copyOf(snapshots);
+    this.snapshotsSupplier = snapshotsSupplier;
+    this.refs = SerializableMap.copyOf(refs);
+    this.refsSupplier = refsSupplier;
+
+    this.snapshotsById = indexSnapshotsById(snapshots);
+  }
+
+  List<Snapshot> snapshots() {
+    ensureLoaded();
+
+    return snapshots;
+  }
+
+  Snapshot snapshot(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.get(id);
+  }
+
+  boolean contains(long id) {
+    if (!snapshotsById.containsKey(id)) {
+      ensureLoaded();
+    }
+
+    return snapshotsById.containsKey(id);
+  }
+
+  Map<String, SnapshotRef> refs() {
+    return refs;
+  }
+
+  SnapshotRef ref(String name) {
+    return refs.get(name);
+  }
+
+  private void ensureLoaded() {
+    if (!loaded && snapshotsSupplier != null) {
+      this.snapshots = ImmutableList.copyOf(snapshotsSupplier.get());
+      this.snapshotsById = indexSnapshotsById(snapshots);
+    }
+
+    if (!loaded && refsSupplier != null) {
+      this.refs = SerializableMap.copyOf(refsSupplier.get());
+    }
+
+    loaded = true;
+  }
+
+  void validate(long currentSnapshotId, long lastSequenceNumber) {
+    validateSnapshots(lastSequenceNumber);
+    validateRefs(currentSnapshotId);
+  }
+
+  private void validateSnapshots(long lastSequenceNumber) {
+    for (Snapshot snap : snapshots) {
+      ValidationException.check(
+          snap.sequenceNumber() <= lastSequenceNumber,
+          "Invalid snapshot with sequence number %s greater than last sequence number %s",
+          snap.sequenceNumber(),
+          lastSequenceNumber);
+    }
+  }
+
+  private void validateRefs(long currentSnapshotId) {
+    for (SnapshotRef ref : refs.values()) {
+      Preconditions.checkArgument(
+          snapshotsById.containsKey(ref.snapshotId()),
+          "Snapshot for reference %s does not exist in the existing snapshots list",
+          ref);
+    }
+
+    SnapshotRef main = refs.get(SnapshotRef.MAIN_BRANCH);
+    if (currentSnapshotId != -1) {
+      Preconditions.checkArgument(
+          main == null || currentSnapshotId == main.snapshotId(),
+          "Current snapshot ID does not match main branch (%s != %s)",
+          currentSnapshotId,
+          main != null ? main.snapshotId() : null);
+    } else {
+      Preconditions.checkArgument(
+          main == null, "Current snapshot is not set, but main branch exists: %s", main);
+    }
+  }
+
+  private static Map<Long, Snapshot> indexSnapshotsById(List<Snapshot> snapshots) {
+    return SerializableMap.copyOf(
+        snapshots.stream().collect(Collectors.toMap(Snapshot::snapshotId, Function.identity())));
+  }
+
+  public static Builder buildFrom(SnapshotOperations base) {
+    return new Builder(base);
+  }
+
+  public static Builder buildFromEmpty() {

Review Comment:
   +1 on naming this method (`buildFromEmpty()`) `builder()` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org