You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/02 20:03:54 UTC

[GitHub] merlimat closed pull request #1087: Add basic authentication plugin

merlimat closed pull request #1087: Add basic authentication plugin
URL: https://github.com/apache/incubator-pulsar/pull/1087
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a pull request from a fork, the diff is reproduced below,
because GitHub does not otherwise display it once the request is merged:

diff --git a/pom.xml b/pom.xml
index 11797a49c..5c71efe2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -622,6 +622,7 @@ flexible messaging model and an intuitive client API.</description>
             <exclude>logs/**</exclude>
             <exclude>**/*.versionsBackup</exclude>
             <exclude>**/circe/**</exclude>
+            <exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
new file mode 100644
index 000000000..f2f2f6588
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import org.apache.commons.codec.digest.Crypt;
+import org.apache.commons.codec.digest.Md5Crypt;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+
+import javax.naming.AuthenticationException;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.*;
+
+/**
+ * Authentication provider that validates user id / password pairs against an htpasswd-style file.
+ *
+ * <p>The file is named by the {@code pulsar.auth.basic.conf} system property and holds one
+ * {@code user:hash} entry per line. Hashes that start with {@code $apr1} are verified with
+ * {@link Md5Crypt#apr1Crypt(byte[], String)}; every other hash is verified with
+ * {@link Crypt#crypt(byte[], String)} using its first two characters as the salt.
+ */
+public class AuthenticationProviderBasic implements AuthenticationProvider {
+    private final static String HTTP_HEADER_NAME = "Authorization";
+    private final static String CONF_SYSTEM_PROPERTY_KEY = "pulsar.auth.basic.conf";
+    private final static String BASIC_PREFIX = "Basic ";
+    private final static String AUTH_FAILED_MSG = "Unknown user or invalid password";
+    // Empty until initialize() succeeds, so authenticate() rejects everyone instead of failing on null.
+    private Map<String, String> users = Collections.emptyMap();
+
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
+
+    /**
+     * Loads the user table from the file named by the {@code pulsar.auth.basic.conf} system property.
+     *
+     * @param config broker configuration; not read by this provider
+     * @throws IOException if the property is unset, the path does not exist or is not a file,
+     *                     the file cannot be read, or a non-blank line is not {@code user:hash}
+     */
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        String confPath = System.getProperty(CONF_SYSTEM_PROPERTY_KEY);
+        if (confPath == null) {
+            // new File(null) would surface as a bare NullPointerException.
+            throw new IOException("The system property " + CONF_SYSTEM_PROPERTY_KEY + " is not set");
+        }
+        File confFile = new File(confPath);
+        if (!confFile.exists()) {
+            throw new IOException("The password auth conf file does not exist");
+        } else if (!confFile.isFile()) {
+            throw new IOException("The path is not a file");
+        }
+
+        Map<String, String> loadedUsers = new HashMap<>();
+        // try-with-resources closes the file even when a malformed line aborts the load.
+        try (BufferedReader reader = new BufferedReader(
+                new InputStreamReader(new FileInputStream(confFile), StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (line.trim().isEmpty()) {
+                    // Skip blank lines rather than rejecting the whole file.
+                    continue;
+                }
+                String[] splitLine = line.split(":");
+                if (splitLine.length != 2) {
+                    throw new IOException("The format of the password auth conf file is invalid");
+                }
+                loadedUsers.put(splitLine[0], splitLine[1]);
+            }
+        }
+        // Publish the table only after the whole file parsed successfully.
+        users = loadedUsers;
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return "basic";
+    }
+
+    /**
+     * Checks the credentials carried by {@code authData} against the loaded user table.
+     *
+     * @param authData credentials taken from the command data or from the HTTP Authorization header
+     * @return the authenticated user id
+     * @throws AuthenticationException if the credentials are missing or malformed, the user is
+     *                                 unknown, or the password does not match the stored hash
+     */
+    @Override
+    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
+        AuthParams authParams = new AuthParams(authData);
+        String userId = authParams.getUserId();
+        byte[] passwordBytes = authParams.getPassword().getBytes(StandardCharsets.UTF_8);
+
+        String encryptedPassword = users.get(userId);
+        if (encryptedPassword == null) {
+            throw new AuthenticationException(AUTH_FAILED_MSG);
+        }
+
+        String computedHash;
+        try {
+            if (encryptedPassword.startsWith("$apr1")) {
+                // For md5 algorithm: $apr1$<salt>$<hash>
+                String[] splitEncryptedPassword = encryptedPassword.split("\\$");
+                if (splitEncryptedPassword.length != 4) {
+                    throw new AuthenticationException(AUTH_FAILED_MSG);
+                }
+                computedHash = Md5Crypt.apr1Crypt(passwordBytes, splitEncryptedPassword[2]);
+            } else {
+                // For crypt algorithm: the first two characters of the stored hash are the salt
+                if (encryptedPassword.length() < 2) {
+                    throw new AuthenticationException(AUTH_FAILED_MSG);
+                }
+                computedHash = Crypt.crypt(passwordBytes, encryptedPassword.substring(0, 2));
+            }
+        } catch (IllegalArgumentException e) {
+            // commons-codec rejects a salt it cannot parse; treat a corrupt entry as a failed login.
+            throw new AuthenticationException(AUTH_FAILED_MSG);
+        }
+
+        // MessageDigest.isEqual avoids leaking, through timing, how much of the hash matched.
+        if (!MessageDigest.isEqual(encryptedPassword.getBytes(StandardCharsets.UTF_8),
+                computedHash.getBytes(StandardCharsets.UTF_8))) {
+            throw new AuthenticationException(AUTH_FAILED_MSG);
+        }
+
+        return userId;
+    }
+
+    /** Parses the user id and clear-text password out of an {@link AuthenticationDataSource}. */
+    private static class AuthParams {
+        private final String userId;
+        private final String password;
+
+        public AuthParams(AuthenticationDataSource authData) throws AuthenticationException {
+            String authParams;
+            if (authData.hasDataFromCommand()) {
+                authParams = authData.getCommandData();
+            } else if (authData.hasDataFromHttp()) {
+                String rawAuthToken = authData.getHttpHeader(HTTP_HEADER_NAME);
+                // parsing and validation; regionMatches(ignoreCase) does not depend on the default
+                // locale, unlike toUpperCase(), which maps "i" to a dotted capital under tr_TR.
+                if (StringUtils.isBlank(rawAuthToken)
+                        || !rawAuthToken.regionMatches(true, 0, BASIC_PREFIX, 0, BASIC_PREFIX.length())) {
+                    throw new AuthenticationException("Authentication token has to be started with \"Basic \"");
+                }
+                String[] splitRawAuthToken = rawAuthToken.split(" ");
+                if (splitRawAuthToken.length != 2) {
+                    throw new AuthenticationException("Base64 encoded token is not found");
+                }
+
+                try {
+                    authParams = new String(Base64.getDecoder().decode(splitRawAuthToken[1]),
+                            StandardCharsets.UTF_8);
+                } catch (IllegalArgumentException e) {
+                    throw new AuthenticationException("Base64 decoding is failure: " + e.getMessage());
+                }
+            } else {
+                throw new AuthenticationException("Authentication data source does not have data");
+            }
+
+            // Split on the first colon only: the user id cannot contain one, the password may.
+            int separator = authParams == null ? -1 : authParams.indexOf(':');
+            if (separator < 0) {
+                throw new AuthenticationException("Base64 decoded params are invalid");
+            }
+
+            userId = authParams.substring(0, separator);
+            password = authParams.substring(separator + 1);
+        }
+
+        public String getUserId() {
+            return userId;
+        }
+
+        public String getPassword() {
+            return password;
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 83b15f5a6..729294033 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -18,32 +18,17 @@
  */
 package org.apache.pulsar.client.api;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.InternalServerErrorException;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
-import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -52,8 +37,13 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import javax.ws.rs.InternalServerErrorException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);
@@ -64,6 +54,8 @@
     private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
 
+    private final String BASIC_CONF_FILE_PATH = "./src/test/resources/authentication/basic/.htpasswd";
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -83,6 +75,7 @@ protected void setup() throws Exception {
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("localhost");
         superUserRoles.add("superUser");
+        superUserRoles.add("superUser2");
         conf.setSuperUserRoles(superUserRoles);
 
         conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
@@ -91,6 +84,8 @@ protected void setup() throws Exception {
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
+        providers.add(AuthenticationProviderBasic.class.getName());
+        System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
         conf.setAuthenticationProviders(providers);
 
         conf.setClusterName("use");
@@ -107,7 +102,13 @@ protected final void internalSetup(Authentication auth) throws Exception {
         clientConf.setUseTls(true);
 
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
-        String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
+        String lookupUrl;
+        // For http basic authentication test
+        if (methodName.equals("testBasicCryptSyncProducerAndConsumer")) {
+            lookupUrl = new URI("https://localhost:" + BROKER_WEBSERVICE_PORT_TLS).toString();
+        } else {
+            lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
+        }
         pulsarClient = PulsarClient.create(lookupUrl, clientConf);
     }
 
@@ -167,8 +168,6 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
-                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
         admin.namespaces().createNamespace("my-property/use/my-ns");
@@ -178,6 +177,39 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "batch")
+    public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        // "superUser" is the .htpasswd entry whose hash takes the crypt branch of the provider.
+        AuthenticationBasic basicAuth = new AuthenticationBasic();
+        basicAuth.configure("{\"userId\":\"superUser\",\"password\":\"supepass\"}");
+        internalSetup(basicAuth);
+
+        PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use"));
+        admin.properties().createProperty("my-property", propertyAdmin);
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+        testSyncProducerAndConsumer(batchMessageDelayMs);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test(dataProvider = "batch")
+    public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        // "superUser2" is the .htpasswd entry whose hash starts with $apr1 (the md5 branch).
+        AuthenticationBasic basicAuth = new AuthenticationBasic();
+        basicAuth.configure("{\"userId\":\"superUser2\",\"password\":\"superpassword\"}");
+        internalSetup(basicAuth);
+
+        PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use"));
+        admin.properties().createProperty("my-property", propertyAdmin);
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+        testSyncProducerAndConsumer(batchMessageDelayMs);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+
     @Test(dataProvider = "batch")
     public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -223,7 +255,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws
 
     /**
      * Verifies: on 500 server error, broker invalidates session and client receives 500 correctly.
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -254,7 +286,7 @@ public void testAuthenticationFilterNegative() throws Exception {
     /**
      * verifies that topicLookup/PartitionMetadataLookup gives InternalServerError(500) instead 401(auth_failed) on
      * unknown-exception failure
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -292,4 +324,4 @@ public void testInternalServerExceptionOnLookup() throws Exception {
 
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/resources/authentication/basic/.htpasswd b/pulsar-broker/src/test/resources/authentication/basic/.htpasswd
new file mode 100644
index 000000000..b1a099a5f
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/basic/.htpasswd
@@ -0,0 +1,2 @@
+superUser:mQQQIsyvvKRtU
+superUser2:$apr1$foobarmq$kuSZlLgOITksCkRgl57ie/
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 1f0f4417c..3c26a9e87 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -69,6 +69,12 @@
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcpkix-jdk15on</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
new file mode 100644
index 000000000..37454d2f0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Client-side authentication plugin that sends a user id and password as "basic" credentials.
+ *
+ * <p>The credentials are supplied through {@link #configure(String)} as a JSON object with the
+ * string members {@code userId} and {@code password}.
+ */
+public class AuthenticationBasic implements Authentication, EncodedAuthenticationParameterSupport {
+    private String userId;
+    private String password;
+
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return "basic";
+    }
+
+    @Override
+    public AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        try {
+            return new AuthenticationDataBasic(userId, password);
+        } catch (Exception e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public void configure(Map<String, String> authParams) {
+        configure(new Gson().toJson(authParams));
+    }
+
+    /**
+     * Reads the credentials from a JSON object such as {@code {"userId":"u","password":"p"}}.
+     *
+     * @param encodedAuthParamString JSON object holding the {@code userId} and {@code password} members
+     * @throws IllegalArgumentException if the string is empty, or a member is missing
+     *                                  or is not a JSON primitive
+     */
+    @Override
+    public void configure(String encodedAuthParamString) {
+        JsonObject params = new Gson().fromJson(encodedAuthParamString, JsonObject.class);
+        if (params == null) {
+            // Gson returns null for an empty or null input string.
+            throw new IllegalArgumentException("Basic authentication params are empty");
+        }
+        // Validate both members before assigning so a failed call leaves the old credentials intact.
+        String newUserId = requiredParam(params, "userId");
+        String newPassword = requiredParam(params, "password");
+        userId = newUserId;
+        password = newPassword;
+    }
+
+    /** Returns the string value of {@code key}, or fails with a message naming the missing member. */
+    private static String requiredParam(JsonObject params, String key) {
+        if (!params.has(key) || !params.get(key).isJsonPrimitive()) {
+            throw new IllegalArgumentException("Basic authentication params must contain \"" + key + "\"");
+        }
+        return params.get(key).getAsString();
+    }
+
+    @Override
+    public void start() throws PulsarClientException {
+        // noop
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java
new file mode 100644
index 000000000..39b7d5f39
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Credentials produced by {@link AuthenticationBasic}: the HTTP {@code Authorization} header value
+ * and the {@code userId:password} string sent as command data.
+ */
+public class AuthenticationDataBasic implements AuthenticationDataProvider {
+    private final static String HTTP_HEADER_NAME = "Authorization";
+    private final String httpAuthToken;
+    private final String commandAuthToken;
+
+    /**
+     * @param userId   user id to authenticate as
+     * @param password clear-text password of that user
+     */
+    public AuthenticationDataBasic(String userId, String password) {
+        commandAuthToken = userId + ":" + password;
+        // Encode with an explicit charset so the header does not depend on the JVM's default encoding.
+        httpAuthToken = "Basic "
+                + Base64.getEncoder().encodeToString(commandAuthToken.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public boolean hasDataForHttp() {
+        return true;
+    }
+
+    @Override
+    public Set<Map.Entry<String, String>> getHttpHeaders() {
+        Map<String, String> headers = new HashMap<>();
+        headers.put(HTTP_HEADER_NAME, httpAuthToken);
+        return headers.entrySet();
+    }
+
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    @Override
+    public String getCommandData() {
+        return commandAuthToken;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services