You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/25 16:02:21 UTC
[iceberg] branch master updated: Core: Add session catalog implementation for REST (#4830)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c4c86113a Core: Add session catalog implementation for REST (#4830)
c4c86113a is described below
commit c4c86113adb459c56ea7893bb13b6388d467495c
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Wed May 25 09:02:13 2022 -0700
Core: Add session catalog implementation for REST (#4830)
---
baseline.gradle | 1 +
.../java/org/apache/iceberg/rest/RESTCatalog.java | 228 +++++++
.../apache/iceberg/rest/RESTCatalogProperties.java | 35 --
.../apache/iceberg/rest/RESTSessionCatalog.java | 230 +++++--
.../java/org/apache/iceberg/rest/RESTUtil.java | 23 +-
.../apache/iceberg/rest/auth/OAuth2Properties.java | 20 +
.../org/apache/iceberg/rest/auth/OAuth2Util.java | 138 ++++-
.../java/org/apache/iceberg/util/ThreadPools.java | 30 +-
.../apache/iceberg/rest/RESTCatalogAdapter.java | 7 +-
.../org/apache/iceberg/rest/TestRESTCatalog.java | 666 ++++++++++++++++++++-
10 files changed, 1260 insertions(+), 118 deletions(-)
diff --git a/baseline.gradle b/baseline.gradle
index 3e9ee9098..ec4208741 100644
--- a/baseline.gradle
+++ b/baseline.gradle
@@ -82,6 +82,7 @@ subprojects {
'-Xep:StrictUnusedVariable:OFF',
'-Xep:TypeParameterShadowing:OFF',
'-Xep:TypeParameterUnusedInFormals:OFF',
+ '-Xep:DangerousThreadPoolExecutorUsage:OFF',
)
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
new file mode 100644
index 000000000..63747a596
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+/** {@link Catalog} and {@link SupportsNamespaces} implementation that forwards calls to a {@link RESTSessionCatalog} and to its view for one session context. */
+public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Configuration>, Closeable {
+ private final RESTSessionCatalog sessionCatalog; // receives initialize, name, properties, setConf and close
+ private final Catalog delegate; // view from sessionCatalog.asCatalog(context); receives the table operations
+ private final SupportsNamespaces nsDelegate; // the same delegate object, cast; receives the namespace operations
+ /** Creates a catalog with an empty session context and a new {@code HTTPClientFactory} as the client builder. */
+ public RESTCatalog() {
+ this(SessionCatalog.SessionContext.createEmpty(), new HTTPClientFactory());
+ }
+ /** Creates a catalog with an empty session context and the given client builder. */
+ public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
+ this(SessionCatalog.SessionContext.createEmpty(), clientBuilder);
+ }
+ /** Creates a catalog whose delegate is the session catalog's view for the given context. */
+ public RESTCatalog(SessionCatalog.SessionContext context, Function<Map<String, String>, RESTClient> clientBuilder) {
+ this.sessionCatalog = new RESTSessionCatalog(clientBuilder);
+ this.delegate = sessionCatalog.asCatalog(context);
+ this.nsDelegate = (SupportsNamespaces) delegate; // NOTE(review): assumes asCatalog returns a SupportsNamespaces - confirm
+ }
+ /** Rejects null properties, then initializes the underlying session catalog with the same name and properties. */
+ @Override
+ public void initialize(String name, Map<String, String> props) {
+ Preconditions.checkArgument(props != null, "Invalid configuration: null");
+ sessionCatalog.initialize(name, props);
+ }
+
+ @Override
+ public String name() {
+ return sessionCatalog.name();
+ }
+ /** Returns the properties reported by the session catalog. */
+ public Map<String, String> properties() {
+ return sessionCatalog.properties();
+ }
+
+ @Override
+ public List<TableIdentifier> listTables(Namespace ns) {
+ return delegate.listTables(ns);
+ }
+
+ @Override
+ public boolean tableExists(TableIdentifier ident) {
+ return delegate.tableExists(ident);
+ }
+
+ @Override
+ public Table loadTable(TableIdentifier ident) {
+ return delegate.loadTable(ident);
+ }
+
+ @Override
+ public void invalidateTable(TableIdentifier ident) {
+ delegate.invalidateTable(ident);
+ }
+
+ @Override
+ public TableBuilder buildTable(TableIdentifier ident, Schema schema) {
+ return delegate.buildTable(ident, schema);
+ }
+
+ @Override
+ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec, String location,
+ Map<String, String> props) {
+ return delegate.createTable(ident, schema, spec, location, props);
+ }
+
+ @Override
+ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ Map<String, String> props) {
+ return delegate.createTable(ident, schema, spec, props);
+ }
+
+ @Override
+ public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
+ return delegate.createTable(ident, schema, spec);
+ }
+
+ @Override
+ public Table createTable(TableIdentifier identifier, Schema schema) {
+ return delegate.createTable(identifier, schema);
+ }
+
+ @Override
+ public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ String location, Map<String, String> props) {
+ return delegate.newCreateTableTransaction(ident, schema, spec, location, props);
+ }
+
+ @Override
+ public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ Map<String, String> props) {
+ return delegate.newCreateTableTransaction(ident, schema, spec, props);
+ }
+
+ @Override
+ public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec) {
+ return delegate.newCreateTableTransaction(ident, schema, spec);
+ }
+
+ @Override
+ public Transaction newCreateTableTransaction(TableIdentifier identifier, Schema schema) {
+ return delegate.newCreateTableTransaction(identifier, schema);
+ }
+
+ @Override
+ public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ String location, Map<String, String> props, boolean orCreate) {
+ return delegate.newReplaceTableTransaction(ident, schema, spec, location, props, orCreate);
+ }
+
+ @Override
+ public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ Map<String, String> props, boolean orCreate) {
+ return delegate.newReplaceTableTransaction(ident, schema, spec, props, orCreate);
+ }
+
+ @Override
+ public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+ boolean orCreate) {
+ return delegate.newReplaceTableTransaction(ident, schema, spec, orCreate);
+ }
+
+ @Override
+ public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, boolean orCreate) {
+ return delegate.newReplaceTableTransaction(ident, schema, orCreate);
+ }
+
+ @Override
+ public boolean dropTable(TableIdentifier ident) {
+ return delegate.dropTable(ident);
+ }
+
+ @Override
+ public boolean dropTable(TableIdentifier ident, boolean purge) {
+ return delegate.dropTable(ident, purge);
+ }
+
+ @Override
+ public void renameTable(TableIdentifier from, TableIdentifier to) {
+ delegate.renameTable(from, to);
+ }
+
+ @Override
+ public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
+ return delegate.registerTable(ident, metadataFileLocation);
+ }
+
+ @Override
+ public void createNamespace(Namespace ns, Map<String, String> props) {
+ nsDelegate.createNamespace(ns, props);
+ }
+
+ @Override
+ public List<Namespace> listNamespaces(Namespace ns) throws NoSuchNamespaceException {
+ return nsDelegate.listNamespaces(ns);
+ }
+
+ @Override
+ public Map<String, String> loadNamespaceMetadata(Namespace ns) throws NoSuchNamespaceException {
+ return nsDelegate.loadNamespaceMetadata(ns);
+ }
+
+ @Override
+ public boolean dropNamespace(Namespace ns) throws NamespaceNotEmptyException {
+ return nsDelegate.dropNamespace(ns);
+ }
+
+ @Override
+ public boolean setProperties(Namespace ns, Map<String, String> props) throws NoSuchNamespaceException {
+ return nsDelegate.setProperties(ns, props);
+ }
+
+ @Override
+ public boolean removeProperties(Namespace ns, Set<String> props) throws NoSuchNamespaceException {
+ return nsDelegate.removeProperties(ns, props);
+ }
+ /** Passes the Hadoop configuration on to the session catalog. */
+ @Override
+ public void setConf(Configuration conf) {
+ sessionCatalog.setConf(conf);
+ }
+ /** Closes the underlying session catalog. */
+ @Override
+ public void close() throws IOException {
+ sessionCatalog.close();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
deleted file mode 100644
index 7aa291654..000000000
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.rest;
-
-/**
- * Catalog properties which are specific to the RESTCatalog, which can be used in conjunction with
- * {@link org.apache.iceberg.CatalogProperties}
- */
-public class RESTCatalogProperties {
-
- private RESTCatalogProperties() {
- }
-
- /**
- * A Bearer authorization token which will be used to authenticate requests with the server.
- */
- public static final String AUTH_TOKEN = "token";
-}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index f2b165fb5..cb8b5b0e3 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -19,11 +19,16 @@
package org.apache.iceberg.rest;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
@@ -50,7 +55,9 @@ import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
@@ -61,21 +68,33 @@ import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RESTSessionCatalog extends BaseSessionCatalog implements Configurable<Configuration>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
+ private static final long MAX_REFRESH_WINDOW_MILLIS = 60_000; // 1 minute
+ private static final long MIN_REFRESH_WAIT_MILLIS = 10;
+ private static final List<String> TOKEN_PREFERENCE_ORDER = ImmutableList.of(
+ OAuth2Properties.ID_TOKEN_TYPE, OAuth2Properties.ACCESS_TOKEN_TYPE, OAuth2Properties.JWT_TOKEN_TYPE,
+ OAuth2Properties.SAML2_TOKEN_TYPE, OAuth2Properties.SAML1_TOKEN_TYPE);
private final Function<Map<String, String>, RESTClient> clientBuilder;
+ private final Cache<String, AuthSession> sessions = Caffeine.newBuilder().build();
+ private AuthSession catalogAuth = null;
private RESTClient client = null;
- private Map<String, String> baseHeaders = ImmutableMap.of();
private ResourcePaths paths = null;
private Object conf = null;
private FileIO io = null;
+ // a lazy thread pool for token refresh
+ private volatile ScheduledExecutorService refreshExecutor = null;
+
public RESTSessionCatalog() {
this(new HTTPClientFactory());
}
@@ -87,18 +106,54 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
@Override
public void initialize(String name, Map<String, String> props) {
Preconditions.checkArgument(props != null, "Invalid configuration: null");
- ConfigResponse config = fetchConfig(props);
- Map<String, String> properties = config.merge(props);
- this.client = clientBuilder.apply(properties);
- this.baseHeaders = RESTUtil.extractPrefixMap(properties, "header.");
- this.paths = ResourcePaths.forCatalogProperties(properties);
- String ioImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
- this.io = CatalogUtil.loadFileIO(ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), properties, conf);
- super.initialize(name, properties);
+
+ long startTimeMillis = System.currentTimeMillis(); // keep track of the init start time for token refresh
+ String initToken = props.get(OAuth2Properties.TOKEN);
+
+ // fetch auth and config to complete initialization
+ ConfigResponse config;
+ OAuthTokenResponse authResponse;
+ try (RESTClient initClient = clientBuilder.apply(props)) {
+ Map<String, String> initHeaders = RESTUtil.merge(configHeaders(props), OAuth2Util.authHeaders(initToken));
+ String credential = props.get(OAuth2Properties.CREDENTIAL);
+ if (credential != null && !credential.isEmpty()) {
+ String scope = props.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE);
+ authResponse = OAuth2Util.fetchToken(initClient, initHeaders, credential, scope);
+ config = fetchConfig(initClient, RESTUtil.merge(initHeaders, OAuth2Util.authHeaders(authResponse.token())));
+ } else {
+ authResponse = null;
+ config = fetchConfig(initClient, initHeaders);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close HTTP client", e);
+ }
+
+ // build the final configuration and set up the catalog's auth
+ Map<String, String> mergedProps = config.merge(props);
+ Map<String, String> baseHeaders = configHeaders(mergedProps);
+ this.catalogAuth = new AuthSession(baseHeaders, null, null);
+ if (authResponse != null) {
+ this.catalogAuth = newSession(authResponse, startTimeMillis, catalogAuth);
+ } else if (initToken != null) {
+ this.catalogAuth = newSession(initToken, expiresInMs(mergedProps), catalogAuth);
+ }
+
+ this.client = clientBuilder.apply(mergedProps);
+ this.paths = ResourcePaths.forCatalogProperties(mergedProps);
+
+ String ioImpl = mergedProps.get(CatalogProperties.FILE_IO_IMPL);
+ this.io = CatalogUtil.loadFileIO(ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);
+
+ super.initialize(name, mergedProps);
}
- protected Supplier<Map<String, String>> headers(SessionContext context) {
- return () -> baseHeaders;
+  /**
+   * Looks up the auth session stored under the context's session id.
+   * <p>
+   * The loader passed to the cache builds a session from the context's credentials and properties,
+   * using the catalog-level session as its parent.
+   */
+  private AuthSession session(SessionContext context) {
+    Function<String, AuthSession> sessionLoader =
+        id -> newSession(context.credentials(), context.properties(), catalogAuth);
+    return sessions.get(context.sessionId(), sessionLoader);
+  }
+
+  /** Resolves the auth session for the context and returns a supplier bound to that session's headers. */
+  private Supplier<Map<String, String>> headers(SessionContext context) {
+    AuthSession contextSession = session(context);
+    return contextSession::headers;
+  }
@Override
@@ -148,8 +203,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
public Table loadTable(SessionContext context, TableIdentifier identifier) {
LoadTableResponse response = loadInternal(context, identifier);
Pair<RESTClient, FileIO> clients = tableClients(response.config());
+ AuthSession session = tableSession(response.config(), session(context));
RESTTableOperations ops = new RESTTableOperations(
- clients.first(), paths.table(identifier), headers(context), clients.second(), response.tableMetadata());
+ clients.first(), paths.table(identifier), session::headers, clients.second(), response.tableMetadata());
return new BaseTable(ops, fullTableName(identifier));
}
@@ -222,11 +278,61 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
return !response.updated().isEmpty();
}
+  /**
+   * Returns the scheduler used for token refresh tasks, creating it on first use.
+   * <p>
+   * The volatile field is copied into a local so that the value that was null-checked is the value that
+   * is returned. Re-reading the field for the return could hand back null if {@code close()} clears the
+   * field between the check and the return.
+   *
+   * @return the executor used to schedule token refreshes
+   */
+  private ScheduledExecutorService tokenRefreshExecutor() {
+    ScheduledExecutorService executor = refreshExecutor;
+    if (executor == null) {
+      synchronized (this) {
+        // check again under the lock: another thread may have created the pool in the meantime
+        executor = refreshExecutor;
+        if (executor == null) {
+          executor = ThreadPools.newScheduledPool(name() + "-token-refresh", 1);
+          this.refreshExecutor = executor;
+        }
+      }
+    }
+
+    return executor;
+  }
+
+  /**
+   * Schedules a refresh of the session's token shortly before the token expires.
+   * <p>
+   * The refresh is started ahead of expiration by one tenth of the token lifetime, capped at
+   * {@code MAX_REFRESH_WINDOW_MILLIS}. Time already elapsed since {@code startTimeMillis} is subtracted
+   * from the delay, and the delay is never shorter than {@code MIN_REFRESH_WAIT_MILLIS}.
+   *
+   * @param session the session to refresh
+   * @param startTimeMillis wall-clock time, in milliseconds, from which the expiration interval is measured
+   * @param expiresIn length of the expiration interval
+   * @param unit unit of {@code expiresIn}
+   */
+  private void scheduleTokenRefresh(
+      AuthSession session, long startTimeMillis, long expiresIn, TimeUnit unit) {
+    // token lifetime normalized to milliseconds
+    long lifetimeMillis = unit.toMillis(expiresIn);
+    // head start given to the refresh request so it can finish before expiration
+    long leadMillis = Math.min(lifetimeMillis / 10, MAX_REFRESH_WINDOW_MILLIS);
+    // time that has already passed since the interval started
+    long ageMillis = System.currentTimeMillis() - startTimeMillis;
+    // delay before the refresh task runs, bounded below by the minimum wait
+    long delayMillis = Math.max((lifetimeMillis - leadMillis) - ageMillis, MIN_REFRESH_WAIT_MILLIS);
+
+    Runnable refreshTask = () -> {
+      long refreshStartTime = System.currentTimeMillis();
+      Pair<Integer, TimeUnit> expiration = session.refresh(client);
+      if (expiration != null) {
+        // the refresh reported a new expiration, so chain the next refresh from it
+        scheduleTokenRefresh(session, refreshStartTime, expiration.first(), expiration.second());
+      }
+    };
+
+    tokenRefreshExecutor().schedule(refreshTask, delayMillis, TimeUnit.MILLISECONDS);
+  }
+
@Override
public void close() throws IOException {
if (client != null) {
client.close();
}
+
+ if (refreshExecutor != null) {
+ ScheduledExecutorService service = refreshExecutor;
+ this.refreshExecutor = null;
+
+ try {
+ if (service.awaitTermination(1, TimeUnit.MINUTES)) {
+ LOG.warn("Timed out waiting for refresh executor to terminate");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for refresh executor to terminate", e);
+ Thread.currentThread().interrupt();
+ }
+ }
}
private class Builder implements Catalog.TableBuilder {
@@ -292,8 +398,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
ErrorHandlers.tableErrorHandler());
Pair<RESTClient, FileIO> clients = tableClients(response.config());
+ AuthSession session = tableSession(response.config(), session(context));
RESTTableOperations ops = new RESTTableOperations(
- clients.first(), paths.table(ident), headers(context), clients.second(), response.tableMetadata());
+ clients.first(), paths.table(ident), session::headers, clients.second(), response.tableMetadata());
return new BaseTable(ops, fullTableName(ident));
}
@@ -304,10 +411,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
String fullName = fullTableName(ident);
Pair<RESTClient, FileIO> clients = tableClients(response.config());
+ AuthSession session = tableSession(response.config(), session(context));
TableMetadata meta = response.tableMetadata();
RESTTableOperations ops = new RESTTableOperations(
- clients.first(), paths.table(ident), headers(context), clients.second(),
+ clients.first(), paths.table(ident), session::headers, clients.second(),
RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta);
return Transactions.createTableTransaction(fullName, ops, meta);
@@ -319,6 +427,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
String fullName = fullTableName(ident);
Pair<RESTClient, FileIO> clients = tableClients(response.config());
+ AuthSession session = tableSession(response.config(), session(context));
TableMetadata base = response.tableMetadata();
Map<String, String> tableProperties = propertiesBuilder.build();
@@ -347,7 +456,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
}
RESTTableOperations ops = new RESTTableOperations(
- clients.first(), paths.table(ident), headers(context), clients.second(),
+ clients.first(), paths.table(ident), session::headers, clients.second(),
RESTTableOperations.UpdateType.REPLACE, changes.build(), base);
return Transactions.replaceTableTransaction(fullName, ops, replacement);
@@ -421,18 +530,12 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
return String.format("%s.%s", name(), ident);
}
- private Map<String, String> fullConf(Map<String, String> config) {
- Map<String, String> fullConf = Maps.newHashMap(properties());
- fullConf.putAll(config);
- return fullConf;
- }
-
private Pair<RESTClient, FileIO> tableClients(Map<String, String> config) {
if (config.isEmpty()) {
return Pair.of(client, io); // reuse client and io since config is the same
}
- Map<String, String> fullConf = fullConf(config);
+ Map<String, String> fullConf = RESTUtil.merge(properties(), config);
String ioImpl = fullConf.get(CatalogProperties.FILE_IO_IMPL);
FileIO tableIO = CatalogUtil.loadFileIO(
ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), fullConf, this.conf);
@@ -441,23 +544,74 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
return Pair.of(tableClient, tableIO);
}
- private ConfigResponse fetchConfig(Map<String, String> props) {
- // Create a client for one time use, as we will reconfigure the client using the merged server and application
- // defined configuration.
- RESTClient singleUseClient = clientBuilder.apply(props);
- Map<String, String> headers = RESTUtil.extractPrefixMap(props, "header.");
+ private AuthSession tableSession(Map<String, String> tableConf, AuthSession parent) {
+ return newSession(tableConf, tableConf, parent);
+ }
- try {
- ConfigResponse configResponse = singleUseClient
- .get(ResourcePaths.config(), ConfigResponse.class, headers, ErrorHandlers.defaultErrorHandler());
- configResponse.validate();
- return configResponse;
- } finally {
- try {
- singleUseClient.close();
- } catch (IOException e) {
- LOG.error("Failed to close HTTP client used for getting catalog configuration. Possible resource leak.", e);
+ private static ConfigResponse fetchConfig(RESTClient client, Map<String, String> headers) {
+ ConfigResponse configResponse = client
+ .get(ResourcePaths.config(), ConfigResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+ configResponse.validate();
+ return configResponse;
+ }
+
+ private AuthSession newSession(Map<String, String> credentials, Map<String, String> properties, AuthSession parent) {
+ if (credentials != null) {
+ // use the bearer token without exchanging
+ if (credentials.containsKey(OAuth2Properties.TOKEN)) {
+ return newSession(credentials.get(OAuth2Properties.TOKEN), expiresInMs(properties), parent);
}
+
+ if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) {
+ // fetch a token using the client credentials flow
+ return newSession(credentials.get(OAuth2Properties.CREDENTIAL), parent);
+ }
+
+ for (String tokenType : TOKEN_PREFERENCE_ORDER) {
+ if (credentials.containsKey(tokenType)) {
+ // exchange the token for an access token using the token exchange flow
+ return newSession(credentials.get(tokenType), tokenType, parent);
+ }
+ }
+ }
+
+ return parent;
+ }
+
+ private AuthSession newSession(String token, long expirationMs, AuthSession parent) {
+ AuthSession session = new AuthSession(parent.headers(), token, OAuth2Properties.ACCESS_TOKEN_TYPE);
+ scheduleTokenRefresh(session, System.currentTimeMillis(), expirationMs, TimeUnit.MILLISECONDS);
+ return session;
+ }
+
+ private AuthSession newSession(String token, String tokenType, AuthSession parent) {
+ long startTimeMillis = System.currentTimeMillis();
+ OAuthTokenResponse response = OAuth2Util.exchangeToken(
+ client, parent.headers(), token, tokenType, parent.token(), parent.tokenType(), OAuth2Properties.CATALOG_SCOPE);
+ return newSession(response, startTimeMillis, parent);
+ }
+
+ private AuthSession newSession(String credential, AuthSession parent) {
+ long startTimeMillis = System.currentTimeMillis();
+ OAuthTokenResponse response = OAuth2Util.fetchToken(
+ client, parent.headers(), credential, OAuth2Properties.CATALOG_SCOPE);
+ return newSession(response, startTimeMillis, parent);
+ }
+
+ private AuthSession newSession(OAuthTokenResponse response, long startTimeMillis, AuthSession parent) {
+ AuthSession session = new AuthSession(parent.headers(), response.token(), response.issuedTokenType());
+ if (response.expiresInSeconds() != null) {
+ scheduleTokenRefresh(session, startTimeMillis, response.expiresInSeconds(), TimeUnit.SECONDS);
}
+ return session;
+ }
+
+ private long expiresInMs(Map<String, String> properties) {
+ return PropertyUtil.propertyAsLong(
+ properties, OAuth2Properties.EXCHANGE_TOKEN_MS, OAuth2Properties.EXCHANGE_TOKEN_MS_DEFAULT);
+ }
+
+ private static Map<String, String> configHeaders(Map<String, String> properties) {
+ return RESTUtil.extractPrefixMap(properties, "header.");
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
index 55e5c1acb..3a51ad16d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-class RESTUtil {
+public class RESTUtil {
private static final Joiner NULL_JOINER = Joiner.on("%00");
private static final Splitter NULL_SPLITTER = Splitter.on("%00");
@@ -52,6 +52,27 @@ class RESTUtil {
return result;
}
+  /**
+   * Combine two string maps, letting entries from updates replace entries from target with the same key.
+   *
+   * @param target the base map
+   * @param updates entries that are added to, or override, the base map
+   * @return an immutable map holding target's non-overridden entries followed by all of updates
+   */
+  public static Map<String, String> merge(Map<String, String> target, Map<String, String> updates) {
+    ImmutableMap.Builder<String, String> merged = ImmutableMap.builder();
+
+    // keep only the target entries that updates does not override; the builder rejects duplicate keys
+    for (Map.Entry<String, String> entry : target.entrySet()) {
+      if (!updates.containsKey(entry.getKey())) {
+        merged.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    merged.putAll(updates);
+
+    return merged.build();
+  }
+
/**
* Takes in a map, and returns a copy filtered on the entries with keys beginning with the designated prefix.
* The keys are returned with the prefix removed.
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
index 4c78d80b0..184a80481 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
@@ -33,8 +33,28 @@ public class OAuth2Properties {
*/
public static final String CREDENTIAL = "credential";
+ /**
+ * Interval in milliseconds to wait before attempting to exchange the configured catalog Bearer token.
+ * By default, token exchange will be attempted after 1 hour.
+ */
+ public static final String EXCHANGE_TOKEN_MS = "exchange-token-in-ms";
+ public static final long EXCHANGE_TOKEN_MS_DEFAULT = 3_600_000; // 1 hour
+
+ /**
+ * Additional scope for OAuth2.
+ */
+ public static final String SCOPE = "scope";
+
/**
* Scope for OAuth2 flows.
*/
public static final String CATALOG_SCOPE = "catalog";
+
+ // token type constants
+ public static final String ACCESS_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token";
+ public static final String REFRESH_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:refresh_token";
+ public static final String ID_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token";
+ public static final String SAML1_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:saml1";
+ public static final String SAML2_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:saml2";
+ public static final String JWT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt";
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index 1ed7517fd..48f4d7980 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -26,6 +26,8 @@ import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
@@ -33,16 +35,28 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
public class OAuth2Util {
private OAuth2Util() {
}
+ private static final Logger LOG = LoggerFactory.getLogger(OAuth2Util.class);
+
// valid scope tokens are from ascii 0x21 to 0x7E, excluding 0x22 (") and 0x5C (\)
private static final Pattern VALID_SCOPE_TOKEN = Pattern.compile("^[!-~&&[^\"\\\\]]+$");
private static final Splitter SCOPE_DELIMITER = Splitter.on(" ");
@@ -68,12 +82,8 @@ public class OAuth2Util {
private static final String ACTOR_TOKEN = "actor_token";
private static final String ACTOR_TOKEN_TYPE = "actor_token_type";
private static final Set<String> VALID_TOKEN_TYPES = Sets.newHashSet(
- "urn:ietf:params:oauth:token-type:access_token",
- "urn:ietf:params:oauth:token-type:refresh_token",
- "urn:ietf:params:oauth:token-type:id_token",
- "urn:ietf:params:oauth:token-type:saml1",
- "urn:ietf:params:oauth:token-type:saml2",
- "urn:ietf:params:oauth:token-type:jwt");
+ OAuth2Properties.ACCESS_TOKEN_TYPE, OAuth2Properties.REFRESH_TOKEN_TYPE, OAuth2Properties.ID_TOKEN_TYPE,
+ OAuth2Properties.SAML1_TOKEN_TYPE, OAuth2Properties.SAML2_TOKEN_TYPE, OAuth2Properties.JWT_TOKEN_TYPE);
// response serialization
private static final String ACCESS_TOKEN = "access_token";
@@ -102,12 +112,51 @@ public class OAuth2Util {
return SCOPE_JOINER.join(scopes);
}
- public static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
+  /**
+   * Requests a replacement token through the token exchange flow, using only a subject token.
+   * <p>
+   * The form data is posted to the tokens endpoint with the given headers and the response is validated
+   * before it is returned.
+   *
+   * @param client a RESTClient used to post the request
+   * @param headers headers sent with the token request
+   * @param subjectToken the token to exchange
+   * @param subjectTokenType token type of the subject token
+   * @param scope a single scope to request, or null to pass an empty scope list
+   * @return the validated token response
+   */
+  private static OAuthTokenResponse refreshToken(RESTClient client, Map<String, String> headers,
+                                                 String subjectToken, String subjectTokenType, String scope) {
+    List<String> scopes;
+    if (scope == null) {
+      scopes = ImmutableList.of();
+    } else {
+      scopes = ImmutableList.of(scope);
+    }
+
+    Map<String, String> formData = tokenExchangeRequest(subjectToken, subjectTokenType, scopes);
+    OAuthTokenResponse tokenResponse = client.postForm(
+        ResourcePaths.tokens(), formData, OAuthTokenResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+    tokenResponse.validate();
+    return tokenResponse;
+  }
+
+  /**
+   * Exchanges a subject token and an actor token for a new token using the token exchange flow.
+   * <p>
+   * The form data is posted to the tokens endpoint with the given headers and the response is validated
+   * before it is returned.
+   *
+   * @param client a RESTClient used to post the request
+   * @param headers headers sent with the token request
+   * @param subjectToken the token to exchange
+   * @param subjectTokenType token type of the subject token
+   * @param actorToken the actor token included in the request
+   * @param actorTokenType token type of the actor token
+   * @param scope a single scope to request, or null to pass an empty scope list
+   * @return the validated token response
+   */
+  public static OAuthTokenResponse exchangeToken(RESTClient client, Map<String, String> headers,
+                                                 String subjectToken, String subjectTokenType,
+                                                 String actorToken, String actorTokenType, String scope) {
+    List<String> scopes;
+    if (scope == null) {
+      scopes = ImmutableList.of();
+    } else {
+      scopes = ImmutableList.of(scope);
+    }
+
+    Map<String, String> formData = tokenExchangeRequest(
+        subjectToken, subjectTokenType, actorToken, actorTokenType, scopes);
+    OAuthTokenResponse tokenResponse = client.postForm(
+        ResourcePaths.tokens(), formData, OAuthTokenResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+    tokenResponse.validate();
+    return tokenResponse;
+  }
+
+  /**
+   * Fetches a token with the client credentials flow.
+   * <p>
+   * The form data is posted to the tokens endpoint with the given headers and the response is validated
+   * before it is returned.
+   *
+   * @param client a RESTClient used to post the request
+   * @param headers headers sent with the token request
+   * @param credential the client credential string used to build the client credentials request
+   * @param scope a single scope to request, or null to pass an empty scope list
+   * @return the validated token response
+   */
+  public static OAuthTokenResponse fetchToken(RESTClient client, Map<String, String> headers, String credential,
+                                              String scope) {
+    List<String> scopes;
+    if (scope == null) {
+      scopes = ImmutableList.of();
+    } else {
+      scopes = ImmutableList.of(scope);
+    }
+
+    Map<String, String> formData = clientCredentialsRequest(credential, scopes);
+    OAuthTokenResponse tokenResponse = client.postForm(
+        ResourcePaths.tokens(), formData, OAuthTokenResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+    tokenResponse.validate();
+    return tokenResponse;
+  }
+
+ // Convenience overload: builds a token exchange request without an actor by passing null for both the
+ // actor token and the actor token type to the full overload.
+ private static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
List<String> scopes) {
return tokenExchangeRequest(subjectToken, subjectTokenType, null, null, scopes);
}
- public static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
+ private static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
String actorToken, String actorTokenType,
List<String> scopes) {
Preconditions.checkArgument(VALID_TOKEN_TYPES.contains(subjectTokenType),
@@ -144,12 +193,12 @@ public class OAuth2Util {
}
}
- public static Map<String, String> clientCredentialsRequest(String credential, List<String> scopes) {
+ // Convenience overload: splits the credential string with parseCredential and passes the two parts of the
+ // resulting pair to the (clientId, clientSecret, scopes) overload.
+ private static Map<String, String> clientCredentialsRequest(String credential, List<String> scopes) {
Pair<String, String> credentialPair = parseCredential(credential);
return clientCredentialsRequest(credentialPair.first(), credentialPair.second(), scopes);
}
- public static Map<String, String> clientCredentialsRequest(String clientId, String clientSecret,
+ private static Map<String, String> clientCredentialsRequest(String clientId, String clientSecret,
List<String> scopes) {
ImmutableMap.Builder<String, String> formData = ImmutableMap.builder();
formData.put(GRANT_TYPE, CLIENT_CREDENTIALS);
@@ -157,7 +206,7 @@ public class OAuth2Util {
formData.put(CLIENT_ID, clientId);
}
formData.put(CLIENT_SECRET, clientSecret);
- formData.put(SCOPE, toScope(Iterables.concat(scopes, ImmutableList.of(CATALOG))));
+ formData.put(SCOPE, toScope(scopes));
return formData.build();
}
@@ -223,4 +272,69 @@ public class OAuth2Util {
return builder.build();
}
+
+  /**
+   * Class to handle authorization headers and token refresh.
+   * <p>
+   * A session starts from a set of base headers and a token. {@link #refresh(RESTClient)} replaces the token,
+   * its type, and the derived headers. The fields are volatile so that a thread calling {@link #headers()}
+   * observes the state written by a refresh that ran on another thread.
+   */
+  public static class AuthSession {
+    private volatile Map<String, String> headers;
+    private volatile String token;
+    private volatile String tokenType;
+
+    /**
+     * Creates a session whose headers are the base headers merged with the auth headers for the token.
+     *
+     * @param baseHeaders headers to merge the authorization headers into
+     * @param token the session token
+     * @param tokenType token type of the session token, used as the subject token type when refreshing
+     */
+    public AuthSession(Map<String, String> baseHeaders, String token, String tokenType) {
+      this.headers = RESTUtil.merge(baseHeaders, authHeaders(token));
+      this.token = token;
+      this.tokenType = tokenType;
+    }
+
+    /**
+     * Returns the headers to send with requests made in this session.
+     */
+    public Map<String, String> headers() {
+      return headers;
+    }
+
+    /**
+     * Returns the current session token.
+     */
+    public String token() {
+      return token;
+    }
+
+    /**
+     * Returns the token type of the current session token.
+     */
+    public String tokenType() {
+      return tokenType;
+    }
+
+    /**
+     * Attempt to refresh the session token using the token exchange flow.
+     * <p>
+     * The exchange is retried with exponential backoff; failures are logged and suppressed, in which case
+     * the session is left unchanged.
+     *
+     * @param client a RESTClient
+     * @return interval to wait before calling refresh again, or null if no refresh is needed
+     */
+    public Pair<Integer, TimeUnit> refresh(RESTClient client) {
+      // read each field once so the whole refresh works from one consistent snapshot of the session
+      String currentToken = token;
+      if (currentToken == null) {
+        return null;
+      }
+
+      String currentTokenType = tokenType;
+      Map<String, String> currentHeaders = headers;
+
+      AtomicReference<OAuthTokenResponse> ref = new AtomicReference<>(null);
+      boolean isSuccessful = Tasks.foreach(ref)
+          .suppressFailureWhenFinished()
+          .retry(5)
+          .onFailure((task, err) -> LOG.warn("Failed to refresh token", err))
+          .exponentialBackoff(
+              COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+              COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+              COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+              2.0 /* exponential */)
+          .run(holder -> holder.set(refreshToken(
+              client, currentHeaders, currentToken, currentTokenType, OAuth2Properties.CATALOG_SCOPE)));
+
+      if (!isSuccessful) {
+        return null;
+      }
+
+      OAuthTokenResponse response = ref.get();
+      String refreshedToken = response.token();
+      this.token = refreshedToken;
+      this.tokenType = response.issuedTokenType();
+      this.headers = RESTUtil.merge(currentHeaders, authHeaders(refreshedToken));
+
+      // without an expiration there is nothing to schedule the next refresh against
+      if (response.expiresInSeconds() != null) {
+        return Pair.of(response.expiresInSeconds(), TimeUnit.SECONDS);
+      }
+
+      return null;
+    }
+  }
}
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index ebf234b98..ee970005b 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -21,6 +21,9 @@ package org.apache.iceberg.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.iceberg.SystemProperties;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
@@ -61,12 +64,20 @@ public class ThreadPools {
public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) Executors.newFixedThreadPool(
- poolSize,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(namePrefix + "-%d")
- .build()));
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(namePrefix)));
+ }
+
+  /**
+   * Creates a {@link ScheduledExecutorService} backed by a {@link ScheduledThreadPoolExecutor}.
+   * <p>
+   * The executor's threads are daemon threads created with the given name prefix.
+   *
+   * @param namePrefix a base name for threads in the executor service's thread pool
+   * @param poolSize pool size passed to the {@link ScheduledThreadPoolExecutor} constructor
+   * @return an executor service
+   */
+  public static ScheduledExecutorService newScheduledPool(String namePrefix, int poolSize) {
+    ThreadFactory daemonThreads = newDaemonThreadFactory(namePrefix);
+    return new ScheduledThreadPoolExecutor(poolSize, daemonThreads);
   }
private static int getPoolSize(String systemProperty, int defaultSize) {
@@ -80,4 +91,11 @@ public class ThreadPools {
}
return defaultSize;
}
+
+  /**
+   * Builds a factory that creates daemon threads using the name format {@code namePrefix + "-%d"}.
+   *
+   * @param namePrefix a base name for the created threads
+   * @return a thread factory
+   */
+  private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
+    String nameFormat = namePrefix + "-%d";
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setDaemon(true);
+    builder.setNameFormat(nameFormat);
+    return builder.build();
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index ce2de73cf..9b74fe4ea 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -170,8 +170,13 @@ public class RESTCatalogAdapter implements RESTClient {
.build());
case "urn:ietf:params:oauth:grant-type:token-exchange":
+ String actor = request.get("actor_token");
+ String token = String.format(
+ "token-exchange-token:sub=%s%s",
+ request.get("subject_token"),
+ actor != null ? ",act=" + actor : "");
return castResponse(responseType, OAuthTokenResponse.builder()
- .withToken("token-exchange-token:sub=" + request.get("subject_token"))
+ .withToken(token)
.withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token")
.withTokenType("Bearer")
.build());
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 6b3feb2af..d10bead64 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -28,31 +28,41 @@ import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.SessionCatalog;
-import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
-public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends CatalogTests<T> {
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+
+public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
@TempDir
public Path temp;
- private RESTSessionCatalog sessionCatalog;
- private T restCatalog;
+ private RESTCatalog restCatalog;
private JdbcCatalog backendCatalog;
@BeforeEach
- @SuppressWarnings("unchecked")
public void createCatalog() {
File warehouse = temp.toFile();
Configuration conf = new Configuration();
@@ -66,29 +76,40 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
JdbcCatalog.PROPERTY_PREFIX + "password", "password");
backendCatalog.initialize("backend", backendCatalogProperties);
- Map<String, String> testHeaders = ImmutableMap.of("header", "value");
+ Map<String, String> catalogHeaders = ImmutableMap.of(
+ "Authorization", "Bearer client-credentials-token:sub=catalog");
+ Map<String, String> contextHeaders = ImmutableMap.of(
+ "Authorization", "Bearer client-credentials-token:sub=user");
RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog) {
@Override
public <T extends RESTResponse> T execute(RESTCatalogAdapter.HTTPMethod method, String path, Object body,
Class<T> responseType, Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
- Assertions.assertEquals(testHeaders, headers, "Should pass headers through");
+ // this doesn't use a Mockito spy because this is used for catalog tests, which have different method calls
+ if (!"v1/oauth/tokens".equals(path)) {
+ if ("v1/config".equals(path)) {
+ Assertions.assertEquals(catalogHeaders, headers, "Headers did not match for path: " + path);
+ } else {
+ Assertions.assertEquals(contextHeaders, headers, "Headers did not match for path: " + path);
+ }
+ }
return super.execute(method, path, body, responseType, headers, errorHandler);
}
};
- this.sessionCatalog = new RESTSessionCatalog((config) -> adaptor);
- sessionCatalog.setConf(conf);
- sessionCatalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "header.header", "value"));
+ SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+ UUID.randomUUID().toString(), "user", ImmutableMap.of("credential", "user:12345"), ImmutableMap.of());
- this.restCatalog = (T) sessionCatalog.asCatalog(SessionCatalog.SessionContext.createEmpty());
+ this.restCatalog = new RESTCatalog(context, (config) -> adaptor);
+ restCatalog.setConf(conf);
+ restCatalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:12345"));
}
@AfterEach
public void closeCatalog() throws IOException {
if (restCatalog != null) {
- sessionCatalog.close();
+ restCatalog.close();
}
if (backendCatalog != null) {
@@ -97,7 +118,7 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
}
@Override
- protected T catalog() {
+ protected RESTCatalog catalog() {
return restCatalog;
}
@@ -115,22 +136,47 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
@Test
public void testConfigRoute() throws IOException {
- RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog) {
+ RESTClient testClient = new RESTClient() {
+ @Override
+ public void head(String path, Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+ throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+ }
+
@Override
+ public <T extends RESTResponse> T delete(String path, Class<T> responseType, Map<String, String> headers,
+ Consumer<ErrorResponse> errorHandler) {
+ throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
public <T extends RESTResponse> T get(String path, Class<T> responseType, Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
- if (ResourcePaths.config().equals(path)) {
- return castResponse(responseType, ConfigResponse
- .builder()
- .withDefaults(ImmutableMap.of(CatalogProperties.CLIENT_POOL_SIZE, "1"))
- .withOverrides(ImmutableMap.of(CatalogProperties.CACHE_ENABLED, "false"))
- .build());
- }
- return super.get(path, responseType, headers, errorHandler);
+ return (T) ConfigResponse
+ .builder()
+ .withDefaults(ImmutableMap.of(CatalogProperties.CLIENT_POOL_SIZE, "1"))
+ .withOverrides(ImmutableMap.of(CatalogProperties.CACHE_ENABLED, "false"))
+ .build();
+ }
+
+ @Override
+ public <T extends RESTResponse> T post(String path, RESTRequest body, Class<T> responseType,
+ Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+ throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+ }
+
+ @Override
+ public <T extends RESTResponse> T postForm(String path, Map<String, String> formData, Class<T> responseType,
+ Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+ throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+ }
+
+ @Override
+ public void close() {
}
};
- RESTSessionCatalog restCat = new RESTSessionCatalog((config) -> adaptor);
+ RESTCatalog restCat = new RESTCatalog((config) -> testClient);
Map<String, String> initialConfig = ImmutableMap.of(
CatalogProperties.URI, "http://localhost:8080",
CatalogProperties.CACHE_ENABLED, "true");
@@ -148,7 +194,7 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
@Test
public void testInitializeWithBadArguments() throws IOException {
- RESTSessionCatalog restCat = new RESTSessionCatalog();
+ RESTCatalog restCat = new RESTCatalog();
AssertHelpers.assertThrows("Configuration passed to initialize cannot be null",
IllegalArgumentException.class,
"Invalid configuration: null",
@@ -161,4 +207,574 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
restCat.close();
}
+
+  @Test
+  public void testCatalogBasicBearerToken() {
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+    Map<String, String> properties = ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "token", "bearer-token");
+
+    RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    catalog.initialize("prod", properties);
+
+    TableIdentifier ident = TableIdentifier.of("ns", "table");
+    Assertions.assertFalse(catalog.tableExists(ident));
+
+    // both the config request and the table load should carry the static bearer token
+    Map<String, String> bearerHeaders = ImmutableMap.of("Authorization", "Bearer bearer-token");
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(bearerHeaders), any());
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(bearerHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogCredential() {
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+    Map<String, String> properties = ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "credential", "catalog:secret");
+
+    RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    catalog.initialize("prod", properties);
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    Map<String, String> noHeaders = ImmutableMap.of();
+    Map<String, String> tokenHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+    // the catalog's own token request is sent with no headers
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(noHeaders), any());
+    // the token fetched with the catalog credential is used for config
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(tokenHeaders), any());
+    // and the same catalog token is used for the table load
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(tokenHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogBearerTokenWithClientCredential() {
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    Map<String, String> sessionCredentials = ImmutableMap.of("credential", "user:secret");
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", sessionCredentials, ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "token", "bearer-token"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    Map<String, String> bearerHeaders = ImmutableMap.of(
+        "Authorization", "Bearer bearer-token");
+    Map<String, String> userHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
+
+    // config is requested with the catalog's bearer token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(bearerHeaders), any());
+    // the session token request is authorized with the catalog's bearer token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(bearerHeaders), any());
+    // the table load uses the session token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(userHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogCredentialWithClientCredential() {
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    Map<String, String> sessionCredentials = ImmutableMap.of("credential", "user:secret");
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", sessionCredentials, ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "credential", "catalog:secret"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    Map<String, String> noHeaders = ImmutableMap.of();
+    Map<String, String> catalogTokenHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+    Map<String, String> userHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
+
+    // the catalog's token request is sent with no initial auth
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(noHeaders), any());
+    // config is requested with the catalog's client credentials token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogTokenHeaders), any());
+    // the session token request is authorized with the catalog's token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        any(),
+        eq(OAuthTokenResponse.class),
+        eq(catalogTokenHeaders),
+        any());
+    // the table load uses the session token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(userHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogBearerTokenAndCredentialWithClientCredential() {
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    Map<String, String> sessionCredentials = ImmutableMap.of("credential", "user:secret");
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", sessionCredentials, ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "credential", "catalog:secret",
+        "token", "bearer-token"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    Map<String, String> bearerHeaders = ImmutableMap.of(
+        "Authorization", "Bearer bearer-token");
+    Map<String, String> catalogTokenHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+    Map<String, String> userHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
+
+    // the catalog's client credentials request is authorized with the configured bearer token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(bearerHeaders), any());
+    // config is requested with the catalog's client credentials token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogTokenHeaders), any());
+    // the session token request is authorized with the catalog's client credentials token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        any(),
+        eq(OAuthTokenResponse.class),
+        eq(catalogTokenHeaders),
+        any());
+    // the table load uses the session token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(userHeaders),
+        any());
+  }
+
+  @Test
+  public void testClientBearerToken() {
+    // a "token" credential is expected to be sent as-is, even when other credentials are present
+    Map<String, String> credentials = ImmutableMap.of(
+        "token", "client-bearer-token",
+        "credential", "user:secret",
+        "urn:ietf:params:oauth:token-type:id_token", "id-token",
+        "urn:ietf:params:oauth:token-type:access_token", "access-token",
+        "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+        "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+        "urn:ietf:params:oauth:token-type:saml1", "saml1-token");
+    Map<String, String> expectedHeaders = ImmutableMap.of("Authorization", "Bearer client-bearer-token");
+
+    testClientAuth("bearer-token", credentials, expectedHeaders);
+  }
+
+  @Test
+  public void testClientCredential() {
+    // without a "token" entry, the credential is expected to win over the typed tokens
+    Map<String, String> credentials = ImmutableMap.of(
+        "credential", "user:secret",
+        "urn:ietf:params:oauth:token-type:id_token", "id-token",
+        "urn:ietf:params:oauth:token-type:access_token", "access-token",
+        "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+        "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+        "urn:ietf:params:oauth:token-type:saml1", "saml1-token");
+    Map<String, String> expectedHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
+
+    testClientAuth("bearer-token", credentials, expectedHeaders);
+  }
+
+  @Test
+  public void testClientIDToken() {
+    // the ID token is expected to be exchanged, with the catalog token as the actor
+    Map<String, String> credentials = ImmutableMap.of(
+        "urn:ietf:params:oauth:token-type:id_token", "id-token",
+        "urn:ietf:params:oauth:token-type:access_token", "access-token",
+        "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+        "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+        "urn:ietf:params:oauth:token-type:saml1", "saml1-token");
+    Map<String, String> expectedHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=id-token,act=bearer-token");
+
+    testClientAuth("bearer-token", credentials, expectedHeaders);
+  }
+
+  @Test
+  public void testClientAccessToken() {
+    // the access token is expected to be exchanged, with the catalog token as the actor
+    Map<String, String> credentials = ImmutableMap.of(
+        "urn:ietf:params:oauth:token-type:access_token", "access-token",
+        "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+        "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+        "urn:ietf:params:oauth:token-type:saml1", "saml1-token");
+    Map<String, String> expectedHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=access-token,act=bearer-token");
+
+    testClientAuth("bearer-token", credentials, expectedHeaders);
+  }
+
+  @Test
+  public void testClientJWTToken() {
+    // the JWT is expected to be exchanged, with the catalog token as the actor
+    Map<String, String> credentials = ImmutableMap.of(
+        "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+        "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+        "urn:ietf:params:oauth:token-type:saml1", "saml1-token");
+    Map<String, String> expectedHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=jwt-token,act=bearer-token");
+
+    testClientAuth("bearer-token", credentials, expectedHeaders);
+  }
+
+  @Test
+  public void testClientSAML2Token() {
+    // the SAML2 token is expected to be exchanged, with the catalog token as the actor
+    Map<String, String> credentials = ImmutableMap.of(
+        "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+        "urn:ietf:params:oauth:token-type:saml1", "saml1-token");
+    Map<String, String> expectedHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=saml2-token,act=bearer-token");
+
+    testClientAuth("bearer-token", credentials, expectedHeaders);
+  }
+
+  @Test
+  public void testClientSAML1Token() {
+    // the SAML1 token is expected to be exchanged, with the catalog token as the actor
+    Map<String, String> credentials = ImmutableMap.of("urn:ietf:params:oauth:token-type:saml1", "saml1-token");
+    Map<String, String> expectedHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=saml1-token,act=bearer-token");
+
+    testClientAuth("bearer-token", credentials, expectedHeaders);
+  }
+
+  /**
+   * Initializes a catalog with a static catalog token and the given session credentials, checks for a table,
+   * and verifies the headers used for the config, token, and table requests.
+   *
+   * @param catalogToken static bearer token configured on the catalog
+   * @param credentials session credentials passed through the SessionContext
+   * @param expectedHeaders headers expected on the table request
+   */
+  private void testClientAuth(String catalogToken, Map<String, String> credentials,
+                              Map<String, String> expectedHeaders) {
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", credentials, ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", catalogToken));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    Map<String, String> catalogHeaders = ImmutableMap.of("Authorization", "Bearer " + catalogToken);
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+
+    // a "token" credential is a static token; any other credential requires a client credentials or token
+    // exchange request, which is authorized with the catalog token
+    boolean usesStaticToken = credentials.containsKey("token");
+    if (!usesStaticToken) {
+      Mockito.verify(adapter).execute(
+          eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(catalogHeaders), any());
+    }
+
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedHeaders),
+        any());
+  }
+
+  @Test
+  public void testTableBearerToken() {
+    // a "token" entry in the table config is expected to be used directly for table requests
+    Map<String, String> credentials = ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token");
+    Map<String, String> tableConfig = ImmutableMap.of("token", "table-bearer-token");
+    Map<String, String> contextHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog");
+    Map<String, String> tableHeaders = ImmutableMap.of("Authorization", "Bearer table-bearer-token");
+
+    testTableAuth("catalog", credentials, tableConfig, contextHeaders, tableHeaders);
+  }
+
+  @Test
+  public void testTableIDToken() {
+    // an ID token in the table config is expected to be exchanged, with the session token as the actor
+    Map<String, String> credentials = ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token");
+    Map<String, String> tableConfig = ImmutableMap.of(
+        "urn:ietf:params:oauth:token-type:id_token", "table-id-token");
+    Map<String, String> contextHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog");
+    Map<String, String> tableHeaders = ImmutableMap.of(
+        "Authorization",
+        "Bearer token-exchange-token:sub=table-id-token,act=token-exchange-token:sub=id-token,act=catalog");
+
+    testTableAuth("catalog", credentials, tableConfig, contextHeaders, tableHeaders);
+  }
+
+  @Test
+  public void testTableCredential() {
+    // a credential in the table config is expected to produce a client credentials token for table requests
+    Map<String, String> credentials = ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token");
+    Map<String, String> tableConfig = ImmutableMap.of("credential", "table-user:secret");
+    Map<String, String> contextHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog");
+    Map<String, String> tableHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=table-user");
+
+    testTableAuth("catalog", credentials, tableConfig, contextHeaders, tableHeaders);
+  }
+
+ /**
+ * Creates, loads, and refreshes a table whose stubbed create-table and load-table responses carry the given
+ * table config, then verifies the headers used for each request.
+ *
+ * @param catalogToken static bearer token configured on the catalog
+ * @param credentials session credentials passed through the SessionContext
+ * @param tableConfig config entries added to the stubbed LoadTableResponse results
+ * @param expectedContextHeaders headers expected on requests made for the session
+ * @param expectedTableHeaders headers expected on requests made for the table
+ */
+ public void testTableAuth(String catalogToken, Map<String, String> credentials, Map<String, String> tableConfig,
+ Map<String, String> expectedContextHeaders, Map<String, String> expectedTableHeaders) {
+ TableIdentifier ident = TableIdentifier.of("ns", "table");
+ Map<String, String> catalogHeaders = ImmutableMap.of(
+ "Authorization", "Bearer " + catalogToken);
+
+ RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+ // inject the expected table config
+ // the real response is rebuilt with its own metadata and config plus the entries from tableConfig
+ Answer<LoadTableResponse> addTableConfig = invocation -> {
+ LoadTableResponse loadTable = (LoadTableResponse) invocation.callRealMethod();
+ return LoadTableResponse.builder()
+ .withTableMetadata(loadTable.tableMetadata())
+ .addAllConfig(loadTable.config())
+ .addAllConfig(tableConfig)
+ .build();
+ };
+
+ // stub the create-table request made with the session headers
+ Mockito.doAnswer(addTableConfig).when(adapter).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/namespaces/ns/tables"),
+ any(),
+ eq(LoadTableResponse.class),
+ eq(expectedContextHeaders),
+ any());
+
+ // stub the load-table request made with the session headers
+ Mockito.doAnswer(addTableConfig).when(adapter).execute(
+ eq(HTTPMethod.GET),
+ eq("v1/namespaces/ns/tables/table"),
+ any(),
+ eq(LoadTableResponse.class),
+ eq(expectedContextHeaders),
+ any());
+
+ SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+ UUID.randomUUID().toString(), "user", credentials, ImmutableMap.of());
+
+ RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+ catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", catalogToken));
+
+ Schema expectedSchema = new Schema(
+ required(1, "id", Types.IntegerType.get(), "unique ID"),
+ required(2, "data", Types.StringType.get())
+ );
+
+ Table table = catalog.createTable(ident, expectedSchema);
+ Assertions.assertEquals(expectedSchema.asStruct(), table.schema().asStruct(), "Schema should match");
+
+ Table loaded = catalog.loadTable(ident); // the first load will send the token
+ Assertions.assertEquals(expectedSchema.asStruct(), loaded.schema().asStruct(), "Schema should match");
+
+ loaded.refresh(); // refresh to force reload
+
+ // config is requested with the catalog token
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+ // session token request (client credentials or token exchange), sent with the catalog token
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(catalogHeaders), any());
+
+ // create table request
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/namespaces/ns/tables"),
+ any(),
+ eq(LoadTableResponse.class),
+ eq(expectedContextHeaders),
+ any());
+
+ // if the table returned a bearer token, there will be no token request
+ if (!tableConfig.containsKey("token")) {
+ // client credentials or token exchange to get a table token
+ // two token requests with the session headers are expected here
+ Mockito.verify(adapter, times(2)).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/oauth/tokens"),
+ any(),
+ eq(OAuthTokenResponse.class),
+ eq(expectedContextHeaders),
+ any());
+ }
+
+ // automatic refresh when metadata is accessed after commit
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.GET),
+ eq("v1/namespaces/ns/tables/table"),
+ any(),
+ eq(LoadTableResponse.class),
+ eq(expectedTableHeaders),
+ any());
+
+ // load table from catalog
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.GET),
+ eq("v1/namespaces/ns/tables/table"),
+ any(),
+ eq(LoadTableResponse.class),
+ eq(expectedContextHeaders),
+ any());
+
+ // refresh loaded table
+ // NOTE(review): this verification has the same arguments as the "automatic refresh" one above, so the
+ // two checks are not distinguished from each other -- confirm that is intended
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.GET),
+ eq("v1/namespaces/ns/tables/table"),
+ any(),
+ eq(LoadTableResponse.class),
+ eq(expectedTableHeaders),
+ any());
+ }
+
+ @Test
+ public void testCatalogTokenRefresh() throws Exception {
+ Map<String, String> emptyHeaders = ImmutableMap.of();
+ Map<String, String> catalogHeaders = ImmutableMap.of(
+ "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+ RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+ Answer<OAuthTokenResponse> addOneSecondExpiration = invocation -> {
+ OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
+ return OAuthTokenResponse.builder()
+ .withToken(response.token())
+ .withTokenType(response.tokenType())
+ .withIssuedTokenType(response.issuedTokenType())
+ .addScopes(response.scopes())
+ .setExpirationInSeconds(1)
+ .build();
+ };
+
+ Mockito.doAnswer(addOneSecondExpiration).when(adapter).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/oauth/tokens"),
+ any(),
+ eq(OAuthTokenResponse.class),
+ any(),
+ any());
+
+ Map<String, String> contextCredentials = ImmutableMap.of();
+ SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+ UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
+
+ RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+ catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
+
+ Thread.sleep(3_000); // sleep until after 2 refresh calls
+
+ // call client credentials with no initial auth
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(emptyHeaders), any());
+
+ // use the client credential token for config
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+
+ // verify the first token exchange
+ Map<String, String> firstRefreshRequest = ImmutableMap.of(
+ "grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
+ "subject_token", "client-credentials-token:sub=catalog",
+ "subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
+ "scope", "catalog"
+ );
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/oauth/tokens"),
+ Mockito.argThat(firstRefreshRequest::equals),
+ eq(OAuthTokenResponse.class),
+ eq(catalogHeaders),
+ any());
+
+ // verify that a second exchange occurs
+ Map<String, String> secondRefreshRequest = ImmutableMap.of(
+ "grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
+ "subject_token", "token-exchange-token:sub=client-credentials-token:sub=catalog",
+ "subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
+ "scope", "catalog"
+ );
+ Map<String, String> secondRefreshHeaders = ImmutableMap.of(
+ "Authorization", "Bearer token-exchange-token:sub=client-credentials-token:sub=catalog"
+ );
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/oauth/tokens"),
+ Mockito.argThat(secondRefreshRequest::equals),
+ eq(OAuthTokenResponse.class),
+ eq(secondRefreshHeaders),
+ any());
+ }
+
+ @Test
+ public void testCatalogRefreshedTokenIsUsed() throws Exception { // checks that a request made after a refresh carries the exchanged token
+ Map<String, String> emptyHeaders = ImmutableMap.of();
+ Map<String, String> catalogHeaders = ImmutableMap.of(
+ "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+ RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+ Answer<OAuthTokenResponse> addOneSecondExpiration = invocation -> { // rebuilds the real token response with a 1 second expiration
+ OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
+ return OAuthTokenResponse.builder()
+ .withToken(response.token())
+ .withTokenType(response.tokenType())
+ .withIssuedTokenType(response.issuedTokenType())
+ .addScopes(response.scopes())
+ .setExpirationInSeconds(1)
+ .build();
+ };
+
+ Mockito.doAnswer(addOneSecondExpiration).when(adapter).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/oauth/tokens"),
+ any(),
+ eq(OAuthTokenResponse.class),
+ any(),
+ any());
+
+ Map<String, String> contextCredentials = ImmutableMap.of();
+ SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+ UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
+
+ RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+ catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
+
+ Thread.sleep(1_100); // sleep until after the first refresh of the 1 second token (not 2: the load below expects the once-exchanged token)
+
+ // send a table load request, which should use the exchanged catalog token
+ Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+ // call client credentials with no initial auth
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(emptyHeaders), any());
+
+ // use the client credential token for config
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+
+ // verify the first token exchange
+ Map<String, String> firstRefreshRequest = ImmutableMap.of(
+ "grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
+ "subject_token", "client-credentials-token:sub=catalog",
+ "subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
+ "scope", "catalog"
+ );
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.POST),
+ eq("v1/oauth/tokens"),
+ Mockito.argThat(firstRefreshRequest::equals),
+ eq(OAuthTokenResponse.class),
+ eq(catalogHeaders),
+ any());
+
+ // use the refreshed catalog token for table load (the session context supplies no credentials of its own)
+ Map<String, String> refreshedCatalogHeader = ImmutableMap.of(
+ "Authorization", "Bearer token-exchange-token:sub=client-credentials-token:sub=catalog"
+ );
+ Mockito.verify(adapter).execute(
+ eq(HTTPMethod.GET),
+ eq("v1/namespaces/ns/tables/table"),
+ any(),
+ eq(LoadTableResponse.class),
+ eq(refreshedCatalogHeader),
+ any());
+ }
}