You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/29 20:35:11 UTC
[iceberg] branch master updated: Core: Add REST catalog session timeout (#4894)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 58657c629 Core: Add REST catalog session timeout (#4894)
58657c629 is described below
commit 58657c629d4ed011fc6669766620964036d96c99
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Sun May 29 13:35:06 2022 -0700
Core: Add REST catalog session timeout (#4894)
---
.../java/org/apache/iceberg/CatalogProperties.java | 5 ++++
.../apache/iceberg/rest/RESTSessionCatalog.java | 35 ++++++++++++++++++----
.../apache/iceberg/rest/auth/OAuth2Properties.java | 4 +--
.../org/apache/iceberg/rest/auth/OAuth2Util.java | 7 ++++-
4 files changed, 42 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
index 447970cf6..d96784162 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java
@@ -83,4 +83,9 @@ public class CatalogProperties {
public static final String APP_ID = "app-id";
public static final String USER = "user";
+ public static final String AUTH_DEFAULT_REFRESH_ENABLED = "auth.default-refresh-enabled";
+ public static final boolean AUTH_DEFAULT_REFRESH_ENABLED_DEFAULT = false;
+
+ public static final String AUTH_SESSION_TIMEOUT_MS = "auth.session-timeout-ms";
+ public static final long AUTH_SESSION_TIMEOUT_MS_DEFAULT = TimeUnit.HOURS.toMillis(1);
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index cb8b5b0e3..2512594e6 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -21,9 +21,11 @@ package org.apache.iceberg.rest;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -85,8 +87,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
OAuth2Properties.SAML2_TOKEN_TYPE, OAuth2Properties.SAML1_TOKEN_TYPE);
private final Function<Map<String, String>, RESTClient> clientBuilder;
- private final Cache<String, AuthSession> sessions = Caffeine.newBuilder().build();
+ private Cache<String, AuthSession> sessions = null;
private AuthSession catalogAuth = null;
+ private boolean refreshAuthByDefault = false;
private RESTClient client = null;
private ResourcePaths paths = null;
private Object conf = null;
@@ -138,6 +141,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
this.catalogAuth = newSession(initToken, expiresInMs(mergedProps), catalogAuth);
}
+ this.sessions = newSessionCache(mergedProps);
+ this.refreshAuthByDefault = PropertyUtil.propertyAsBoolean(mergedProps,
+ CatalogProperties.AUTH_DEFAULT_REFRESH_ENABLED, CatalogProperties.AUTH_DEFAULT_REFRESH_ENABLED_DEFAULT);
this.client = clientBuilder.apply(mergedProps);
this.paths = ResourcePaths.forCatalogProperties(mergedProps);
@@ -578,9 +584,11 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
return parent;
}
- private AuthSession newSession(String token, long expirationMs, AuthSession parent) {
+ private AuthSession newSession(String token, Long expirationMs, AuthSession parent) {
AuthSession session = new AuthSession(parent.headers(), token, OAuth2Properties.ACCESS_TOKEN_TYPE);
- scheduleTokenRefresh(session, System.currentTimeMillis(), expirationMs, TimeUnit.MILLISECONDS);
+ if (expirationMs != null) {
+ scheduleTokenRefresh(session, System.currentTimeMillis(), expirationMs, TimeUnit.MILLISECONDS);
+ }
return session;
}
@@ -606,12 +614,27 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
return session;
}
- private long expiresInMs(Map<String, String> properties) {
- return PropertyUtil.propertyAsLong(
- properties, OAuth2Properties.EXCHANGE_TOKEN_MS, OAuth2Properties.EXCHANGE_TOKEN_MS_DEFAULT);
+ private Long expiresInMs(Map<String, String> properties) {
+ if (refreshAuthByDefault || properties.containsKey(OAuth2Properties.TOKEN_EXPIRES_IN_MS)) {
+ return PropertyUtil.propertyAsLong(
+ properties, OAuth2Properties.TOKEN_EXPIRES_IN_MS, OAuth2Properties.TOKEN_EXPIRES_IN_MS_DEFAULT);
+ } else {
+ return null;
+ }
}
private static Map<String, String> configHeaders(Map<String, String> properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
}
+
+ private static Cache<String, AuthSession> newSessionCache(Map<String, String> properties) {
+ long expirationIntervalMs = PropertyUtil.propertyAsLong(properties,
+ CatalogProperties.AUTH_SESSION_TIMEOUT_MS,
+ CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT);
+
+ return Caffeine.newBuilder()
+ .expireAfterAccess(Duration.ofMillis(expirationIntervalMs))
+ .removalListener((RemovalListener<String, AuthSession>) (id, auth, cause) -> auth.stopRefreshing())
+ .build();
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
index 184a80481..dd3a2e78c 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Properties.java
@@ -37,8 +37,8 @@ public class OAuth2Properties {
* Interval in milliseconds to wait before attempting to exchange the configured catalog Bearer token.
* By default, token exchange will be attempted after 1 hour.
*/
- public static final String EXCHANGE_TOKEN_MS = "exchange-token-in-ms";
- public static final long EXCHANGE_TOKEN_MS_DEFAULT = 3_600_000; // 1 hour
+ public static final String TOKEN_EXPIRES_IN_MS = "token-expires-in-ms";
+ public static final long TOKEN_EXPIRES_IN_MS_DEFAULT = 3_600_000; // 1 hour
/**
* Additional scope for OAuth2.
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index 48f4d7980..8c8a223ee 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -280,6 +280,7 @@ public class OAuth2Util {
private Map<String, String> headers;
private String token;
private String tokenType;
+ private volatile boolean keepRefreshed = true;
public AuthSession(Map<String, String> baseHeaders, String token, String tokenType) {
this.headers = RESTUtil.merge(baseHeaders, authHeaders(token));
@@ -299,6 +300,10 @@ public class OAuth2Util {
return tokenType;
}
+ public void stopRefreshing() {
+ this.keepRefreshed = false;
+ }
+
/**
* Attempt to refresh the session token using the token exchange flow.
*
@@ -306,7 +311,7 @@ public class OAuth2Util {
* @return interval to wait before calling refresh again, or null if no refresh is needed
*/
public Pair<Integer, TimeUnit> refresh(RESTClient client) {
- if (token != null) {
+ if (token != null && keepRefreshed) {
AtomicReference<OAuthTokenResponse> ref = new AtomicReference<>(null);
boolean isSuccessful = Tasks.foreach(ref)
.suppressFailureWhenFinished()