You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2023/02/20 17:25:59 UTC

[iceberg] branch master updated: Core: Add REST support for lazy snapshot loading (#6850)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 237e991466 Core: Add REST support for lazy snapshot loading (#6850)
237e991466 is described below

commit 237e99146656f4a76e5b31d4daa18d6c35ddabf3
Author: Daniel Weeks <dw...@apache.org>
AuthorDate: Mon Feb 20 09:25:52 2023 -0800

    Core: Add REST support for lazy snapshot loading (#6850)
---
 .../apache/iceberg/rest/RESTSessionCatalog.java    |  47 ++++++++-
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 114 +++++++++++++++++++++
 open-api/rest-catalog-open-api.yaml                |  13 +++
 3 files changed, 169 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index cbe733e745..e214759802 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -88,6 +89,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
     implements Configurable<Configuration>, Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
   private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
+  private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
   private static final List<String> TOKEN_PREFERENCE_ORDER =
       ImmutableList.of(
           OAuth2Properties.ID_TOKEN_TYPE,
@@ -102,6 +104,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
   private boolean refreshAuthByDefault = false;
   private RESTClient client = null;
   private ResourcePaths paths = null;
+  private SnapshotMode snapshotMode = null;
   private Object conf = null;
   private FileIO io = null;
   private MetricsReporter reporter = null;
@@ -110,6 +113,15 @@ public class RESTSessionCatalog extends BaseSessionCatalog
   // a lazy thread pool for token refresh
   private volatile ScheduledExecutorService refreshExecutor = null;
 
+  /** Snapshot set requested on table load, sent as the lower-cased "snapshots" query param. */
+  enum SnapshotMode {
+    ALL,
+    REFS;
+    Map<String, String> params() {
+      return ImmutableMap.of("snapshots", this.name().toLowerCase(Locale.US));
+    }
+  }
+
   public RESTSessionCatalog() {
     this(config -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
   }
@@ -179,6 +191,13 @@ public class RESTSessionCatalog extends BaseSessionCatalog
     this.io =
         CatalogUtil.loadFileIO(
             ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);
+
+    this.snapshotMode =
+        SnapshotMode.valueOf(
+            PropertyUtil.propertyAsString(
+                    mergedProps, REST_SNAPSHOT_LOADING_MODE, SnapshotMode.ALL.name())
+                .toUpperCase(Locale.US));
+
     String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL);
     this.reporter =
         null != metricsReporterImpl
@@ -263,9 +282,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog
     client.post(paths.rename(), request, null, headers(context), ErrorHandlers.tableErrorHandler());
   }
 
-  private LoadTableResponse loadInternal(SessionContext context, TableIdentifier identifier) {
+  private LoadTableResponse loadInternal(
+      SessionContext context, TableIdentifier identifier, SnapshotMode mode) {
     return client.get(
         paths.table(identifier),
+        mode.params(),
         LoadTableResponse.class,
         headers(context),
         ErrorHandlers.tableErrorHandler());
@@ -279,7 +300,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
     LoadTableResponse response;
     TableIdentifier loadedIdent;
     try {
-      response = loadInternal(context, identifier);
+      response = loadInternal(context, identifier, snapshotMode);
       loadedIdent = identifier;
       metadataType = null;
 
@@ -289,7 +310,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
         // attempt to load a metadata table using the identifier's namespace as the base table
         TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels());
         try {
-          response = loadInternal(context, baseIdent);
+          response = loadInternal(context, baseIdent, snapshotMode);
           loadedIdent = baseIdent;
         } catch (NoSuchTableException ignored) {
           // the base table does not exist
@@ -302,13 +323,29 @@ public class RESTSessionCatalog extends BaseSessionCatalog
     }
 
     AuthSession session = tableSession(response.config(), session(context));
+    // Metadata tables resolve to their base table, so lazy snapshot loads must target loadedIdent
+    TableIdentifier snapshotsIdent = loadedIdent;
+    TableMetadata tableMetadata;
+    if (snapshotMode == SnapshotMode.REFS) {
+      tableMetadata =
+          TableMetadata.buildFrom(response.tableMetadata())
+              .setSnapshotsSupplier(
+                  () -> loadInternal(context, snapshotsIdent, SnapshotMode.ALL)
+                          .tableMetadata()
+                          .snapshots())
+              .discardChanges()
+              .build();
+    } else {
+      tableMetadata = response.tableMetadata();
+    }
+
     RESTTableOperations ops =
         new RESTTableOperations(
             client,
             paths.table(loadedIdent),
             session::headers,
             tableFileIO(response.config()),
-            response.tableMetadata());
+            tableMetadata);
 
     TableIdentifier tableIdentifier = loadedIdent;
     BaseTable table =
@@ -588,7 +625,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
 
     @Override
     public Transaction replaceTransaction() {
-      LoadTableResponse response = loadInternal(context, ident);
+      LoadTableResponse response = loadInternal(context, ident, snapshotMode);
       String fullName = fullTableName(ident);
 
       AuthSession session = tableSession(response.config(), session(context));
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 0578bff297..0360055a54 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,9 +31,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.CatalogProperties;
@@ -40,7 +43,9 @@ import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.SessionCatalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -49,7 +54,9 @@ import org.apache.iceberg.jdbc.JdbcCatalog;
 import org.apache.iceberg.metrics.MetricsReport;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
+import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
 import org.apache.iceberg.rest.auth.AuthSessionUtil;
 import org.apache.iceberg.rest.auth.OAuth2Properties;
 import org.apache.iceberg.rest.auth.OAuth2Util;
@@ -737,6 +744,113 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
         ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=table-user"));
   }
 
+  @Test
+  public void testSnapshotParams() {
+    // each mode maps to a single "snapshots" query parameter holding its lower-cased name
+    assertThat(SnapshotMode.ALL.params()).isEqualTo(ImmutableMap.of("snapshots", "all"));
+    assertThat(SnapshotMode.REFS.params()).isEqualTo(ImmutableMap.of("snapshots", "refs"));
+  }
+
+  @Test
+  public void testTableSnapshotLoading() {
+    // spy on the adapter so load requests can be stubbed per "snapshots" param and counted
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+    RESTCatalog catalog =
+        new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    catalog.initialize(
+        "test",
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            "ignored",
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.io.InMemoryFileIO",
+            // default loading to refs only
+            "snapshot-loading-mode",
+            "refs"));
+
+    // Create a table with multiple snapshots
+    Table table = catalog.createTable(TABLE, SCHEMA);
+    table
+        .newFastAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-a.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .commit();
+
+    table
+        .newFastAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-b.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .commit();
+
+    ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap());
+
+    // Answer that returns the real response with all unreferenced snapshots removed
+    Answer<?> refsAnswer =
+        invocation -> {
+          LoadTableResponse originalResponse = (LoadTableResponse) invocation.callRealMethod();
+          TableMetadata fullTableMetadata = originalResponse.tableMetadata();
+
+          Set<Long> referencedSnapshotIds =
+              fullTableMetadata.refs().values().stream()
+                  .map(SnapshotRef::snapshotId)
+                  .collect(Collectors.toSet());
+
+          TableMetadata refsMetadata =
+              fullTableMetadata.removeSnapshotsIf(
+                  s -> !referencedSnapshotIds.contains(s.snapshotId()));
+
+          return LoadTableResponse.builder()
+              .withTableMetadata(refsMetadata)
+              .addAllConfig(originalResponse.config())
+              .build();
+        };
+
+    Mockito.doAnswer(refsAnswer)
+        .when(adapter)
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(paths.table(TABLE)),
+            eq(ImmutableMap.of("snapshots", "refs")),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    Table refsTables = catalog.loadTable(TABLE);
+
+    assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());
+    // verify that the table was loaded with the refs argument
+    verify(adapter, times(1))
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(paths.table(TABLE)),
+            eq(ImmutableMap.of("snapshots", "refs")),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    // accessing snapshots() should trigger exactly one additional load with snapshots=all
+    assertThat(refsTables.snapshots()).containsExactlyInAnyOrderElementsOf(table.snapshots());
+    verify(adapter, times(1))
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(paths.table(TABLE)),
+            eq(ImmutableMap.of("snapshots", "all")),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+  }
+
   public void testTableAuth(
       String catalogToken,
       Map<String, String> credentials,
diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml
index d3f3a44890..663e2fbe78 100644
--- a/open-api/rest-catalog-open-api.yaml
+++ b/open-api/rest-catalog-open-api.yaml
@@ -523,6 +523,19 @@ paths:
         table. The configuration key "token" is used to pass an access token to be used as a bearer token
         for table requests. Otherwise, a token may be passed using a RFC 8693 token type as a configuration
         key. For example, "urn:ietf:params:oauth:token-type:jwt=<JWT-token>".
+      parameters:
+        - in: query
+          name: snapshots
+          description:
+            The snapshots to return in the body of the metadata. Setting the value to `all`
+            returns the full set of snapshots currently valid for the table. Setting the value to
+            `refs` returns only the snapshots referenced by branches or tags.
+          
+            Default if no param is provided is `all`.
+          required: false
+          schema:
+            type: string
+            enum: [ all, refs ]
       responses:
         200:
           $ref: '#/components/responses/LoadTableResponse'