You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2018/02/11 00:54:16 UTC
[incubator-pulsar] branch master updated: Support custom URL scheme
handlers (#1212)
This is an automated email from the ASF dual-hosted git repository.
maskit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ccbfbf8 Support custom URL scheme handlers (#1212)
ccbfbf8 is described below
commit ccbfbf881a8dd3b17ec3b091d4c30eb46389a6c5
Author: Masakazu Kitajo <ma...@apache.org>
AuthorDate: Sun Feb 11 09:54:14 2018 +0900
Support custom URL scheme handlers (#1212)
---
.../client/impl/auth/AuthenticationAthenz.java | 47 ++++----
.../client/api/url/DataURLStreamHandler.java | 121 +++++++++++++++++++++
.../api/url/PulsarURLStreamHandlerFactory.java | 51 +++++++++
.../java/org/apache/pulsar/client/api/url/URL.java | 54 +++++++++
4 files changed, 246 insertions(+), 27 deletions(-)
diff --git a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
index 277f4a1..4aa7c3e 100644
--- a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
+++ b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
@@ -21,13 +21,12 @@ package org.apache.pulsar.client.impl.auth;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.net.URISyntaxException;
+import java.net.URLConnection;
import java.security.PrivateKey;
-import java.util.Base64;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -37,11 +36,13 @@ import org.apache.pulsar.client.api.AuthenticationUtil;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.GettingAuthenticationDataException;
+import org.apache.pulsar.client.api.url.URL;
-import com.google.common.base.Splitter;
+import com.google.common.io.CharStreams;
import com.yahoo.athenz.auth.ServiceIdentityProvider;
import com.yahoo.athenz.auth.impl.SimpleServiceIdentityProvider;
import com.yahoo.athenz.auth.util.Crypto;
+import com.yahoo.athenz.auth.util.CryptoException;
import com.yahoo.athenz.zts.RoleToken;
import com.yahoo.athenz.zts.ZTSClient;
@@ -50,7 +51,6 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
private static final long serialVersionUID = 1L;
private static final String APPLICATION_X_PEM_FILE = "application/x-pem-file";
- private static final String APPLICATION_X_PEM_FILE_BASE64 = "application/x-pem-file;base64";
private transient ZTSClient ztsClient = null;
private String ztsUrl;
@@ -80,7 +80,8 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
}
try {
// the following would set up the API call that requests tokens from the server
- // that can only be used if they are 10 minutes from expiration and last twenty four hours
+ // that can only be used if they are 10 minutes from expiration and last twenty
+ // four hours
RoleToken token = getZtsClient().getRoleToken(providerDomain, null, minValidity, maxValidity, false);
roleToken = token.getToken();
cachedRoleTokenTimestamp = System.nanoTime();
@@ -94,7 +95,8 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
if (roleToken == null) {
return false;
}
- // Ensure we refresh the Athenz role token every hour to avoid using an expired role token
+ // Ensure we refresh the Athenz role token every hour to avoid using an expired
+ // role token
return (System.nanoTime() - cachedRoleTokenTimestamp) < TimeUnit.HOURS.toNanos(cacheDurationInHour);
}
@@ -168,27 +170,18 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
private PrivateKey loadPrivateKey(String privateKeyURL) {
PrivateKey privateKey = null;
try {
- URI uri = new URI(privateKeyURL);
- if (isBlank(uri.getScheme())) {
- // We treated as file path
- privateKey = Crypto.loadPrivateKey(new File(privateKeyURL));
- } else if (uri.getScheme().equals("file")) {
- privateKey = Crypto.loadPrivateKey(new File(uri.getPath()));
- } else if (uri.getScheme().equals("data")) {
- List<String> dataParts = Splitter.on(",").splitToList(uri.getSchemeSpecificPart());
- // Support Urlencode but not decode here because already decoded by URI class.
- if (dataParts.get(0).equals(APPLICATION_X_PEM_FILE)) {
- privateKey = Crypto.loadPrivateKey(dataParts.get(1));
- // Support base64
- } else if (dataParts.get(0).equals(APPLICATION_X_PEM_FILE_BASE64)) {
- privateKey = Crypto.loadPrivateKey(new String(Base64.getDecoder().decode(dataParts.get(1))));
- } else {
- throw new IllegalArgumentException(
- "Unsupported media type or encoding format: " + dataParts.get(0));
- }
+ URLConnection urlConnection = new URL(privateKeyURL).openConnection();
+ String protocol = urlConnection.getURL().getProtocol();
+ if ("data".equals(protocol) && !APPLICATION_X_PEM_FILE.equals(urlConnection.getContentType())) {
+ throw new IllegalArgumentException(
+ "Unsupported media type or encoding format: " + urlConnection.getContentType());
}
+ String keyData = CharStreams.toString(new InputStreamReader((InputStream) urlConnection.getContent()));
+ privateKey = Crypto.loadPrivateKey(keyData);
} catch (URISyntaxException e) {
- throw new IllegalArgumentException("Invalid privateKey format");
+ throw new IllegalArgumentException("Invalid privateKey format", e);
+ } catch (CryptoException | InstantiationException | IllegalAccessException | IOException e) {
+ privateKey = null;
}
return privateKey;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
new file mode 100644
index 0000000..f4147cd
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.url;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+import java.util.Base64;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class DataURLStreamHandler extends URLStreamHandler {
+
+ class DataURLConnection extends URLConnection {
+ private boolean parsed = false;
+ private String contentType;
+ private String data;
+ private URI uri;
+
+ protected DataURLConnection(URL url) {
+ super(url);
+ try {
+ this.uri = this.url.toURI();
+ } catch (URISyntaxException e) {
+ this.uri = null;
+ }
+ }
+
+ @Override
+ public void connect() throws IOException {
+ if (this.parsed) {
+ return;
+ }
+
+ if (this.uri == null) {
+ throw new IOException();
+ }
+ Pattern pattern = Pattern.compile(
+ "(?<mimeType>.+?)(;(?<charset>charset=.+?))?(;(?<base64>base64?))?,(?<data>.+)", Pattern.DOTALL);
+ Matcher matcher = pattern.matcher(this.uri.getSchemeSpecificPart());
+ if (matcher.matches()) {
+ this.contentType = matcher.group("mimeType");
+ String charset = matcher.group("charset");
+ if (charset == null) {
+ charset = "US-ASCII";
+ }
+ if (matcher.group("base64") == null) {
+ // Support Urlencode but not decode here because already decoded by URI class.
+ this.data = new String(matcher.group("data").getBytes(), charset);
+ } else {
+ this.data = new String(Base64.getDecoder().decode(matcher.group("data")), charset);
+ }
+ } else {
+ throw new MalformedURLException();
+ }
+ parsed = true;
+ }
+
+ @Override
+ public long getContentLengthLong() {
+ long length;
+ try {
+ this.connect();
+ length = this.data.length();
+ } catch (IOException e) {
+ length = -1;
+ }
+ return length;
+ }
+
+ @Override
+ public String getContentType() {
+ String contentType;
+ try {
+ this.connect();
+ contentType = this.contentType;
+ } catch (IOException e) {
+ contentType = null;
+ }
+ return contentType;
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return "identity";
+ }
+
+ public InputStream getInputStream() throws IOException {
+ this.connect();
+ return new ByteArrayInputStream(this.data.getBytes());
+ }
+ }
+
+ @Override
+ protected URLConnection openConnection(URL u) throws IOException {
+ return new DataURLConnection(u);
+ }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
new file mode 100644
index 0000000..af7b668
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.url;
+
+import java.net.URLStreamHandler;
+import java.net.URLStreamHandlerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PulsarURLStreamHandlerFactory implements URLStreamHandlerFactory {
+ static Map<String, Class<? extends URLStreamHandler>> handlers;
+ static {
+ handlers = new HashMap<>();
+ handlers.put("data", DataURLStreamHandler.class);
+ }
+
+ @Override
+ public URLStreamHandler createURLStreamHandler(String protocol) {
+ URLStreamHandler urlStreamHandler;
+ try {
+ Class<? extends URLStreamHandler> handler = handlers.get(protocol);
+ if (handler != null) {
+ urlStreamHandler = handler.newInstance();
+ } else {
+ urlStreamHandler = null;
+ }
+ } catch (InstantiationException e) {
+ urlStreamHandler = null;
+ } catch (IllegalAccessException e) {
+ urlStreamHandler = null;
+ }
+ return urlStreamHandler;
+ }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
new file mode 100644
index 0000000..4d8c367
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.url;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLConnection;
+import java.net.URLStreamHandlerFactory;
+
+public class URL {
+ private static URLStreamHandlerFactory urlStreamHandlerFactory = new PulsarURLStreamHandlerFactory();
+ private java.net.URL url;
+
+ public URL(String spec)
+ throws MalformedURLException, URISyntaxException, InstantiationException, IllegalAccessException {
+ String scheme = new URI(spec).getScheme();
+ if (scheme == null) {
+ this.url = new java.net.URL(null, "file:" + spec);
+ } else {
+ this.url = new java.net.URL(null, spec, urlStreamHandlerFactory.createURLStreamHandler(scheme));
+ }
+ }
+
+ public URLConnection openConnection() throws IOException {
+ return this.url.openConnection();
+ }
+
+ public Object getContent() throws IOException {
+ return this.url.getContent();
+ }
+
+ public Object getContent(Class[] classes) throws IOException {
+ return this.url.getContent(classes);
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
maskit@apache.org.