You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/20 17:53:49 UTC

[GitHub] merlimat closed pull request #1256: Fix bug that consumer which specify incorrect subscription hangs up w?

merlimat closed pull request #1256: Fix bug that consumer which specify incorrect subscription hangs up w?
URL: https://github.com/apache/incubator-pulsar/pull/1256
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cb473c875..b951e3901 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -669,6 +669,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                         ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
                     }
                     return null;
+                }).exceptionally(e -> {
+                    String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
+                    log.warn(msg);
+                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
+                    return null;
                 });
             } else {
                 final String msg = "Proxy Client is not authorized to subscribe";
@@ -855,6 +860,11 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                         ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
                     }
                     return null;
+                }).exceptionally(e -> {
+                    String msg = String.format("[%s] %s with role %s", remoteAddress, e.getMessage(), authRole);
+                    log.warn(msg);
+                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, e.getMessage()));
+                    return null;
                 });
             } else {
                 final String msg = "Proxy Client is not authorized to Produce";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 4f4fcd6a1..465cc6d52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@
 
 import javax.naming.AuthenticationException;
 
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -147,6 +149,46 @@ public void testProducerAndConsumerAuthorization() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testSubscriptionPrefixAuthorization() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        conf.setAuthorizationProvider(TestAuthorizationProviderWithSubscriptionPrefix.class.getName());
+        setup();
+
+        ClientConfiguration adminConf = new ClientConfiguration();
+        Authentication adminAuthentication = new ClientAuthentication("superUser");
+        adminConf.setAuthentication(adminAuthentication);
+        admin = spy(new PulsarAdmin(brokerUrl, adminConf));
+
+        String lookupUrl;
+        lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+        ClientConfiguration clientConfValid = new ClientConfiguration();
+        Authentication authentication = new ClientAuthentication(clientRole);
+        clientConfValid.setAuthentication(authentication);
+
+        pulsarClient = PulsarClient.create(lookupUrl, clientConfValid);
+
+        admin.properties().createProperty("prop-prefix",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("prop-prefix/use/ns");
+
+        // (1) Valid subscription name will be approved by authorization service
+        Consumer consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", clientRole + "-sub1");
+        consumer.close();
+
+        // (2) InValid subscription name will be rejected by authorization service
+        try {
+            consumer = pulsarClient.subscribe("persistent://prop-prefix/use/ns/t1", "sub1");
+            Assert.fail("should have failed with authorization error");
+        } catch (PulsarClientException.AuthorizationException pa) {
+            // Ok
+        }
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
     @Test
     public void testGrantPermission() throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -337,6 +379,24 @@ public void initialize(ServiceConfiguration conf, ConfigurationCacheService conf
         }
     }
 
+    public static class TestAuthorizationProviderWithSubscriptionPrefix extends TestAuthorizationProvider {
+
+        @Override
+        public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
+                AuthenticationDataSource authenticationData, String subscription) {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            if (isNotBlank(subscription)) {
+                if (!subscription.startsWith(role)) {
+                    future.completeExceptionally(new PulsarServerException(
+                            "The subscription name needs to be prefixed by the authentication role"));
+                }
+            }
+            future.complete(clientRole.equals(role));
+            return future;
+        }
+
+    }
+
     public static class TestAuthorizationProviderWithGrantPermission extends TestAuthorizationProvider {
 
         private Set<String> grantRoles = Sets.newHashSet();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services