You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/29 20:35:11 UTC

[iceberg] branch master updated: Core: Add REST catalog session timeout (#4894)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 58657c629 Core: Add REST catalog session timeout (#4894)
58657c629 is described below

commit 58657c629d4ed011fc6669766620964036d96c99
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Sun May 29 13:35:06 2022 -0700

    Core: Add REST catalog session timeout (#4894)
---
 .../java/org/apache/iceberg/CatalogProperties.java |  5 ++++
 .../apache/iceberg/rest/RESTSessionCatalog.java    | 35 ++++++++++++++++++----
 .../apache/iceberg/rest/auth/OAuth2Properties.java |  4 +--
 .../org/apache/iceberg/rest/auth/OAuth2Util.java   |  7 ++++-
 4 files changed, 42 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index 447970cf6..d96784162 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -83,4 +83,9 @@ public class CatalogProperties {
   public static final String APP_ID = "app-id";
   public static final String USER = "user";
 
+  public static final String AUTH_DEFAULT_REFRESH_ENABLED = "auth.default-refresh-enabled";
+  public static final boolean AUTH_DEFAULT_REFRESH_ENABLED_DEFAULT = false;
+
+  public static final String AUTH_SESSION_TIMEOUT_MS = "auth.session-timeout-ms";
+  public static final long AUTH_SESSION_TIMEOUT_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index cb8b5b0e3..2512594e6 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.rest;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -85,8 +87,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
       OAuth2Properties.SAML2_TOKEN_TYPE, OAuth2Properties.SAML1_TOKEN_TYPE);
 
   private final Function<Map<String, String>, RESTClient> clientBuilder;
-  private final Cache<String, AuthSession> sessions = Caffeine.newBuilder().build();
+  private Cache<String, AuthSession> sessions = null;
   private AuthSession catalogAuth = null;
+  private boolean refreshAuthByDefault = false;
   private RESTClient client = null;
   private ResourcePaths paths = null;
   private Object conf = null;
@@ -138,6 +141,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
       this.catalogAuth = newSession(initToken, expiresInMs(mergedProps), catalogAuth);
     }
 
+    this.sessions = newSessionCache(mergedProps);
+    this.refreshAuthByDefault = PropertyUtil.propertyAsBoolean(mergedProps,
+        CatalogProperties.AUTH_DEFAULT_REFRESH_ENABLED, CatalogProperties.AUTH_DEFAULT_REFRESH_ENABLED_DEFAULT);
     this.client = clientBuilder.apply(mergedProps);
     this.paths = ResourcePaths.forCatalogProperties(mergedProps);
 
@@ -578,9 +584,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
     return parent;
   }
 
-  private AuthSession newSession(String token, long expirationMs, AuthSession parent) {
+  private AuthSession newSession(String token, Long expirationMs, AuthSession parent) {
     AuthSession session = new AuthSession(parent.headers(), token, OAuth2Properties.ACCESS_TOKEN_TYPE);
-    scheduleTokenRefresh(session, System.currentTimeMillis(), expirationMs, TimeUnit.MILLISECONDS);
+    if (expirationMs != null) {
+      scheduleTokenRefresh(session, System.currentTimeMillis(), expirationMs, TimeUnit.MILLISECONDS);
+    }
     return session;
   }
 
@@ -606,12 +614,27 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
     return session;
   }
 
-  private long expiresInMs(Map<String, String> properties) {
-    return PropertyUtil.propertyAsLong(
-        properties, OAuth2Properties.EXCHANGE_TOKEN_MS, OAuth2Properties.EXCHANGE_TOKEN_MS_DEFAULT);
+  private Long expiresInMs(Map<String, String> properties) {
+    if (refreshAuthByDefault || properties.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) {
+      return PropertyUtil.propertyAsLong(
+          properties, OAuth2Properties.TOKEN_EXPIRES_IN_MS, OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT);
+    } else {
+      return null;
+    }
   }
 
   private static Map<String, String> configHeaders(Map<String, String> properties) {
     return RESTUtil.extractPrefixMap(properties, "header.");
   }
+
+  private static Cache<String, AuthSession> newSessionCache(Map<String, String> properties) {
+    long expirationIntervalMs = PropertyUtil.propertyAsLong(properties,
+        CatalogProperties.AUTH_SESSION_TIMEOUT_MS,
+        CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT);
+
+    return Caffeine.newBuilder()
+        .expireAfterAccess(Duration.ofMillis(expirationIntervalMs))
+        .removalListener((RemovalListener<String, AuthSession>) (id, auth, cause) -> auth.stopRefreshing())
+        .build();
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
index 184a80481..dd3a2e78c 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
@@ -37,8 +37,8 @@ public class OAuth2Properties {
    * Interval in milliseconds to wait before attempting to exchange the configured catalog Bearer token.
    * By default, token exchange will be attempted after 1 hour.
    */
-  public static final String EXCHANGE_TOKEN_MS = "exchange-token-in-ms";
-  public static final long EXCHANGE_TOKEN_MS_DEFAULT = 3_600_000; // 1 hour
+  public static final String TOKEN_EXPIRES_IN_MS = "token-expires-in-ms";
+  public static final long TOKEN_EXPIRES_IN_MS_DEFAULT = 3_600_000; // 1 hour
 
   /**
    * Additional scope for OAuth2.
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index 48f4d7980..8c8a223ee 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -280,6 +280,7 @@ public class OAuth2Util {
     private Map<String, String> headers;
     private String token;
     private String tokenType;
+    private volatile boolean keepRefreshed = true;
 
     public AuthSession(Map<String, String> baseHeaders, String token, String tokenType) {
       this.headers = RESTUtil.merge(baseHeaders, authHeaders(token));
@@ -299,6 +300,10 @@ public class OAuth2Util {
       return tokenType;
     }
 
+    public void stopRefreshing() {
+      this.keepRefreshed = false;
+    }
+
     /**
      * Attempt to refresh the session token using the token exchange flow.
      *
@@ -306,7 +311,7 @@ public class OAuth2Util {
      * @return interval to wait before calling refresh again, or null if no refresh is needed
      */
     public Pair<Integer, TimeUnit> refresh(RESTClient client) {
-      if (token != null) {
+      if (token != null && keepRefreshed) {
         AtomicReference<OAuthTokenResponse> ref = new AtomicReference<>(null);
         boolean isSuccessful = Tasks.foreach(ref)
             .suppressFailureWhenFinished()