You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/25 16:02:21 UTC

[iceberg] branch master updated: Core: Add session catalog implementation for REST (#4830)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c4c86113a Core: Add session catalog implementation for REST (#4830)
c4c86113a is described below

commit c4c86113adb459c56ea7893bb13b6388d467495c
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Wed May 25 09:02:13 2022 -0700

    Core: Add session catalog implementation for REST (#4830)
---
 baseline.gradle                                    |   1 +
 .../java/org/apache/iceberg/rest/RESTCatalog.java  | 228 +++++++
 .../apache/iceberg/rest/RESTCatalogProperties.java |  35 --
 .../apache/iceberg/rest/RESTSessionCatalog.java    | 230 +++++--
 .../java/org/apache/iceberg/rest/RESTUtil.java     |  23 +-
 .../apache/iceberg/rest/auth/OAuth2Properties.java |  20 +
 .../org/apache/iceberg/rest/auth/OAuth2Util.java   | 138 ++++-
 .../java/org/apache/iceberg/util/ThreadPools.java  |  30 +-
 .../apache/iceberg/rest/RESTCatalogAdapter.java    |   7 +-
 .../org/apache/iceberg/rest/TestRESTCatalog.java   | 666 ++++++++++++++++++++-
 10 files changed, 1260 insertions(+), 118 deletions(-)

diff --git a/baseline.gradle b/baseline.gradle
index 3e9ee9098..ec4208741 100644
--- a/baseline.gradle
+++ b/baseline.gradle
@@ -82,6 +82,7 @@ subprojects {
           '-Xep:StrictUnusedVariable:OFF',
           '-Xep:TypeParameterShadowing:OFF',
           '-Xep:TypeParameterUnusedInFormals:OFF',
+          '-Xep:DangerousThreadPoolExecutorUsage:OFF',
       )
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
new file mode 100644
index 000000000..63747a596
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A {@link Catalog} and {@link SupportsNamespaces} implementation backed by a {@link RESTSessionCatalog}.
+ * <p>
+ * Table and namespace operations are forwarded to a catalog view that the session catalog creates for one
+ * {@link SessionCatalog.SessionContext}. Initialization, configuration, and closing are forwarded to the session
+ * catalog itself.
+ */
+public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Configuration>, Closeable {
+  private final RESTSessionCatalog sessionCatalog;
+  // view of sessionCatalog bound to the context passed to the constructor
+  private final Catalog delegate;
+  // the same object as delegate, cast once so that namespace calls do not need to cast
+  private final SupportsNamespaces nsDelegate;
+
+  /**
+   * Creates a catalog that uses an empty session context and an {@link HTTPClientFactory} to build its client.
+   */
+  public RESTCatalog() {
+    this(SessionCatalog.SessionContext.createEmpty(), new HTTPClientFactory());
+  }
+
+  /**
+   * Creates a catalog that uses an empty session context.
+   *
+   * @param clientBuilder a function that builds a {@link RESTClient} from catalog properties
+   */
+  public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
+    this(SessionCatalog.SessionContext.createEmpty(), clientBuilder);
+  }
+
+  /**
+   * Creates a catalog whose operations all run in the given session context.
+   *
+   * @param context the session context that the delegate catalog is bound to
+   * @param clientBuilder a function that builds a {@link RESTClient} from catalog properties
+   */
+  public RESTCatalog(SessionCatalog.SessionContext context, Function<Map<String, String>, RESTClient> clientBuilder) {
+    this.sessionCatalog = new RESTSessionCatalog(clientBuilder);
+    this.delegate = sessionCatalog.asCatalog(context);
+    this.nsDelegate = (SupportsNamespaces) delegate;
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> props) {
+    Preconditions.checkArgument(props != null, "Invalid configuration: null");
+    sessionCatalog.initialize(name, props);
+  }
+
+  @Override
+  public String name() {
+    return sessionCatalog.name();
+  }
+
+  /**
+   * Returns the properties reported by the underlying session catalog.
+   */
+  public Map<String, String> properties() {
+    return sessionCatalog.properties();
+  }
+
+  // table operations: each method forwards its arguments unchanged to the context-bound delegate
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace ns) {
+    return delegate.listTables(ns);
+  }
+
+  @Override
+  public boolean tableExists(TableIdentifier ident) {
+    return delegate.tableExists(ident);
+  }
+
+  @Override
+  public Table loadTable(TableIdentifier ident) {
+    return delegate.loadTable(ident);
+  }
+
+  @Override
+  public void invalidateTable(TableIdentifier ident) {
+    delegate.invalidateTable(ident);
+  }
+
+  @Override
+  public TableBuilder buildTable(TableIdentifier ident, Schema schema) {
+    return delegate.buildTable(ident, schema);
+  }
+
+  @Override
+  public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec, String location,
+                           Map<String, String> props) {
+    return delegate.createTable(ident, schema, spec, location, props);
+  }
+
+  @Override
+  public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                           Map<String, String> props) {
+    return delegate.createTable(ident, schema, spec, props);
+  }
+
+  @Override
+  public Table createTable(TableIdentifier ident, Schema schema, PartitionSpec spec) {
+    return delegate.createTable(ident, schema, spec);
+  }
+
+  @Override
+  public Table createTable(TableIdentifier identifier, Schema schema) {
+    return delegate.createTable(identifier, schema);
+  }
+
+  @Override
+  public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                                               String location, Map<String, String> props) {
+    return delegate.newCreateTableTransaction(ident, schema, spec, location, props);
+  }
+
+  @Override
+  public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                                               Map<String, String> props) {
+    return delegate.newCreateTableTransaction(ident, schema, spec, props);
+  }
+
+  @Override
+  public Transaction newCreateTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec) {
+    return delegate.newCreateTableTransaction(ident, schema, spec);
+  }
+
+  @Override
+  public Transaction newCreateTableTransaction(TableIdentifier identifier, Schema schema) {
+    return delegate.newCreateTableTransaction(identifier, schema);
+  }
+
+  @Override
+  public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                                                String location, Map<String, String> props, boolean orCreate) {
+    return delegate.newReplaceTableTransaction(ident, schema, spec, location, props, orCreate);
+  }
+
+  @Override
+  public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                                                Map<String, String> props, boolean orCreate) {
+    return delegate.newReplaceTableTransaction(ident, schema, spec, props, orCreate);
+  }
+
+  @Override
+  public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, PartitionSpec spec,
+                                                boolean orCreate) {
+    return delegate.newReplaceTableTransaction(ident, schema, spec, orCreate);
+  }
+
+  @Override
+  public Transaction newReplaceTableTransaction(TableIdentifier ident, Schema schema, boolean orCreate) {
+    return delegate.newReplaceTableTransaction(ident, schema, orCreate);
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier ident) {
+    return delegate.dropTable(ident);
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier ident, boolean purge) {
+    return delegate.dropTable(ident, purge);
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    delegate.renameTable(from, to);
+  }
+
+  @Override
+  public Table registerTable(TableIdentifier ident, String metadataFileLocation) {
+    return delegate.registerTable(ident, metadataFileLocation);
+  }
+
+  // namespace operations: each method forwards its arguments unchanged to the namespace view of the delegate
+
+  @Override
+  public void createNamespace(Namespace ns, Map<String, String> props) {
+    nsDelegate.createNamespace(ns, props);
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace ns) throws NoSuchNamespaceException {
+    return nsDelegate.listNamespaces(ns);
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace ns) throws NoSuchNamespaceException {
+    return nsDelegate.loadNamespaceMetadata(ns);
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace ns) throws NamespaceNotEmptyException {
+    return nsDelegate.dropNamespace(ns);
+  }
+
+  @Override
+  public boolean setProperties(Namespace ns, Map<String, String> props) throws NoSuchNamespaceException {
+    return nsDelegate.setProperties(ns, props);
+  }
+
+  @Override
+  public boolean removeProperties(Namespace ns, Set<String> props) throws NoSuchNamespaceException {
+    return nsDelegate.removeProperties(ns, props);
+  }
+
+  // configuration and lifecycle are handled by the session catalog, not by the context-bound delegate
+
+  @Override
+  public void setConf(Configuration conf) {
+    sessionCatalog.setConf(conf);
+  }
+
+  @Override
+  public void close() throws IOException {
+    sessionCatalog.close();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
deleted file mode 100644
index 7aa291654..000000000
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.rest;
-
-/**
- * Catalog properties which are specific to the RESTCatalog, which can be used in conjunction with
- * {@link org.apache.iceberg.CatalogProperties}
- */
-public class RESTCatalogProperties {
-
-  private RESTCatalogProperties() {
-  }
-
-  /**
-   * A Bearer authorization token which will be used to authenticate requests with the server.
-   */
-  public static final String AUTH_TOKEN = "token";
-}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index f2b165fb5..cb8b5b0e3 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -19,11 +19,16 @@
 
 package org.apache.iceberg.rest;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
@@ -50,7 +55,9 @@ import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.auth.OAuth2Properties;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
@@ -61,21 +68,33 @@ import org.apache.iceberg.rest.responses.GetNamespaceResponse;
 import org.apache.iceberg.rest.responses.ListNamespacesResponse;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RESTSessionCatalog extends BaseSessionCatalog implements Configurable<Configuration>, Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
+  private static final long MAX_REFRESH_WINDOW_MILLIS = 60_000; // 1 minute
+  private static final long MIN_REFRESH_WAIT_MILLIS = 10;
+  private static final List<String> TOKEN_PREFERENCE_ORDER = ImmutableList.of(
+      OAuth2Properties.ID_TOKEN_TYPE, OAuth2Properties.ACCESS_TOKEN_TYPE, OAuth2Properties.JWT_TOKEN_TYPE,
+      OAuth2Properties.SAML2_TOKEN_TYPE, OAuth2Properties.SAML1_TOKEN_TYPE);
 
   private final Function<Map<String, String>, RESTClient> clientBuilder;
+  private final Cache<String, AuthSession> sessions = Caffeine.newBuilder().build();
+  private AuthSession catalogAuth = null;
   private RESTClient client = null;
-  private Map<String, String> baseHeaders = ImmutableMap.of();
   private ResourcePaths paths = null;
   private Object conf = null;
   private FileIO io = null;
 
+  // a lazy thread pool for token refresh
+  private volatile ScheduledExecutorService refreshExecutor = null;
+
   public RESTSessionCatalog() {
     this(new HTTPClientFactory());
   }
@@ -87,18 +106,54 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
   @Override
   public void initialize(String name, Map<String, String> props) {
     Preconditions.checkArgument(props != null, "Invalid configuration: null");
-    ConfigResponse config = fetchConfig(props);
-    Map<String, String> properties = config.merge(props);
-    this.client = clientBuilder.apply(properties);
-    this.baseHeaders = RESTUtil.extractPrefixMap(properties, "header.");
-    this.paths = ResourcePaths.forCatalogProperties(properties);
-    String ioImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
-    this.io = CatalogUtil.loadFileIO(ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), properties, conf);
-    super.initialize(name, properties);
+
+    long startTimeMillis = System.currentTimeMillis(); // keep track of the init start time for token refresh
+    String initToken = props.get(OAuth2Properties.TOKEN);
+
+    // fetch auth and config to complete initialization
+    ConfigResponse config;
+    OAuthTokenResponse authResponse;
+    try (RESTClient initClient = clientBuilder.apply(props)) {
+      Map<String, String> initHeaders = RESTUtil.merge(configHeaders(props), OAuth2Util.authHeaders(initToken));
+      String credential = props.get(OAuth2Properties.CREDENTIAL);
+      if (credential != null && !credential.isEmpty()) {
+        String scope = props.getOrDefault(OAuth2Properties.SCOPE, OAuth2Properties.CATALOG_SCOPE);
+        authResponse = OAuth2Util.fetchToken(initClient, initHeaders, credential, scope);
+        config = fetchConfig(initClient, RESTUtil.merge(initHeaders, OAuth2Util.authHeaders(authResponse.token())));
+      } else {
+        authResponse = null;
+        config = fetchConfig(initClient, initHeaders);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close HTTP client", e);
+    }
+
+    // build the final configuration and set up the catalog's auth
+    Map<String, String> mergedProps = config.merge(props);
+    Map<String, String> baseHeaders = configHeaders(mergedProps);
+    this.catalogAuth = new AuthSession(baseHeaders, null, null);
+    if (authResponse != null) {
+      this.catalogAuth = newSession(authResponse, startTimeMillis, catalogAuth);
+    } else if (initToken != null) {
+      this.catalogAuth = newSession(initToken, expiresInMs(mergedProps), catalogAuth);
+    }
+
+    this.client = clientBuilder.apply(mergedProps);
+    this.paths = ResourcePaths.forCatalogProperties(mergedProps);
+
+    String ioImpl = mergedProps.get(CatalogProperties.FILE_IO_IMPL);
+    this.io = CatalogUtil.loadFileIO(ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);
+
+    super.initialize(name, mergedProps);
   }
 
-  protected Supplier<Map<String, String>> headers(SessionContext context) {
-    return () -> baseHeaders;
+  private AuthSession session(SessionContext context) {
+    return sessions.get(context.sessionId(),
+        id -> newSession(context.credentials(), context.properties(), catalogAuth));
+  }
+
+  private Supplier<Map<String, String>> headers(SessionContext context) {
+    return session(context)::headers;
   }
 
   @Override
@@ -148,8 +203,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
   public Table loadTable(SessionContext context, TableIdentifier identifier) {
     LoadTableResponse response = loadInternal(context, identifier);
     Pair<RESTClient, FileIO> clients = tableClients(response.config());
+    AuthSession session = tableSession(response.config(), session(context));
     RESTTableOperations ops = new RESTTableOperations(
-        clients.first(), paths.table(identifier), headers(context), clients.second(), response.tableMetadata());
+        clients.first(), paths.table(identifier), session::headers, clients.second(), response.tableMetadata());
     return new BaseTable(ops, fullTableName(identifier));
   }
 
@@ -222,11 +278,61 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
     return !response.updated().isEmpty();
   }
 
+  /**
+   * Returns the scheduler used for token refresh, creating a single-thread pool on first use.
+   * <p>
+   * The volatile field is checked once without the lock and once more while holding it, so that concurrent
+   * callers create at most one pool.
+   */
+  private ScheduledExecutorService tokenRefreshExecutor() {
+    ScheduledExecutorService existing = refreshExecutor;
+    if (existing != null) {
+      return existing;
+    }
+
+    synchronized (this) {
+      if (refreshExecutor == null) {
+        String poolName = name() + "-token-refresh";
+        this.refreshExecutor = ThreadPools.newScheduledPool(poolName, 1);
+      }
+
+      return refreshExecutor;
+    }
+  }
+
+  /**
+   * Schedules a one-shot task that refreshes a session's token shortly before the token expires.
+   * <p>
+   * The refresh starts ahead of expiration by a tenth of the token lifetime, capped at
+   * {@code MAX_REFRESH_WINDOW_MILLIS}. Time already elapsed since {@code startTimeMillis} is subtracted, and the
+   * task never runs sooner than {@code MIN_REFRESH_WAIT_MILLIS} from now. When the refresh reports a new
+   * expiration, the next refresh is scheduled; when it reports none, the chain ends.
+   *
+   * @param session the session whose token should be refreshed
+   * @param startTimeMillis the time, in epoch milliseconds, at which the current token was requested
+   * @param expiresIn the token lifetime, expressed in {@code unit}
+   * @param unit the time unit of {@code expiresIn}
+   */
+  private void scheduleTokenRefresh(
+      AuthSession session, long startTimeMillis, long expiresIn, TimeUnit unit) {
+    // convert expiration interval to milliseconds
+    long expiresInMillis = unit.toMillis(expiresIn);
+    // how much ahead of time to start the request to allow it to complete
+    long refreshWindowMillis = Math.min(expiresInMillis / 10, MAX_REFRESH_WINDOW_MILLIS);
+    // how much time to wait before expiration
+    long waitIntervalMillis = expiresInMillis - refreshWindowMillis;
+    // how much time has already elapsed since the new token was issued
+    long elapsedMillis = System.currentTimeMillis() - startTimeMillis;
+    // how much time to actually wait
+    long timeToWait = Math.max(waitIntervalMillis - elapsedMillis, MIN_REFRESH_WAIT_MILLIS);
+
+    tokenRefreshExecutor().schedule(
+        () -> {
+          long refreshStartTime = System.currentTimeMillis();
+          try {
+            Pair<Integer, TimeUnit> expiration = session.refresh(client);
+            if (expiration != null) {
+              scheduleTokenRefresh(session, refreshStartTime, expiration.first(), expiration.second());
+            }
+          } catch (RuntimeException e) {
+            // the future returned by schedule is discarded, so an uncaught failure would never be reported
+            LOG.warn("Failed to refresh token; no further refresh is scheduled for this session", e);
+          }
+        },
+        timeToWait, TimeUnit.MILLISECONDS);
+  }
+
   @Override
   public void close() throws IOException {
     if (client != null) {
       client.close();
     }
+
+    if (refreshExecutor != null) {
+      ScheduledExecutorService service = refreshExecutor;
+      this.refreshExecutor = null;
+
+      try {
+        if (service.awaitTermination(1, TimeUnit.MINUTES)) {
+          LOG.warn("Timed out waiting for refresh executor to terminate");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for refresh executor to terminate", e);
+        Thread.currentThread().interrupt();
+      }
+    }
   }
 
   private class Builder implements Catalog.TableBuilder {
@@ -292,8 +398,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
           ErrorHandlers.tableErrorHandler());
 
       Pair<RESTClient, FileIO> clients = tableClients(response.config());
+      AuthSession session = tableSession(response.config(), session(context));
       RESTTableOperations ops = new RESTTableOperations(
-          clients.first(), paths.table(ident), headers(context), clients.second(), response.tableMetadata());
+          clients.first(), paths.table(ident), session::headers, clients.second(), response.tableMetadata());
 
       return new BaseTable(ops, fullTableName(ident));
     }
@@ -304,10 +411,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
       String fullName = fullTableName(ident);
 
       Pair<RESTClient, FileIO> clients = tableClients(response.config());
+      AuthSession session = tableSession(response.config(), session(context));
       TableMetadata meta = response.tableMetadata();
 
       RESTTableOperations ops = new RESTTableOperations(
-          clients.first(), paths.table(ident), headers(context), clients.second(),
+          clients.first(), paths.table(ident), session::headers, clients.second(),
           RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta);
 
       return Transactions.createTableTransaction(fullName, ops, meta);
@@ -319,6 +427,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
       String fullName = fullTableName(ident);
 
       Pair<RESTClient, FileIO> clients = tableClients(response.config());
+      AuthSession session = tableSession(response.config(), session(context));
       TableMetadata base = response.tableMetadata();
 
       Map<String, String> tableProperties = propertiesBuilder.build();
@@ -347,7 +456,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
       }
 
       RESTTableOperations ops = new RESTTableOperations(
-          clients.first(), paths.table(ident), headers(context), clients.second(),
+          clients.first(), paths.table(ident), session::headers, clients.second(),
           RESTTableOperations.UpdateType.REPLACE, changes.build(), base);
 
       return Transactions.replaceTableTransaction(fullName, ops, replacement);
@@ -421,18 +530,12 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
     return String.format("%s.%s", name(), ident);
   }
 
-  private Map<String, String> fullConf(Map<String, String> config) {
-    Map<String, String> fullConf = Maps.newHashMap(properties());
-    fullConf.putAll(config);
-    return fullConf;
-  }
-
   private Pair<RESTClient, FileIO> tableClients(Map<String, String> config) {
     if (config.isEmpty()) {
       return Pair.of(client, io); // reuse client and io since config is the same
     }
 
-    Map<String, String> fullConf = fullConf(config);
+    Map<String, String> fullConf = RESTUtil.merge(properties(), config);
     String ioImpl = fullConf.get(CatalogProperties.FILE_IO_IMPL);
     FileIO tableIO = CatalogUtil.loadFileIO(
         ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), fullConf, this.conf);
@@ -441,23 +544,74 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
     return Pair.of(tableClient, tableIO);
   }
 
-  private ConfigResponse fetchConfig(Map<String, String> props) {
-    // Create a client for one time use, as we will reconfigure the client using the merged server and application
-    // defined configuration.
-    RESTClient singleUseClient = clientBuilder.apply(props);
-    Map<String, String> headers = RESTUtil.extractPrefixMap(props, "header.");
+  /**
+   * Returns the auth session to use for a table, based on the config returned with the table.
+   * <p>
+   * The table config is used both as the source of credentials and as the source of session properties.
+   *
+   * @param tableConf config returned by the server for the table
+   * @param parent the session to fall back to and to take base headers from
+   * @return a session for the table, or {@code parent} when the config has no recognized credentials
+   */
+  private AuthSession tableSession(Map<String, String> tableConf, AuthSession parent) {
+    return newSession(tableConf, tableConf, parent);
+  }
 
-    try {
-      ConfigResponse configResponse = singleUseClient
-          .get(ResourcePaths.config(), ConfigResponse.class, headers, ErrorHandlers.defaultErrorHandler());
-      configResponse.validate();
-      return configResponse;
-    } finally {
-      try {
-        singleUseClient.close();
-      } catch (IOException e) {
-        LOG.error("Failed to close HTTP client used for getting catalog configuration. Possible resource leak.", e);
+  /**
+   * Requests the catalog config from the server and validates the response before returning it.
+   *
+   * @param client the client used to send the request
+   * @param headers headers to send with the request
+   * @return the validated config response
+   */
+  private static ConfigResponse fetchConfig(RESTClient client, Map<String, String> headers) {
+    ConfigResponse response = client.get(
+        ResourcePaths.config(), ConfigResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+    response.validate();
+    return response;
+  }
+
+  /**
+   * Creates an auth session from a map of credentials, or returns the parent session when none apply.
+   * <p>
+   * Credentials are checked in this order and the first match wins: a bearer token, a client credential, and
+   * then each token type in {@code TOKEN_PREFERENCE_ORDER}.
+   *
+   * @param credentials credentials to create the session from; may be null
+   * @param properties properties used to look up the refresh interval for a bearer token
+   * @param parent the session returned when no credentials apply, and the source of the new session's headers
+   * @return a new session, or {@code parent} if {@code credentials} is null or has no recognized key
+   */
+  private AuthSession newSession(Map<String, String> credentials, Map<String, String> properties, AuthSession parent) {
+    if (credentials != null) {
+      // use the bearer token without exchanging
+      if (credentials.containsKey(OAuth2Properties.TOKEN)) {
+        return newSession(credentials.get(OAuth2Properties.TOKEN), expiresInMs(properties), parent);
       }
+
+      if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) {
+        // fetch a token using the client credentials flow
+        return newSession(credentials.get(OAuth2Properties.CREDENTIAL), parent);
+      }
+
+      for (String tokenType : TOKEN_PREFERENCE_ORDER) {
+        if (credentials.containsKey(tokenType)) {
+          // exchange the token for an access token using the token exchange flow
+          return newSession(credentials.get(tokenType), tokenType, parent);
+        }
+      }
+    }
+
+    return parent;
+  }
+
+  /**
+   * Creates a session that uses the given bearer token as an access token, without contacting the server.
+   *
+   * @param token the bearer token to use
+   * @param expirationMs milliseconds from now used as the token lifetime when scheduling its refresh
+   * @param parent the session whose headers are copied into the new session
+   * @return the new session
+   */
+  private AuthSession newSession(String token, long expirationMs, AuthSession parent) {
+    AuthSession session = new AuthSession(parent.headers(), token, OAuth2Properties.ACCESS_TOKEN_TYPE);
+    // the token's real issue time is not known here, so the lifetime is counted from the current time
+    scheduleTokenRefresh(session, System.currentTimeMillis(), expirationMs, TimeUnit.MILLISECONDS);
+    return session;
+  }
+
+  /**
+   * Creates a session by exchanging a token of the given type, requesting the catalog scope.
+   * <p>
+   * The exchange request is sent with the parent's headers and also passes the parent's token and token type.
+   *
+   * @param token the token to exchange
+   * @param tokenType the type of {@code token}
+   * @param parent the session that provides the request headers and its own token for the exchange
+   * @return a session built from the exchange response
+   */
+  private AuthSession newSession(String token, String tokenType, AuthSession parent) {
+    // captured before the request so that the request's duration counts against the token lifetime
+    long startTimeMillis = System.currentTimeMillis();
+    OAuthTokenResponse response = OAuth2Util.exchangeToken(
+        client, parent.headers(), token, tokenType, parent.token(), parent.tokenType(), OAuth2Properties.CATALOG_SCOPE);
+    return newSession(response, startTimeMillis, parent);
+  }
+
+  /**
+   * Creates a session by fetching a token for the given credential, requesting the catalog scope.
+   *
+   * @param credential the credential to fetch a token for
+   * @param parent the session that provides the request headers
+   * @return a session built from the token response
+   */
+  private AuthSession newSession(String credential, AuthSession parent) {
+    // captured before the request so that the request's duration counts against the token lifetime
+    long startTimeMillis = System.currentTimeMillis();
+    OAuthTokenResponse response = OAuth2Util.fetchToken(
+        client, parent.headers(), credential, OAuth2Properties.CATALOG_SCOPE);
+    return newSession(response, startTimeMillis, parent);
+  }
+
+  /**
+   * Creates a session from a token response and schedules a refresh if the response has an expiration.
+   *
+   * @param response the token response that provides the token and its issued token type
+   * @param startTimeMillis the time, in epoch milliseconds, at which the token was requested
+   * @param parent the session whose headers are copied into the new session
+   * @return the new session
+   */
+  private AuthSession newSession(OAuthTokenResponse response, long startTimeMillis, AuthSession parent) {
+    AuthSession session = new AuthSession(parent.headers(), response.token(), response.issuedTokenType());
+    // no refresh is scheduled when the response does not report an expiration
+    if (response.expiresInSeconds() != null) {
+      scheduleTokenRefresh(session, startTimeMillis, response.expiresInSeconds(), TimeUnit.SECONDS);
     }
+    return session;
+  }
+
+  /**
+   * Reads the {@code OAuth2Properties.EXCHANGE_TOKEN_MS} property as a long.
+   *
+   * @param properties the properties to read from
+   * @return the configured value, or {@code OAuth2Properties.EXCHANGE_TOKEN_MS_DEFAULT} as the fallback
+   */
+  private long expiresInMs(Map<String, String> properties) {
+    return PropertyUtil.propertyAsLong(
+        properties, OAuth2Properties.EXCHANGE_TOKEN_MS, OAuth2Properties.EXCHANGE_TOKEN_MS_DEFAULT);
+  }
+
+  /**
+   * Extracts the entries of a properties map whose keys start with {@code header.}.
+   *
+   * @param properties the properties to extract from
+   * @return the result of {@code RESTUtil.extractPrefixMap} for the {@code header.} prefix
+   */
+  private static Map<String, String> configHeaders(Map<String, String> properties) {
+    return RESTUtil.extractPrefixMap(properties, "header.");
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
index 55e5c1acb..3a51ad16d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java
@@ -33,7 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
-class RESTUtil {
+public class RESTUtil {
   private static final Joiner NULL_JOINER = Joiner.on("%00");
   private static final Splitter NULL_SPLITTER = Splitter.on("%00");
 
@@ -52,6 +52,27 @@ class RESTUtil {
     return result;
   }
 
+  /**
+   * Combine two string maps into a new immutable map, with entries from updates taking precedence.
+   * <p>
+   * Neither input map is modified.
+   *
+   * @param target a map of base entries
+   * @param updates a map of entries that override or extend the entries of target
+   * @return an immutable map with every entry of updates, plus the entries of target whose keys are not in updates
+   */
+  public static Map<String, String> merge(Map<String, String> target, Map<String, String> updates) {
+    ImmutableMap.Builder<String, String> merged = ImmutableMap.builder();
+
+    // keys that updates overrides are skipped here so that each key is added to the builder only once
+    for (Map.Entry<String, String> entry : target.entrySet()) {
+      if (!updates.containsKey(entry.getKey())) {
+        merged.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    merged.putAll(updates);
+
+    return merged.build();
+  }
+
   /**
    * Takes in a map, and returns a copy filtered on the entries with keys beginning with the designated prefix.
    * The keys are returned with the prefix removed.
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
index 4c78d80b0..184a80481 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
@@ -33,8 +33,28 @@ public class OAuth2Properties {
    */
   public static final String CREDENTIAL = "credential";
 
+  /**
+   * Interval in milliseconds to wait before attempting to exchange the configured catalog Bearer token.
+   * By default, token exchange will be attempted after 1 hour.
+   */
+  public static final String EXCHANGE_TOKEN_MS = "exchange-token-in-ms";
+  public static final long EXCHANGE_TOKEN_MS_DEFAULT = 3_600_000; // 1 hour
+
+  /**
+   * Additional scope for OAuth2.
+   */
+  public static final String SCOPE = "scope";
+
   /**
    * Scope for OAuth2 flows.
    */
   public static final String CATALOG_SCOPE = "catalog";
+
+  // token type constants
+  public static final String ACCESS_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token";
+  public static final String REFRESH_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:refresh_token";
+  public static final String ID_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token";
+  public static final String SAML1_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:saml1";
+  public static final String SAML2_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:saml2";
+  public static final String JWT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt";
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index 1ed7517fd..48f4d7980 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -26,6 +26,8 @@ import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
@@ -33,16 +35,28 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.ResourcePaths;
 import org.apache.iceberg.rest.responses.OAuthTokenResponse;
 import org.apache.iceberg.util.JsonUtil;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
 public class OAuth2Util {
   private OAuth2Util() {
   }
 
+  private static final Logger LOG = LoggerFactory.getLogger(OAuth2Util.class);
+
   // valid scope tokens are from ascii 0x21 to 0x7E, excluding 0x22 (") and 0x5C (\)
   private static final Pattern VALID_SCOPE_TOKEN = Pattern.compile("^[!-~&&[^\"\\\\]]+$");
   private static final Splitter SCOPE_DELIMITER = Splitter.on(" ");
@@ -68,12 +82,8 @@ public class OAuth2Util {
   private static final String ACTOR_TOKEN = "actor_token";
   private static final String ACTOR_TOKEN_TYPE = "actor_token_type";
   private static final Set<String> VALID_TOKEN_TYPES = Sets.newHashSet(
-      "urn:ietf:params:oauth:token-type:access_token",
-      "urn:ietf:params:oauth:token-type:refresh_token",
-      "urn:ietf:params:oauth:token-type:id_token",
-      "urn:ietf:params:oauth:token-type:saml1",
-      "urn:ietf:params:oauth:token-type:saml2",
-      "urn:ietf:params:oauth:token-type:jwt");
+      OAuth2Properties.ACCESS_TOKEN_TYPE, OAuth2Properties.REFRESH_TOKEN_TYPE, OAuth2Properties.ID_TOKEN_TYPE,
+      OAuth2Properties.SAML1_TOKEN_TYPE, OAuth2Properties.SAML2_TOKEN_TYPE, OAuth2Properties.JWT_TOKEN_TYPE);
 
   // response serialization
   private static final String ACCESS_TOKEN = "access_token";
@@ -102,12 +112,51 @@ public class OAuth2Util {
     return SCOPE_JOINER.join(scopes);
   }
 
-  public static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
+  private static OAuthTokenResponse refreshToken(RESTClient client, Map<String, String> headers,
+                                                 String subjectToken, String subjectTokenType, String scope) {
+    Map<String, String> request = tokenExchangeRequest(
+        subjectToken, subjectTokenType,
+        scope != null ? ImmutableList.of(scope) : ImmutableList.of());
+
+    OAuthTokenResponse response = client.postForm(
+        ResourcePaths.tokens(), request, OAuthTokenResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+    response.validate();
+
+    return response;
+  }
+
+  public static OAuthTokenResponse exchangeToken(RESTClient client, Map<String, String> headers,
+                                                 String subjectToken, String subjectTokenType,
+                                                 String actorToken, String actorTokenType, String scope) {
+    Map<String, String> request = tokenExchangeRequest(
+        subjectToken, subjectTokenType, actorToken, actorTokenType,
+        scope != null ? ImmutableList.of(scope) : ImmutableList.of());
+
+    OAuthTokenResponse response = client.postForm(
+        ResourcePaths.tokens(), request, OAuthTokenResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+    response.validate();
+
+    return response;
+  }
+
+  public static OAuthTokenResponse fetchToken(RESTClient client, Map<String, String> headers, String credential,
+                                              String scope) {
+    Map<String, String> request = clientCredentialsRequest(
+        credential, scope != null ? ImmutableList.of(scope) : ImmutableList.of());
+
+    OAuthTokenResponse response = client.postForm(
+        ResourcePaths.tokens(), request, OAuthTokenResponse.class, headers, ErrorHandlers.defaultErrorHandler());
+    response.validate();
+
+    return response;
+  }
+
+  private static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
                                                          List<String> scopes) {
     return tokenExchangeRequest(subjectToken, subjectTokenType, null, null, scopes);
   }
 
-  public static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
+  private static Map<String, String> tokenExchangeRequest(String subjectToken, String subjectTokenType,
                                                          String actorToken, String actorTokenType,
                                                          List<String> scopes) {
     Preconditions.checkArgument(VALID_TOKEN_TYPES.contains(subjectTokenType),
@@ -144,12 +193,12 @@ public class OAuth2Util {
     }
   }
 
-  public static Map<String, String> clientCredentialsRequest(String credential, List<String> scopes) {
+  private static Map<String, String> clientCredentialsRequest(String credential, List<String> scopes) {
     Pair<String, String> credentialPair = parseCredential(credential);
     return clientCredentialsRequest(credentialPair.first(), credentialPair.second(), scopes);
   }
 
-  public static Map<String, String> clientCredentialsRequest(String clientId, String clientSecret,
+  private static Map<String, String> clientCredentialsRequest(String clientId, String clientSecret,
                                                              List<String> scopes) {
     ImmutableMap.Builder<String, String> formData = ImmutableMap.builder();
     formData.put(GRANT_TYPE, CLIENT_CREDENTIALS);
@@ -157,7 +206,7 @@ public class OAuth2Util {
       formData.put(CLIENT_ID, clientId);
     }
     formData.put(CLIENT_SECRET, clientSecret);
-    formData.put(SCOPE, toScope(Iterables.concat(scopes, ImmutableList.of(CATALOG))));
+    formData.put(SCOPE, toScope(scopes));
 
     return formData.build();
   }
@@ -223,4 +272,69 @@ public class OAuth2Util {
 
     return builder.build();
   }
+
+  /**
+   * Class to handle authorization headers and token refresh.
+   */
+  public static class AuthSession {
+    private Map<String, String> headers;
+    private String token;
+    private String tokenType;
+
+    public AuthSession(Map<String, String> baseHeaders, String token, String tokenType) {
+      this.headers = RESTUtil.merge(baseHeaders, authHeaders(token));
+      this.token = token;
+      this.tokenType = tokenType;
+    }
+
+    public Map<String, String> headers() {
+      return headers;
+    }
+
+    public String token() {
+      return token;
+    }
+
+    public String tokenType() {
+      return tokenType;
+    }
+
+    /**
+     * Attempt to refresh the session token using the token exchange flow.
+     *
+     * @param client a RESTClient
+     * @return interval to wait before calling refresh again, or null if the refresh failed or no refresh is needed
+     */
+    public Pair<Integer, TimeUnit> refresh(RESTClient client) {
+      if (token != null) {
+        AtomicReference<OAuthTokenResponse> ref = new AtomicReference<>(null);
+        boolean isSuccessful = Tasks.foreach(ref)
+            .suppressFailureWhenFinished()
+            .retry(5)
+            .onFailure((task, err) -> LOG.warn("Failed to refresh token", err))
+            .exponentialBackoff(
+                COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+                COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+                COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+                2.0 /* exponential */)
+            .run(holder ->
+                holder.set(refreshToken(client, headers(), token, tokenType, OAuth2Properties.CATALOG_SCOPE)));
+
+        if (!isSuccessful) {
+          return null;
+        }
+
+        OAuthTokenResponse response = ref.get();
+        this.token = response.token();
+        this.tokenType = response.issuedTokenType();
+        this.headers = RESTUtil.merge(headers, authHeaders(token));
+
+        if (response.expiresInSeconds() != null) {
+          return Pair.of(response.expiresInSeconds(), TimeUnit.SECONDS);
+        }
+      }
+
+      return null;
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index ebf234b98..ee970005b 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -21,6 +21,9 @@ package org.apache.iceberg.util;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.iceberg.SystemProperties;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
@@ -61,12 +64,20 @@ public class ThreadPools {
 
   public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
     return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) Executors.newFixedThreadPool(
-            poolSize,
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat(namePrefix + "-%d")
-                .build()));
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(namePrefix)));
+  }
+
+  /**
+   * Create a new {@link ScheduledExecutorService} with the given name and pool size.
+   * <p>
+   * Threads used by this service will be daemon threads.
+   *
+   * @param namePrefix a base name for threads in the executor service's thread pool
+   * @param poolSize max number of threads to use
+   * @return an executor service
+   */
+  public static ScheduledExecutorService newScheduledPool(String namePrefix, int poolSize) {
+    return new ScheduledThreadPoolExecutor(poolSize, newDaemonThreadFactory(namePrefix));
   }
 
   private static int getPoolSize(String systemProperty, int defaultSize) {
@@ -80,4 +91,11 @@ public class ThreadPools {
     }
     return defaultSize;
   }
+
+  private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
+    return new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat(namePrefix + "-%d")
+        .build();
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index ce2de73cf..9b74fe4ea 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -170,8 +170,13 @@ public class RESTCatalogAdapter implements RESTClient {
                 .build());
 
           case "urn:ietf:params:oauth:grant-type:token-exchange":
+            String actor = request.get("actor_token");
+            String token = String.format(
+                "token-exchange-token:sub=%s%s",
+                request.get("subject_token"),
+                actor != null ? ",act=" + actor : "");
             return castResponse(responseType, OAuthTokenResponse.builder()
-                .withToken("token-exchange-token:sub=" + request.get("subject_token"))
+                .withToken(token)
                 .withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token")
                 .withTokenType("Bearer")
                 .build());
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
index 6b3feb2af..d10bead64 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
@@ -28,31 +28,41 @@ import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.CatalogTests;
 import org.apache.iceberg.catalog.SessionCatalog;
-import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.jdbc.JdbcCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
 import org.apache.iceberg.rest.responses.ConfigResponse;
 import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.OAuthTokenResponse;
+import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 
-public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends CatalogTests<T> {
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+
+public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
   @TempDir
   public Path temp;
 
-  private RESTSessionCatalog sessionCatalog;
-  private T restCatalog;
+  private RESTCatalog restCatalog;
   private JdbcCatalog backendCatalog;
 
   @BeforeEach
-  @SuppressWarnings("unchecked")
   public void createCatalog() {
     File warehouse = temp.toFile();
     Configuration conf = new Configuration();
@@ -66,29 +76,40 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
         JdbcCatalog.PROPERTY_PREFIX + "password", "password");
     backendCatalog.initialize("backend", backendCatalogProperties);
 
-    Map<String, String> testHeaders = ImmutableMap.of("header", "value");
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+    Map<String, String> contextHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
 
     RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog) {
       @Override
       public <T extends RESTResponse> T execute(RESTCatalogAdapter.HTTPMethod method, String path, Object body,
                                                 Class<T> responseType, Map<String, String> headers,
                                                 Consumer<ErrorResponse> errorHandler) {
-        Assertions.assertEquals(testHeaders, headers, "Should pass headers through");
+        // this doesn't use a Mockito spy because this is used for catalog tests, which have different method calls
+        if (!"v1/oauth/tokens".equals(path)) {
+          if ("v1/config".equals(path)) {
+            Assertions.assertEquals(catalogHeaders, headers, "Headers did not match for path: " + path);
+          } else {
+            Assertions.assertEquals(contextHeaders, headers, "Headers did not match for path: " + path);
+          }
+        }
         return super.execute(method, path, body, responseType, headers, errorHandler);
       }
     };
 
-    this.sessionCatalog = new RESTSessionCatalog((config) -> adaptor);
-    sessionCatalog.setConf(conf);
-    sessionCatalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "header.header", "value"));
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", ImmutableMap.of("credential", "user:12345"), ImmutableMap.of());
 
-    this.restCatalog = (T) sessionCatalog.asCatalog(SessionCatalog.SessionContext.createEmpty());
+    this.restCatalog = new RESTCatalog(context, (config) -> adaptor);
+    restCatalog.setConf(conf);
+    restCatalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:12345"));
   }
 
   @AfterEach
   public void closeCatalog() throws IOException {
     if (restCatalog != null) {
-      sessionCatalog.close();
+      restCatalog.close();
     }
 
     if (backendCatalog != null) {
@@ -97,7 +118,7 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
   }
 
   @Override
-  protected T catalog() {
+  protected RESTCatalog catalog() {
     return restCatalog;
   }
 
@@ -115,22 +136,47 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
 
   @Test
   public void testConfigRoute() throws IOException {
-    RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog) {
+    RESTClient testClient = new RESTClient() {
+      @Override
+      public void head(String path, Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+        throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+      }
+
       @Override
+      public <T extends RESTResponse> T delete(String path, Class<T> responseType, Map<String, String> headers,
+                                               Consumer<ErrorResponse> errorHandler) {
+        throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+      }
+
+      @Override
+      @SuppressWarnings("unchecked")
       public <T extends RESTResponse> T get(String path, Class<T> responseType, Map<String, String> headers,
                                             Consumer<ErrorResponse> errorHandler) {
-        if (ResourcePaths.config().equals(path)) {
-          return castResponse(responseType, ConfigResponse
-              .builder()
-              .withDefaults(ImmutableMap.of(CatalogProperties.CLIENT_POOL_SIZE, "1"))
-              .withOverrides(ImmutableMap.of(CatalogProperties.CACHE_ENABLED, "false"))
-              .build());
-        }
-        return super.get(path, responseType, headers, errorHandler);
+        return (T) ConfigResponse
+            .builder()
+            .withDefaults(ImmutableMap.of(CatalogProperties.CLIENT_POOL_SIZE, "1"))
+            .withOverrides(ImmutableMap.of(CatalogProperties.CACHE_ENABLED, "false"))
+            .build();
+      }
+
+      @Override
+      public <T extends RESTResponse> T post(String path, RESTRequest body, Class<T> responseType,
+                                             Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+        throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+      }
+
+      @Override
+      public <T extends RESTResponse> T postForm(String path, Map<String, String> formData, Class<T> responseType,
+                                                 Map<String, String> headers, Consumer<ErrorResponse> errorHandler) {
+        throw new UnsupportedOperationException("Should not be called for testConfigRoute");
+      }
+
+      @Override
+      public void close() {
       }
     };
 
-    RESTSessionCatalog restCat = new RESTSessionCatalog((config) -> adaptor);
+    RESTCatalog restCat = new RESTCatalog((config) -> testClient);
     Map<String, String> initialConfig = ImmutableMap.of(
         CatalogProperties.URI, "http://localhost:8080",
         CatalogProperties.CACHE_ENABLED, "true");
@@ -148,7 +194,7 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
 
   @Test
   public void testInitializeWithBadArguments() throws IOException {
-    RESTSessionCatalog restCat = new RESTSessionCatalog();
+    RESTCatalog restCat = new RESTCatalog();
     AssertHelpers.assertThrows("Configuration passed to initialize cannot be null",
         IllegalArgumentException.class,
         "Invalid configuration: null",
@@ -161,4 +207,574 @@ public class TestRESTCatalog<T extends Catalog & SupportsNamespaces> extends Cat
 
     restCat.close();
   }
+
+  @Test
+  public void testCatalogBasicBearerToken() {
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer bearer-token");
+
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "token", "bearer-token"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    // the bearer token should be used for all interactions
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(catalogHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogCredential() {
+    Map<String, String> emptyHeaders = ImmutableMap.of();
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "credential", "catalog:secret"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    // call client credentials with no initial auth
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(emptyHeaders), any());
+    // use the client credential token for config
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+    // use the catalog token for all interactions
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(catalogHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogBearerTokenWithClientCredential() {
+    Map<String, String> contextHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer bearer-token");
+
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", ImmutableMap.of("credential", "user:secret"), ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "token", "bearer-token"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    // use the bearer token for config
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+    // use the bearer token to fetch the context token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(catalogHeaders), any());
+    // use the context token for table load
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(contextHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogCredentialWithClientCredential() {
+    Map<String, String> emptyHeaders = ImmutableMap.of();
+    Map<String, String> contextHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", ImmutableMap.of("credential", "user:secret"), ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "credential", "catalog:secret"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    // call client credentials with no initial auth
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(emptyHeaders), any());
+    // use the client credential token for config
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+    // use the client credential to fetch the context token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(catalogHeaders), any());
+    // use the context token for table load
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(contextHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogBearerTokenAndCredentialWithClientCredential() {
+    Map<String, String> contextHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=user");
+    Map<String, String> initHeaders = ImmutableMap.of(
+        "Authorization", "Bearer bearer-token");
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", ImmutableMap.of("credential", "user:secret"), ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(
+        CatalogProperties.URI, "ignored",
+        "credential", "catalog:secret",
+        "token", "bearer-token"));
+
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    // use the bearer token for client credentials
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(initHeaders), any());
+    // use the client credential token for config
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+    // use the client credential to fetch the context token
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(catalogHeaders), any());
+    // use the context token for table load
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(contextHeaders),
+        any());
+  }
+
+  @Test
+  public void testClientBearerToken() {
+    testClientAuth("bearer-token",
+        ImmutableMap.of(
+            "token", "client-bearer-token",
+            "credential", "user:secret",
+            "urn:ietf:params:oauth:token-type:id_token", "id-token",
+            "urn:ietf:params:oauth:token-type:access_token", "access-token",
+            "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+            "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+            "urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
+        ImmutableMap.of("Authorization", "Bearer client-bearer-token"));
+  }
+
+  @Test
+  public void testClientCredential() {
+    testClientAuth("bearer-token",
+        ImmutableMap.of(
+            "credential", "user:secret",
+            "urn:ietf:params:oauth:token-type:id_token", "id-token",
+            "urn:ietf:params:oauth:token-type:access_token", "access-token",
+            "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+            "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+            "urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
+        ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=user"));
+  }
+
+  @Test
+  public void testClientIDToken() {
+    testClientAuth("bearer-token",
+        ImmutableMap.of(
+            "urn:ietf:params:oauth:token-type:id_token", "id-token",
+            "urn:ietf:params:oauth:token-type:access_token", "access-token",
+            "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+            "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+            "urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=bearer-token"));
+  }
+
+  @Test
+  public void testClientAccessToken() {
+    testClientAuth("bearer-token",
+        ImmutableMap.of(
+            "urn:ietf:params:oauth:token-type:access_token", "access-token",
+            "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+            "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+            "urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=access-token,act=bearer-token"));
+  }
+
+  @Test
+  public void testClientJWTToken() {
+    testClientAuth("bearer-token",
+        ImmutableMap.of(
+            "urn:ietf:params:oauth:token-type:jwt", "jwt-token",
+            "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+            "urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=jwt-token,act=bearer-token"));
+  }
+
+  @Test
+  public void testClientSAML2Token() {
+    testClientAuth("bearer-token",
+        ImmutableMap.of(
+            "urn:ietf:params:oauth:token-type:saml2", "saml2-token",
+            "urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=saml2-token,act=bearer-token"));
+  }
+
+  @Test
+  public void testClientSAML1Token() {
+    testClientAuth("bearer-token",
+        ImmutableMap.of("urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=saml1-token,act=bearer-token"));
+  }
+
+  /**
+   * Shared driver for the session (client) auth tests.
+   * <p>
+   * Initializes a catalog with a static bearer token, runs a {@code tableExists} check through a session that
+   * carries the given credentials, and verifies which Authorization headers reached the adapter.
+   *
+   * @param catalogToken bearer token passed as the catalog's "token" property
+   * @param credentials credentials attached to the session context
+   * @param expectedHeaders headers expected on the table load request made by the session
+   */
+  private void testClientAuth(String catalogToken, Map<String, String> credentials,
+                              Map<String, String> expectedHeaders) {
+    RESTCatalogAdapter spyAdapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    String sessionId = UUID.randomUUID().toString();
+    SessionCatalog.SessionContext sessionContext = new SessionCatalog.SessionContext(
+        sessionId, "user", credentials, ImmutableMap.of());
+
+    Map<String, String> initProperties = ImmutableMap.of(CatalogProperties.URI, "ignored", "token", catalogToken);
+    RESTCatalog restCatalog = new RESTCatalog(sessionContext, config -> spyAdapter);
+    restCatalog.initialize("prod", initProperties);
+
+    // the table was never created, so the lookup must report that it does not exist
+    Assertions.assertFalse(restCatalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    // the config request is sent with the catalog's own bearer token
+    Map<String, String> catalogAuthHeaders = ImmutableMap.of("Authorization", "Bearer " + catalogToken);
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/config"),
+        any(),
+        eq(ConfigResponse.class),
+        eq(catalogAuthHeaders),
+        any());
+
+    // a "token" credential is used as-is; any other credential requires a token request
+    boolean sessionHasStaticToken = credentials.containsKey("token");
+    if (!sessionHasStaticToken) {
+      Mockito.verify(spyAdapter).execute(
+          eq(HTTPMethod.POST),
+          eq("v1/oauth/tokens"),
+          any(),
+          eq(OAuthTokenResponse.class),
+          eq(catalogAuthHeaders),
+          any());
+    }
+
+    // the table load is sent with the session's headers
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedHeaders),
+        any());
+  }
+
+  @Test
+  public void testTableBearerToken() {
+    testTableAuth("catalog",
+        ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token"),
+        ImmutableMap.of("token", "table-bearer-token"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"),
+        ImmutableMap.of("Authorization", "Bearer table-bearer-token"));
+  }
+
+  @Test
+  public void testTableIDToken() {
+    testTableAuth("catalog",
+        ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token"),
+        ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "table-id-token"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"),
+        ImmutableMap.of("Authorization",
+            "Bearer token-exchange-token:sub=table-id-token,act=token-exchange-token:sub=id-token,act=catalog"));
+  }
+
+  @Test
+  public void testTableCredential() {
+    testTableAuth("catalog",
+        ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token"),
+        ImmutableMap.of("credential", "table-user:secret"),
+        ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"),
+        ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=table-user"));
+  }
+
+  /**
+   * Shared driver for the table auth tests.
+   * <p>
+   * Creates, loads, and refreshes a table through a spied adapter whose create/load responses are augmented with
+   * {@code tableConfig}, then verifies which Authorization headers were sent for each request.
+   *
+   * @param catalogToken bearer token passed as the catalog's "token" property
+   * @param credentials credentials attached to the session context
+   * @param tableConfig config entries injected into the create/load table responses
+   * @param expectedContextHeaders headers expected on requests made with the session's auth
+   * @param expectedTableHeaders headers expected on requests made with the table's auth
+   */
+  public void testTableAuth(String catalogToken, Map<String, String> credentials, Map<String, String> tableConfig,
+                            Map<String, String> expectedContextHeaders, Map<String, String> expectedTableHeaders) {
+    TableIdentifier tableIdent = TableIdentifier.of("ns", "table");
+    Schema expectedSchema = new Schema(
+        required(1, "id", Types.IntegerType.get(), "unique ID"),
+        required(2, "data", Types.StringType.get()));
+
+    RESTCatalogAdapter spyAdapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    // run the real request, then rebuild the response with the table config under test appended
+    Answer<LoadTableResponse> withTableConfig = invocation -> {
+      LoadTableResponse realResponse = (LoadTableResponse) invocation.callRealMethod();
+      return LoadTableResponse.builder()
+          .withTableMetadata(realResponse.tableMetadata())
+          .addAllConfig(realResponse.config())
+          .addAllConfig(tableConfig)
+          .build();
+    };
+
+    // augment the create table response
+    Mockito.doAnswer(withTableConfig).when(spyAdapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/namespaces/ns/tables"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedContextHeaders),
+        any());
+
+    // augment the load table response for requests sent with the session's headers
+    Mockito.doAnswer(withTableConfig).when(spyAdapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedContextHeaders),
+        any());
+
+    String sessionId = UUID.randomUUID().toString();
+    SessionCatalog.SessionContext sessionContext = new SessionCatalog.SessionContext(
+        sessionId, "user", credentials, ImmutableMap.of());
+
+    Map<String, String> initProperties = ImmutableMap.of(CatalogProperties.URI, "ignored", "token", catalogToken);
+    RESTCatalog restCatalog = new RESTCatalog(sessionContext, config -> spyAdapter);
+    restCatalog.initialize("prod", initProperties);
+
+    Table created = restCatalog.createTable(tableIdent, expectedSchema);
+    Assertions.assertEquals(expectedSchema.asStruct(), created.schema().asStruct(), "Schema should match");
+
+    // loading through the catalog goes out with the session's headers
+    Table loaded = restCatalog.loadTable(tableIdent);
+    Assertions.assertEquals(expectedSchema.asStruct(), loaded.schema().asStruct(), "Schema should match");
+
+    // force a reload through the loaded table
+    loaded.refresh();
+
+    // the config request is sent with the catalog's own bearer token
+    Map<String, String> catalogAuthHeaders = ImmutableMap.of("Authorization", "Bearer " + catalogToken);
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/config"),
+        any(),
+        eq(ConfigResponse.class),
+        eq(catalogAuthHeaders),
+        any());
+
+    // the session's token request is authorized with the catalog token
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        any(),
+        eq(OAuthTokenResponse.class),
+        eq(catalogAuthHeaders),
+        any());
+
+    // the create table request is sent with the session's headers
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/namespaces/ns/tables"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedContextHeaders),
+        any());
+
+    // a bearer token in the table config is used directly; otherwise two table token requests are expected
+    boolean tableHasStaticToken = tableConfig.containsKey("token");
+    if (!tableHasStaticToken) {
+      Mockito.verify(spyAdapter, times(2)).execute(
+          eq(HTTPMethod.POST),
+          eq("v1/oauth/tokens"),
+          any(),
+          eq(OAuthTokenResponse.class),
+          eq(expectedContextHeaders),
+          any());
+    }
+
+    // a table-scoped GET carries the table's headers
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedTableHeaders),
+        any());
+
+    // the catalog-level load carries the session's headers
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedContextHeaders),
+        any());
+
+    // refreshing the loaded table carries the table's headers (same matcher set as the table-scoped check above)
+    Mockito.verify(spyAdapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(expectedTableHeaders),
+        any());
+  }
+
+  @Test
+  public void testCatalogTokenRefresh() throws Exception {
+    Map<String, String> emptyHeaders = ImmutableMap.of();
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    Answer<OAuthTokenResponse> addOneSecondExpiration = invocation -> {
+      OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
+      return OAuthTokenResponse.builder()
+          .withToken(response.token())
+          .withTokenType(response.tokenType())
+          .withIssuedTokenType(response.issuedTokenType())
+          .addScopes(response.scopes())
+          .setExpirationInSeconds(1)
+          .build();
+    };
+
+    Mockito.doAnswer(addOneSecondExpiration).when(adapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        any(),
+        eq(OAuthTokenResponse.class),
+        any(),
+        any());
+
+    Map<String, String> contextCredentials = ImmutableMap.of();
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
+
+    Thread.sleep(3_000); // sleep until after 2 refresh calls
+
+    // call client credentials with no initial auth
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(emptyHeaders), any());
+
+    // use the client credential token for config
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+
+    // verify the first token exchange
+    Map<String, String> firstRefreshRequest = ImmutableMap.of(
+        "grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
+        "subject_token", "client-credentials-token:sub=catalog",
+        "subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
+        "scope", "catalog"
+    );
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        Mockito.argThat(firstRefreshRequest::equals),
+        eq(OAuthTokenResponse.class),
+        eq(catalogHeaders),
+        any());
+
+    // verify that a second exchange occurs
+    Map<String, String> secondRefreshRequest = ImmutableMap.of(
+        "grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
+        "subject_token", "token-exchange-token:sub=client-credentials-token:sub=catalog",
+        "subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
+        "scope", "catalog"
+    );
+    Map<String, String> secondRefreshHeaders = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=client-credentials-token:sub=catalog"
+    );
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        Mockito.argThat(secondRefreshRequest::equals),
+        eq(OAuthTokenResponse.class),
+        eq(secondRefreshHeaders),
+        any());
+  }
+
+  /**
+   * Verifies that, once the catalog token has been refreshed in the background, later catalog requests are sent
+   * with the refreshed token rather than the original client-credentials token.
+   * <p>
+   * Every token response is rewritten to expire after one second so that a refresh happens quickly.
+   */
+  @Test
+  public void testCatalogRefreshedTokenIsUsed() throws Exception {
+    Map<String, String> emptyHeaders = ImmutableMap.of();
+    Map<String, String> catalogHeaders = ImmutableMap.of(
+        "Authorization", "Bearer client-credentials-token:sub=catalog");
+
+    RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
+
+    // copy the real token response, but force a one second expiration
+    Answer<OAuthTokenResponse> addOneSecondExpiration = invocation -> {
+      OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
+      return OAuthTokenResponse.builder()
+          .withToken(response.token())
+          .withTokenType(response.tokenType())
+          .withIssuedTokenType(response.issuedTokenType())
+          .addScopes(response.scopes())
+          .setExpirationInSeconds(1)
+          .build();
+    };
+
+    Mockito.doAnswer(addOneSecondExpiration).when(adapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        any(),
+        eq(OAuthTokenResponse.class),
+        any(),
+        any());
+
+    Map<String, String> contextCredentials = ImmutableMap.of();
+    SessionCatalog.SessionContext context = new SessionCatalog.SessionContext(
+        UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
+
+    RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
+    catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
+
+    // sleep just past the one second expiration so the first refresh has happened; the checks below expect
+    // the table load to carry the once-exchanged token, so a second refresh must not have happened yet
+    Thread.sleep(1_100);
+
+    // use the exchanged catalog token
+    Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
+
+    // call client credentials with no initial auth
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST), eq("v1/oauth/tokens"), any(), eq(OAuthTokenResponse.class), eq(emptyHeaders), any());
+
+    // use the client credential token for config
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET), eq("v1/config"), any(), eq(ConfigResponse.class), eq(catalogHeaders), any());
+
+    // verify the first token exchange
+    Map<String, String> firstRefreshRequest = ImmutableMap.of(
+        "grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
+        "subject_token", "client-credentials-token:sub=catalog",
+        "subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
+        "scope", "catalog"
+    );
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.POST),
+        eq("v1/oauth/tokens"),
+        Mockito.argThat(firstRefreshRequest::equals),
+        eq(OAuthTokenResponse.class),
+        eq(catalogHeaders),
+        any());
+
+    // use the refreshed context token for table load
+    Map<String, String> refreshedCatalogHeader = ImmutableMap.of(
+        "Authorization", "Bearer token-exchange-token:sub=client-credentials-token:sub=catalog"
+    );
+    Mockito.verify(adapter).execute(
+        eq(HTTPMethod.GET),
+        eq("v1/namespaces/ns/tables/table"),
+        any(),
+        eq(LoadTableResponse.class),
+        eq(refreshedCatalogHeader),
+        any());
+  }
 }