You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:27:08 UTC
[pulsar] 02/02: Add refresh authentication command in broker (#9064)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 78e7b2069f25d3da96e8204e0dac27d44448602f
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Jan 26 18:11:04 2021 +0800
Add refresh authentication command in broker (#9064)
* Add refresh authentication command in the broker
---
**Motivation**
Some authentication providers use cached authentication data, and they
only provide new auth data once 'getAuthData' is called again. So this
change adds a refresh command; when the client receives the command, it
refreshes its authentication data provider.
**Verify this change**
Existing tests pass.
(cherry picked from commit 373948ee1df6294ef43dafe457a86e53cd86ab32)
---
.../authentication/SaslAuthenticateTest.java | 2 +-
.../broker/authentication/AuthenticationState.java | 4 +-
.../AuthenticationProviderTokenTest.java | 2 +-
.../client/api/MutualAuthenticationTest.java | 2 +-
...kenOauth2AuthenticatedProducerConsumerTest.java | 76 ++++++++++++++++++++++
.../org/apache/pulsar/common/api/AuthData.java | 5 +-
.../client/impl/auth/AuthenticationSasl.java | 2 +-
.../impl/auth/SaslAuthenticationDataProvider.java | 2 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 14 +++-
.../pulsar/proxy/server/DirectProxyHandler.java | 13 +++-
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 2 +-
.../token/PulsarTokenAuthenticationBaseSuite.java | 11 +++-
12 files changed, 121 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index e94d2ae..bb20077 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -260,7 +260,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
// auth between server and client.
// first time auth
- AuthData initData1 = dataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+ AuthData initData1 = dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
AuthData serverData1 = authState.authenticate(initData1);
boolean complete = authState.isComplete();
assertFalse(complete);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
index ac881ac..f51e818 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
@@ -68,7 +68,7 @@ public interface AuthenticationState {
/**
* If the authentication state supports refreshing and the credentials are expired,
- * the auth provider will call this method ot initiate the refresh process.
+ * the auth provider will call this method to initiate the refresh process.
* <p>
* The auth state here will return the broker side data that will be used to send
* a challenge to the client.
@@ -77,6 +77,6 @@ public interface AuthenticationState {
* @throws AuthenticationException
*/
default AuthData refreshAuthentication() throws AuthenticationException {
- return null;
+ return AuthData.REFRESH_AUTH_DATA;
}
}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index 60fa5f6..c2610aa 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -634,7 +634,7 @@ public class AuthenticationProviderTokenTest {
assertTrue(authState.isComplete());
AuthData brokerData = authState.refreshAuthentication();
- assertNull(brokerData);
+ assertEquals(brokerData, AuthData.REFRESH_AUTH_DATA);
}
// tests for Token Audience
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index ef06323..458c4ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -70,7 +70,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
String dataString = new String(data.getBytes(), UTF_8);
AuthData toSend;
- if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA)) {
+ if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
toSend = AuthData.of(clientAuthStrings[0].getBytes(UTF_8));
} else if (Arrays.equals(dataString.getBytes(), serverAuthStrings[0].getBytes(UTF_8))) {
toSend = AuthData.of(clientAuthStrings[1].getBytes(UTF_8));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index cfb80dc..b4e7bb0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -25,15 +25,19 @@ import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -62,6 +66,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
protected void setup() throws Exception {
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
+ conf.setAuthenticationRefreshCheckSeconds(5);
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add(ADMIN_ROLE);
@@ -157,4 +162,75 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
log.info("-- Exiting {} test --", methodName);
}
+ @Test
+ public void testOAuth2TokenRefreshedWithoutReconnect() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ clientSetup();
+
+ // test rest by admin
+ admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+ .subscriptionName("my-subscriber-name").subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+ Producer<byte[]> producer = producerBuilder.create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+
+ // get the first connection stats
+ ProducerImpl producerImpl = (ProducerImpl) producer;
+ String accessTokenOld = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+ long lastDisconnectTime = producer.getLastDisconnectedTimestamp();
+
+ // the token expire duration is 10 seconds, so we need to wait for the authenticationData refreshed
+ Awaitility.await()
+ .atLeast(10, TimeUnit.SECONDS)
+ .atMost(20, TimeUnit.SECONDS)
+ .with()
+ .pollInterval(Duration.ofSeconds(1))
+ .untilAsserted(() -> {
+ String accessTokenNew = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+ Assert.assertNotEquals(accessTokenOld, accessTokenNew);
+ });
+
+ // get the lastDisconnectTime, it should be same with the before, because the connection shouldn't disconnect
+ long lastDisconnectTimeAfterTokenExpired = producer.getLastDisconnectedTimestamp();
+ Assert.assertEquals(lastDisconnectTime, lastDisconnectTimeAfterTokenExpired);
+
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ msg = null;
+ messageSet = Sets.newHashSet();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
index f12c436..188cb27 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
@@ -28,7 +28,10 @@ import lombok.Data;
@Data(staticConstructor = "of")
public final class AuthData {
// CHECKSTYLE.OFF: StaticVariableName
- public static byte[] INIT_AUTH_DATA = "PulsarAuthInit".getBytes(UTF_8);
+ public static byte[] INIT_AUTH_DATA_BYTES = "PulsarAuthInit".getBytes(UTF_8);
+ public static byte[] REFRESH_AUTH_DATA_BYTES = "PulsarAuthRefresh".getBytes(UTF_8);
+ public static AuthData INIT_AUTH_DATA = AuthData.of(INIT_AUTH_DATA_BYTES);
+ public static AuthData REFRESH_AUTH_DATA = AuthData.of(REFRESH_AUTH_DATA_BYTES);
// CHECKSTYLE.ON: StaticVariableName
private final byte[] bytes;
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index ee850e1..54d1802 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -261,7 +261,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
}
// first time init
headers.put(SASL_HEADER_STATE, SASL_STATE_CLIENT_INIT);
- AuthData initData = authData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+ AuthData initData = authData.authenticate(AuthData.INIT_AUTH_DATA);
headers.put(SASL_AUTH_TOKEN,
Base64.getEncoder().encodeToString(initData.getBytes()));
} else {
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
index 261e06c..daae735 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
@@ -51,7 +51,7 @@ public class SaslAuthenticationDataProvider implements AuthenticationDataProvide
@Override
public AuthData authenticate(AuthData commandData) throws AuthenticationException {
// init
- if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA)) {
+ if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
if (pulsarSaslClient.hasInitialResponse()) {
return pulsarSaslClient.evaluateChallenge(AuthData.of(new byte[0]));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bbcba68..c551588 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -37,6 +37,7 @@ import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
@@ -138,6 +139,7 @@ public class ClientCnx extends PulsarHandler {
private ScheduledFuture<?> timeoutTask;
// Added for mutual authentication.
+ @Getter
protected AuthenticationDataProvider authenticationDataProvider;
private TransactionBufferHandler transactionBufferHandler;
@@ -226,7 +228,7 @@ public class ClientCnx extends PulsarHandler {
// each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
// and return authData to server.
authenticationDataProvider = authentication.getAuthData(remoteHostName);
- AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+ AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null);
}
@@ -318,6 +320,16 @@ public class ClientCnx extends PulsarHandler {
checkArgument(authChallenge.hasChallenge());
checkArgument(authChallenge.getChallenge().hasAuthData());
+ if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData().toByteArray())) {
+ try {
+ authenticationDataProvider = authentication.getAuthData(remoteHostName);
+ } catch (PulsarClientException e) {
+ log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+ connectionFuture.completeExceptionally(e);
+ return;
+ }
+ }
+
// mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
try {
AuthData authData = authenticationDataProvider
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 44d7ad5..e7233a6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -46,6 +46,7 @@ import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
@@ -57,6 +58,7 @@ import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.tls.TlsHostnameVerifier;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
@@ -222,7 +224,7 @@ public class DirectProxyHandler {
this.ctx = ctx;
// Send the Connect command to broker
authenticationDataProvider = authentication.getAuthData(remoteHostName);
- AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+ AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
ByteBuf command = null;
command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
@@ -262,6 +264,15 @@ public class DirectProxyHandler {
checkArgument(authChallenge.hasChallenge());
checkArgument(authChallenge.getChallenge().hasAuthData() && authChallenge.getChallenge().hasAuthData());
+ if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
+ try {
+ authenticationDataProvider = authentication.getAuthData(remoteHostName);
+ } catch (PulsarClientException e) {
+ log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+ return;
+ }
+ }
+
// mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
try {
AuthData authData = authenticationDataProvider
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 0878370..665b9f8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -54,7 +54,7 @@ public class ProxyClientCnx extends ClientCnx {
}
authenticationDataProvider = authentication.getAuthData(remoteHostName);
- AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+ AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
clientAuthMethod);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index 7a923ee..eb96555 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -288,6 +290,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class));
String initialToken = this.createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+ String refreshedToken = this.createClientTokenWithExpiry(30, TimeUnit.SECONDS);
@Cleanup
PulsarClient client = PulsarClient.builder()
@@ -295,7 +298,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
.authentication(AuthenticationFactory.token(() -> {
if (shouldRefreshToken) {
try {
- return createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+ return refreshedToken;
} catch (Exception e) {
return null;
}
@@ -308,17 +311,19 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
- .sendTimeout(1, TimeUnit.SECONDS)
+ .sendTimeout(3, TimeUnit.SECONDS)
.create();
-
// Initially the token is valid and producer will be able to publish
producer.send("hello-1");
+ long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp();
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
if (shouldRefreshToken) {
// The token will have been refreshed, so the app won't see any error
producer.send("hello-2");
+ long timestamp = producer.getLastDisconnectedTimestamp();
+ assertEquals(timestamp, lastDisconnectedTimestamp);
} else {
// The token has expired, so this next message will be rejected
try {