You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2018/02/11 00:54:16 UTC

[incubator-pulsar] branch master updated: Support custom URL scheme handlers (#1212)

This is an automated email from the ASF dual-hosted git repository.

maskit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ccbfbf8  Support custom URL scheme handlers (#1212)
ccbfbf8 is described below

commit ccbfbf881a8dd3b17ec3b091d4c30eb46389a6c5
Author: Masakazu Kitajo <ma...@apache.org>
AuthorDate: Sun Feb 11 09:54:14 2018 +0900

    Support custom URL scheme handlers (#1212)
---
 .../client/impl/auth/AuthenticationAthenz.java     |  47 ++++----
 .../client/api/url/DataURLStreamHandler.java       | 121 +++++++++++++++++++++
 .../api/url/PulsarURLStreamHandlerFactory.java     |  51 +++++++++
 .../java/org/apache/pulsar/client/api/url/URL.java |  54 +++++++++
 4 files changed, 246 insertions(+), 27 deletions(-)

diff --git a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
index 277f4a1..4aa7c3e 100644
--- a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
+++ b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
@@ -21,13 +21,12 @@ package org.apache.pulsar.client.impl.auth;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.URISyntaxException;
+import java.net.URLConnection;
 import java.security.PrivateKey;
-import java.util.Base64;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -37,11 +36,13 @@ import org.apache.pulsar.client.api.AuthenticationUtil;
 import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.GettingAuthenticationDataException;
+import org.apache.pulsar.client.api.url.URL;
 
-import com.google.common.base.Splitter;
+import com.google.common.io.CharStreams;
 import com.yahoo.athenz.auth.ServiceIdentityProvider;
 import com.yahoo.athenz.auth.impl.SimpleServiceIdentityProvider;
 import com.yahoo.athenz.auth.util.Crypto;
+import com.yahoo.athenz.auth.util.CryptoException;
 import com.yahoo.athenz.zts.RoleToken;
 import com.yahoo.athenz.zts.ZTSClient;
 
@@ -50,7 +51,6 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
     private static final long serialVersionUID = 1L;
 
     private static final String APPLICATION_X_PEM_FILE = "application/x-pem-file";
-    private static final String APPLICATION_X_PEM_FILE_BASE64 = "application/x-pem-file;base64";
 
     private transient ZTSClient ztsClient = null;
     private String ztsUrl;
@@ -80,7 +80,8 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
         }
         try {
             // the following would set up the API call that requests tokens from the server
-            // that can only be used if they are 10 minutes from expiration and last twenty four hours
+            // that can only be used if they are 10 minutes from expiration and last twenty
+            // four hours
             RoleToken token = getZtsClient().getRoleToken(providerDomain, null, minValidity, maxValidity, false);
             roleToken = token.getToken();
             cachedRoleTokenTimestamp = System.nanoTime();
@@ -94,7 +95,8 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
         if (roleToken == null) {
             return false;
         }
-        // Ensure we refresh the Athenz role token every hour to avoid using an expired role token
+        // Ensure we refresh the Athenz role token every hour to avoid using an expired
+        // role token
         return (System.nanoTime() - cachedRoleTokenTimestamp) < TimeUnit.HOURS.toNanos(cacheDurationInHour);
     }
 
@@ -168,27 +170,18 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
     private PrivateKey loadPrivateKey(String privateKeyURL) {
         PrivateKey privateKey = null;
         try {
-            URI uri = new URI(privateKeyURL);
-            if (isBlank(uri.getScheme())) {
-                // We treated as file path
-                privateKey = Crypto.loadPrivateKey(new File(privateKeyURL));
-            } else if (uri.getScheme().equals("file")) {
-                privateKey = Crypto.loadPrivateKey(new File(uri.getPath()));
-            } else if (uri.getScheme().equals("data")) {
-                List<String> dataParts = Splitter.on(",").splitToList(uri.getSchemeSpecificPart());
-                // Support Urlencode but not decode here because already decoded by URI class.
-                if (dataParts.get(0).equals(APPLICATION_X_PEM_FILE)) {
-                    privateKey = Crypto.loadPrivateKey(dataParts.get(1));
-                // Support base64
-                } else if (dataParts.get(0).equals(APPLICATION_X_PEM_FILE_BASE64)) {
-                    privateKey = Crypto.loadPrivateKey(new String(Base64.getDecoder().decode(dataParts.get(1))));
-                } else {
-                    throw new IllegalArgumentException(
-                            "Unsupported media type or encoding format: " + dataParts.get(0));
-                }
+            URLConnection urlConnection = new URL(privateKeyURL).openConnection();
+            String protocol = urlConnection.getURL().getProtocol();
+            if ("data".equals(protocol) && !APPLICATION_X_PEM_FILE.equals(urlConnection.getContentType())) {
+                throw new IllegalArgumentException(
+                        "Unsupported media type or encoding format: " + urlConnection.getContentType());
             }
+            String keyData = CharStreams.toString(new InputStreamReader((InputStream) urlConnection.getContent()));
+            privateKey = Crypto.loadPrivateKey(keyData);
         } catch (URISyntaxException e) {
-            throw new IllegalArgumentException("Invalid privateKey format");
+            throw new IllegalArgumentException("Invalid privateKey format", e);
+        } catch (CryptoException | InstantiationException | IllegalAccessException | IOException e) {
+            privateKey = null;
         }
         return privateKey;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
new file mode 100644
index 0000000..f4147cd
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.url;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.util.Base64;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class DataURLStreamHandler extends URLStreamHandler {
+
+    class DataURLConnection extends URLConnection {
+        private boolean parsed = false;
+        private String contentType;
+        private String data;
+        private URI uri;
+
+        protected DataURLConnection(URL url) {
+            super(url);
+            try {
+                this.uri = this.url.toURI();
+            } catch (URISyntaxException e) {
+                this.uri = null;
+            }
+        }
+
+        @Override
+        public void connect() throws IOException {
+            if (this.parsed) {
+                return;
+            }
+
+            if (this.uri == null) {
+                throw new IOException();
+            }
+            Pattern pattern = Pattern.compile(
+                    "(?<mimeType>.+?)(;(?<charset>charset=.+?))?(;(?<base64>base64?))?,(?<data>.+)", Pattern.DOTALL);
+            Matcher matcher = pattern.matcher(this.uri.getSchemeSpecificPart());
+            if (matcher.matches()) {
+                this.contentType = matcher.group("mimeType");
+                String charset = matcher.group("charset");
+                if (charset == null) {
+                    charset = "US-ASCII";
+                }
+                if (matcher.group("base64") == null) {
+                    // URL-encoded data is supported, but it is not decoded here because the URI class has already decoded it.
+                    this.data = new String(matcher.group("data").getBytes(), charset);
+                } else {
+                    this.data = new String(Base64.getDecoder().decode(matcher.group("data")), charset);
+                }
+            } else {
+                throw new MalformedURLException();
+            }
+            parsed = true;
+        }
+
+        @Override
+        public long getContentLengthLong() {
+            long length;
+            try {
+                this.connect();
+                length = this.data.length();
+            } catch (IOException e) {
+                length = -1;
+            }
+            return length;
+        }
+
+        @Override
+        public String getContentType() {
+            String contentType;
+            try {
+                this.connect();
+                contentType = this.contentType;
+            } catch (IOException e) {
+                contentType = null;
+            }
+            return contentType;
+        }
+
+        @Override
+        public String getContentEncoding() {
+            return "identity";
+        }
+
+        public InputStream getInputStream() throws IOException {
+            this.connect();
+            return new ByteArrayInputStream(this.data.getBytes());
+        }
+    }
+
+    @Override
+    protected URLConnection openConnection(URL u) throws IOException {
+        return new DataURLConnection(u);
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
new file mode 100644
index 0000000..af7b668
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.url;
+
+import java.net.URLStreamHandler;
+import java.net.URLStreamHandlerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PulsarURLStreamHandlerFactory implements URLStreamHandlerFactory {
+    static Map<String, Class<? extends URLStreamHandler>> handlers;
+    static {
+        handlers = new HashMap<>();
+        handlers.put("data", DataURLStreamHandler.class);
+    }
+
+    @Override
+    public URLStreamHandler createURLStreamHandler(String protocol) {
+        URLStreamHandler urlStreamHandler;
+        try {
+            Class<? extends URLStreamHandler> handler = handlers.get(protocol);
+            if (handler != null) {
+                urlStreamHandler = handler.newInstance();
+            } else {
+                urlStreamHandler = null;
+            }
+        } catch (InstantiationException e) {
+            urlStreamHandler = null;
+        } catch (IllegalAccessException e) {
+            urlStreamHandler = null;
+        }
+        return urlStreamHandler;
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
new file mode 100644
index 0000000..4d8c367
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.url;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLConnection;
+import java.net.URLStreamHandlerFactory;
+
+public class URL {
+    private static URLStreamHandlerFactory urlStreamHandlerFactory = new PulsarURLStreamHandlerFactory();
+    private java.net.URL url;
+
+    public URL(String spec)
+            throws MalformedURLException, URISyntaxException, InstantiationException, IllegalAccessException {
+        String scheme = new URI(spec).getScheme();
+        if (scheme == null) {
+            this.url = new java.net.URL(null, "file:" + spec);
+        } else {
+            this.url = new java.net.URL(null, spec, urlStreamHandlerFactory.createURLStreamHandler(scheme));
+        }
+    }
+
+    public URLConnection openConnection() throws IOException {
+        return this.url.openConnection();
+    }
+
+    public Object getContent() throws IOException {
+        return this.url.getContent();
+    }
+
+    public Object getContent(Class[] classes) throws IOException {
+        return this.url.getContent(classes);
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
maskit@apache.org.