You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2023/02/20 17:25:59 UTC
[iceberg] branch master updated: Core: Add REST support for lazy snapshot loading (#6850)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 237e991466 Core: Add REST support for lazy snapshot loading (#6850)
237e991466 is described below
commit 237e99146656f4a76e5b31d4daa18d6c35ddabf3
Author: Daniel Weeks <dw...@apache.org>
AuthorDate: Mon Feb 20 09:25:52 2023 -0800
Core: Add REST support for lazy snapshot loading (#6850)
---
.../apache/iceberg/rest/RESTSessionCatalog.java | 47 ++++++++-
.../org/apache/iceberg/rest/TestRESTCatalog.java | 114 +++++++++++++++++++++
open-api/rest-catalog-open-api.yaml | 13 +++
3 files changed, 169 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index cbe733e745..e214759802 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
@@ -88,6 +89,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
implements Configurable<Configuration>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
+ private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
private static final List<String> TOKEN_PREFERENCE_ORDER =
ImmutableList.of(
OAuth2Properties.ID_TOKEN_TYPE,
@@ -102,6 +104,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
private boolean refreshAuthByDefault = false;
private RESTClient client = null;
private ResourcePaths paths = null;
+ private SnapshotMode snapshotMode = null;
private Object conf = null;
private FileIO io = null;
private MetricsReporter reporter = null;
@@ -110,6 +113,15 @@ public class RESTSessionCatalog extends BaseSessionCatalog
// a lazy thread pool for token refresh
private volatile ScheduledExecutorService refreshExecutor = null;
+ enum SnapshotMode {
+ ALL,
+ REFS;
+
+ Map<String, String> params() {
+ return ImmutableMap.of("snapshots", this.name().toLowerCase(Locale.US));
+ }
+ }
+
public RESTSessionCatalog() {
this(config -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
}
@@ -179,6 +191,13 @@ public class RESTSessionCatalog extends BaseSessionCatalog
this.io =
CatalogUtil.loadFileIO(
ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);
+
+ this.snapshotMode =
+ SnapshotMode.valueOf(
+ PropertyUtil.propertyAsString(
+ mergedProps, REST_SNAPSHOT_LOADING_MODE, SnapshotMode.ALL.name())
+ .toUpperCase(Locale.US));
+
String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL);
this.reporter =
null != metricsReporterImpl
@@ -263,9 +282,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog
client.post(paths.rename(), request, null, headers(context), ErrorHandlers.tableErrorHandler());
}
- private LoadTableResponse loadInternal(SessionContext context, TableIdentifier identifier) {
+ private LoadTableResponse loadInternal(
+ SessionContext context, TableIdentifier identifier, SnapshotMode mode) {
return client.get(
paths.table(identifier),
+ mode.params(),
LoadTableResponse.class,
headers(context),
ErrorHandlers.tableErrorHandler());
@@ -279,7 +300,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
LoadTableResponse response;
TableIdentifier loadedIdent;
try {
- response = loadInternal(context, identifier);
+ response = loadInternal(context, identifier, snapshotMode);
loadedIdent = identifier;
metadataType = null;
@@ -289,7 +310,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
// attempt to load a metadata table using the identifier's namespace as the base table
TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels());
try {
- response = loadInternal(context, baseIdent);
+ response = loadInternal(context, baseIdent, snapshotMode);
loadedIdent = baseIdent;
} catch (NoSuchTableException ignored) {
// the base table does not exist
@@ -302,13 +323,29 @@ public class RESTSessionCatalog extends BaseSessionCatalog
}
AuthSession session = tableSession(response.config(), session(context));
+    // loadedIdent is assigned in more than one branch above, so it is not effectively final;
+    // the snapshots supplier lambda needs its own final copy
+    TableIdentifier finalIdentifier = loadedIdent;
+    TableMetadata tableMetadata;
+
+    if (snapshotMode == SnapshotMode.REFS) {
+      // The response was requested with snapshots=refs, so install a supplier that re-requests
+      // the table with snapshots=all when the full snapshot list is needed. The reload must
+      // target the identifier that was actually loaded: on the metadata-table fallback path
+      // that is the base table, and loading the requested identifier has already failed once.
+      tableMetadata =
+          TableMetadata.buildFrom(response.tableMetadata())
+              .setSnapshotsSupplier(
+                  () ->
+                      loadInternal(context, finalIdentifier, SnapshotMode.ALL)
+                          .tableMetadata()
+                          .snapshots())
+              .discardChanges()
+              .build();
+    } else {
+      tableMetadata = response.tableMetadata();
+    }
+
RESTTableOperations ops =
new RESTTableOperations(
client,
paths.table(loadedIdent),
session::headers,
tableFileIO(response.config()),
- response.tableMetadata());
+ tableMetadata);
TableIdentifier tableIdentifier = loadedIdent;
BaseTable table =
@@ -588,7 +625,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
@Override
public Transaction replaceTransaction() {
- LoadTableResponse response = loadInternal(context, ident);
+ LoadTableResponse response = loadInternal(context, ident, snapshotMode);
String fullName = fullTableName(ident);
AuthSession session = tableSession(response.config(), session(context));
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 0578bff297..0360055a54 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,9 +31,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
@@ -40,7 +43,9 @@ import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -49,7 +54,9 @@ import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
+import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
import org.apache.iceberg.rest.auth.AuthSessionUtil;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
@@ -737,6 +744,113 @@ public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=table-user"));
}
+ @Test
+ public void testSnapshotParams() {
+ assertThat(SnapshotMode.ALL.params()).isEqualTo(ImmutableMap.of("snapshots", "all"));
+
+ assertThat(SnapshotMode.REFS.params()).isEqualTo(ImmutableMap.of("snapshots", "refs"));
+ }
+
+  @Test
+  public void testTableSnapshotLoading() {
+    // spy on the adapter so individual load-table requests can be stubbed and counted
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    RESTCatalog catalog =
+        new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    Map<String, String> catalogProps =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            "ignored",
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.io.InMemoryFileIO",
+            // default loading to refs only
+            "snapshot-loading-mode",
+            "refs");
+    catalog.initialize("test", catalogProps);
+
+    // Create a table with multiple snapshots: two fast appends, one data file each
+    Table table = catalog.createTable(TABLE, SCHEMA);
+    table
+        .newFastAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-a.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .commit();
+
+    table
+        .newFastAppend()
+        .appendFile(
+            DataFiles.builder(PartitionSpec.unpartitioned())
+                .withPath("/path/to/data-b.parquet")
+                .withFileSizeInBytes(10)
+                .withRecordCount(2)
+                .build())
+        .commit();
+
+    ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap());
+    String tablePath = paths.table(TABLE);
+    Map<String, String> refsParams = ImmutableMap.of("snapshots", "refs");
+    Map<String, String> allParams = ImmutableMap.of("snapshots", "all");
+
+    // Respond with only referenced snapshots: take the real response and drop every snapshot
+    // that no ref points at
+    Answer<?> onlyReferencedSnapshots =
+        invocation -> {
+          LoadTableResponse fullResponse = (LoadTableResponse) invocation.callRealMethod();
+          TableMetadata fullMetadata = fullResponse.tableMetadata();
+
+          Set<Long> referencedIds =
+              fullMetadata.refs().values().stream()
+                  .map(SnapshotRef::snapshotId)
+                  .collect(Collectors.toSet());
+
+          TableMetadata trimmedMetadata =
+              fullMetadata.removeSnapshotsIf(
+                  snapshot -> !referencedIds.contains(snapshot.snapshotId()));
+
+          return LoadTableResponse.builder()
+              .withTableMetadata(trimmedMetadata)
+              .addAllConfig(fullResponse.config())
+              .build();
+        };
+
+    Mockito.doAnswer(onlyReferencedSnapshots)
+        .when(adapter)
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(tablePath),
+            eq(refsParams),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    Table refsTable = catalog.loadTable(TABLE);
+
+    assertThat(refsTable.currentSnapshot()).isEqualTo(table.currentSnapshot());
+    // verify that the table was loaded with the refs argument
+    verify(adapter, times(1))
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(tablePath),
+            eq(refsParams),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+
+    // verify that all snapshots are loaded when referenced
+    assertThat(refsTable.snapshots()).containsExactlyInAnyOrderElementsOf(table.snapshots());
+    verify(adapter, times(1))
+        .execute(
+            eq(HTTPMethod.GET),
+            eq(tablePath),
+            eq(allParams),
+            any(),
+            eq(LoadTableResponse.class),
+            any(),
+            any());
+  }
+
public void testTableAuth(
String catalogToken,
Map<String, String> credentials,
diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml
index d3f3a44890..663e2fbe78 100644
--- a/open-api/rest-catalog-open-api.yaml
+++ b/open-api/rest-catalog-open-api.yaml
@@ -523,6 +523,19 @@ paths:
table. The configuration key "token" is used to pass an access token to be used as a bearer token
for table requests. Otherwise, a token may be passed using a RFC 8693 token type as a configuration
key. For example, "urn:ietf:params:oauth:token-type:jwt=<JWT-token>".
+ parameters:
+ - in: query
+ name: snapshots
+ description:
+            The snapshots to return in the body of the metadata. Setting the value to `all` returns
+            the full set of snapshots currently valid for the table. Setting the value to `refs`
+            returns only the snapshots referenced by branches or tags.
+
+            If no parameter is provided, the default is `all`.
+ required: false
+ schema:
+ type: string
+ enum: [ all, refs ]
responses:
200:
$ref: '#/components/responses/LoadTableResponse'