You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/16 02:38:21 UTC

[GitHub] hrsakai closed pull request #820: Added setting for anonymous user role

hrsakai closed pull request #820: Added setting for anonymous user role
URL: https://github.com/apache/incubator-pulsar/pull/820
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it after the merge):

diff --git a/conf/broker.conf b/conf/broker.conf
index af05b567c..5b8e90380 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -210,6 +210,9 @@ brokerClientAuthenticationParameters=
 # Supported Athenz provider domain names(comma separated) for authentication
 athenzDomainNames=
 
+# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+anonymousUserRole=
+
 ### --- BookKeeper Client --- ###
 
 # Authentication plugin to use when connecting to bookies
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 797b07db4..47a3b60e4 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -176,6 +176,9 @@ brokerClientAuthenticationParameters=
 # Supported Athenz provider domain names(comma separated) for authentication
 athenzDomainNames=
 
+# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+anonymousUserRole=
+
 ### --- BookKeeper Client --- ###
 
 # Authentication plugin to use when connecting to bookies
diff --git a/conf/websocket.conf b/conf/websocket.conf
index de043d5c7..cf5135d9e 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -72,6 +72,9 @@ superUserRoles=
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 
+# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+anonymousUserRole=
+
 ### --- TLS --- ###
 
 # Enable TLS
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a638fd4fb..2bf177396 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -201,6 +201,9 @@
     private String brokerClientAuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationDisabled";
     private String brokerClientAuthenticationParameters = "";
 
+    // When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+    private String anonymousUserRole = null;
+
     /**** --- BookKeeper Client --- ****/
     // Authentication plugin to use when connecting to bookies
     private String bookkeeperClientAuthenticationPlugin;
@@ -796,6 +799,14 @@ public void setBrokerClientAuthenticationParameters(String brokerClientAuthentic
         this.brokerClientAuthenticationParameters = brokerClientAuthenticationParameters;
     }
 
+    public String getAnonymousUserRole() {
+        return anonymousUserRole;
+    }
+
+    public void setAnonymousUserRole(String anonymousUserRole) {
+        this.anonymousUserRole = anonymousUserRole;
+    }
+
     public String getBookkeeperClientAuthenticationPlugin() {
         return bookkeeperClientAuthenticationPlugin;
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
index d94e97c59..930f3d2e5 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
@@ -25,6 +25,7 @@
 import javax.naming.AuthenticationException;
 import javax.servlet.http.HttpServletRequest;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.slf4j.Logger;
@@ -38,10 +39,12 @@
  */
 public class AuthenticationService implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(AuthenticationService.class);
+    private final String anonymousUserRole;
 
     private final Map<String, AuthenticationProvider> providers = Maps.newHashMap();
 
     public AuthenticationService(ServiceConfiguration conf) throws PulsarServerException {
+        anonymousUserRole = conf.getAnonymousUserRole();
         if (conf.isAuthenticationEnabled()) {
             try {
                 AuthenticationProvider provider;
@@ -71,6 +74,9 @@ public String authenticate(AuthenticationDataSource authData, String authMethodN
         if (provider != null) {
             return provider.authenticate(authData);
         } else {
+            if (StringUtils.isNotBlank(anonymousUserRole)) {
+                return anonymousUserRole;
+            }
             throw new AuthenticationException("Unsupported authentication mode: " + authMethodName);
         }
     }
@@ -88,6 +94,9 @@ public String authenticateHttpRequest(HttpServletRequest request) throws Authent
 
         // No authentication provided
         if (!providers.isEmpty()) {
+            if (StringUtils.isNotBlank(anonymousUserRole)) {
+                return anonymousUserRole;
+            }
             // If at least a provider was configured, then the authentication needs to be provider
             throw new AuthenticationException("Authentication required");
         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 3049f94dd..d4b3ba5a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -22,10 +22,7 @@
 import static org.mockito.Mockito.spy;
 
 import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.InternalServerErrorException;
@@ -42,6 +39,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.mockito.Mockito;
@@ -68,6 +66,10 @@
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        if (methodName.equals("testAnonymousSyncProducerAndConsumer")) {
+            conf.setAnonymousUserRole("anonymousUser");
+        }
+
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
 
@@ -119,26 +121,10 @@ protected void cleanup() throws Exception {
         return new Object[][] { { 0 }, { 1000 } };
     }
 
-    @Test(dataProvider = "batch")
-    public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
-        log.info("-- Starting {} test --", methodName);
-
-        Map<String, String> authParams = new HashMap<>();
-        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
-        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
-        Authentication authTls = new AuthenticationTls();
-        authTls.configure(authParams);
-        internalSetup(authTls);
-
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
-                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
-        admin.properties().createProperty("my-property",
-                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
-        admin.namespaces().createNamespace("my-property/use/my-ns");
-
+    public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
         ConsumerConfiguration conf = new ConsumerConfiguration();
         conf.setSubscriptionType(SubscriptionType.Exclusive);
-        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
+        Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic", "my-subscriber-name",
                 conf);
 
         ProducerConfiguration producerConf = new ProducerConfiguration();
@@ -149,7 +135,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
             producerConf.setBatchingMaxMessages(5);
         }
 
-        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
+        Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic", producerConf);
         for (int i = 0; i < 10; i++) {
             String message = "my-message-" + i;
             producer.send(message.getBytes());
@@ -167,6 +153,70 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
         // Acknowledge the consumption of all messages at once
         consumer.acknowledgeCumulative(msg);
         consumer.close();
+    }
+
+    @Test(dataProvider = "batch")
+    public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+        internalSetup(authTls);
+
+        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+
+        testSyncProducerAndConsumer(batchMessageDelayMs);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test(dataProvider = "batch")
+    public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+        internalSetup(authTls);
+
+        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
+        admin.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use")));
+
+        // make a PulsarAdmin instance as "anonymousUser" for http request
+        admin.close();
+        ClientConfiguration clientConf = new ClientConfiguration();
+        clientConf.setOperationTimeout(1, TimeUnit.SECONDS);
+        admin = spy(new PulsarAdmin(brokerUrl, clientConf));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+        admin.persistentTopics().grantPermission("persistent://my-property/use/my-ns/my-topic", "anonymousUser", EnumSet
+                .allOf(AuthAction.class));
+
+        // setup the client
+        pulsarClient.close();
+        pulsarClient = PulsarClient.create("pulsar://localhost:" + BROKER_PORT, clientConf);
+
+        // unauthorized topic test
+        Exception pulsarClientException = null;
+        try {
+            pulsarClient.subscribe("persistent://my-property/use/my-ns/other-topic", "my-subscriber-name");
+        } catch (Exception e) {
+            pulsarClientException = e;
+        }
+        Assert.assertTrue(pulsarClientException instanceof PulsarClientException);
+
+        testSyncProducerAndConsumer(batchMessageDelayMs);
+
         log.info("-- Exiting {} test --", methodName);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/MockUnauthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/MockUnauthenticationProvider.java
new file mode 100644
index 000000000..6be13832a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/MockUnauthenticationProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+
+import javax.naming.AuthenticationException;
+
+public class MockUnauthenticationProvider extends MockAuthenticationProvider {
+
+    @Override
+    public String getAuthMethodName() {
+        // method name
+        return "mockunauth";
+    }
+
+    @Override
+    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+        throw new AuthenticationException();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index b264a44fd..56d48e51c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -22,6 +22,7 @@
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -56,10 +57,12 @@
 import com.google.common.collect.Sets;
 
 public class ProxyAuthenticationTest extends ProducerConsumerBase {
-    
+
     private int port;
     private ProxyServer proxyServer;
     private WebSocketService service;
+    private WebSocketClient consumeClient;
+    private WebSocketClient produceClient;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -71,10 +74,21 @@ public void setup() throws Exception {
         config.setWebServicePort(port);
         config.setClusterName("use");
         config.setAuthenticationEnabled(true);
-        config.setAuthenticationProviders(
-                Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockAuthenticationProvider"));
         config.setGlobalZookeeperServers("dummy-zk-servers");
         config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
+
+        // If this is not set, 500 error occurs.
+        config.setGlobalZookeeperServers("dummy");
+
+        if (methodName.equals("authenticatedSocketTest") || methodName.equals("statsTest")) {
+            config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockAuthenticationProvider"));
+        } else {
+            config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockUnauthenticationProvider"));
+        }
+        if (methodName.equals("anonymousSocketTest")) {
+            config.setAnonymousUserRole("anonymousUser");
+        }
+
         service = spy(new WebSocketService(config));
         doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
         proxyServer = new ProxyServer(config);
@@ -84,6 +98,22 @@ public void setup() throws Exception {
 
     @AfterMethod
     protected void cleanup() throws Exception {
+        ExecutorService executor = newFixedThreadPool(1);
+        try {
+            executor.submit(() -> {
+                try {
+                    consumeClient.stop();
+                    produceClient.stop();
+                    log.info("proxy clients are stopped successfully");
+                } catch (Exception e) {
+                    log.error(e.getMessage());
+                }
+            }).get(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("failed to close clients ", e);
+        }
+        executor.shutdownNow();
+
         super.internalCleanup();
         service.close();
         proxyServer.stop();
@@ -91,7 +121,6 @@ protected void cleanup() throws Exception {
 
     }
 
-    @Test(timeOut=10000)
     public void socketTest() throws Exception {
         final String topic = "my-property/use/my-ns/my-topic1";
         final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub";
@@ -99,46 +128,47 @@ public void socketTest() throws Exception {
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
 
-        WebSocketClient consumeClient = new WebSocketClient();
+        consumeClient = new WebSocketClient();
         SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
-        WebSocketClient produceClient = new WebSocketClient();
+        produceClient = new WebSocketClient();
         SimpleProducerSocket produceSocket = new SimpleProducerSocket();
 
-        try {
-            consumeClient.start();
-            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
-            Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
-            log.info("Connecting to : {}", consumeUri);
+        consumeClient.start();
+        ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+        Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+        log.info("Connecting to : {}", consumeUri);
+
+        ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+        produceClient.start();
+        Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
+        Assert.assertTrue(consumerFuture.get().isOpen());
+        Assert.assertTrue(producerFuture.get().isOpen());
+
+        consumeSocket.awaitClose(1, TimeUnit.SECONDS);
+        produceSocket.awaitClose(1, TimeUnit.SECONDS);
+        Assert.assertTrue(produceSocket.getBuffer().size() > 0);
+        Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
+    }
 
-            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
-            produceClient.start();
-            Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
-            // let it connect
-            Thread.sleep(1000);
-            Assert.assertTrue(consumerFuture.get().isOpen());
-            Assert.assertTrue(producerFuture.get().isOpen());
+    @Test(timeOut=10000)
+    public void authenticatedSocketTest() throws Exception {
+        socketTest();
+    }
 
-            consumeSocket.awaitClose(1, TimeUnit.SECONDS);
-            produceSocket.awaitClose(1, TimeUnit.SECONDS);
-            Assert.assertTrue(produceSocket.getBuffer().size() > 0);
-            Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
-        } finally {
-            ExecutorService executor = newFixedThreadPool(1);
-            try {
-                executor.submit(() -> {
-                    try {
-                        consumeClient.stop();
-                        produceClient.stop();
-                        log.info("proxy clients are stopped successfully");
-                    } catch (Exception e) {
-                        log.error(e.getMessage());
-                    }
-                }).get(2, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.error("failed to close clients ", e);
-            }
-            executor.shutdownNow();
+    @Test(timeOut=10000)
+    public void anonymousSocketTest() throws Exception {
+        socketTest();
+    }
+
+    @Test(timeOut=10000)
+    public void unauthenticatedSocketTest() throws Exception{
+        Exception exception = null;
+        try {
+            socketTest();
+        } catch (Exception e) {
+            exception = e;
         }
+        Assert.assertTrue(exception instanceof java.util.concurrent.ExecutionException);
     }
 
     @Test(timeOut=10000)
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 9b18ddbe0..6f9536c99 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -81,6 +81,9 @@
     // Number of connections per Broker in Pulsar Client used in WebSocket proxy
     private int connectionsPerBroker = Runtime.getRuntime().availableProcessors();
 
+    // When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+    private String anonymousUserRole = null;
+
     /***** --- TLS --- ****/
     // Enable TLS
     private boolean tlsEnabled = false;
@@ -239,6 +242,14 @@ public void setBrokerClientAuthenticationParameters(String brokerClientAuthentic
 
     public void setConnectionsPerBroker(int connectionsPerBroker) { this.connectionsPerBroker = connectionsPerBroker; }
 
+    public String getAnonymousUserRole() {
+        return anonymousUserRole;
+    }
+
+    public void setAnonymousUserRole(String anonymousUserRole) {
+        this.anonymousUserRole = anonymousUserRole;
+    }
+
     public boolean isTlsEnabled() {
         return tlsEnabled;
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services