You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 07:38:58 UTC
[pulsar] branch master updated: Add helper method for IO
connectors to use function secrets (#4222)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e1728d0 Add helper method for IO connectors to use function secrets (#4222)
e1728d0 is described below
commit e1728d0c8787293ffac398de54165838d54741b6
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed May 15 00:38:52 2019 -0700
Add helper method for IO connectors to use function secrets (#4222)
* pulsar-io connectors use secrets
* remove unnecessary files
* fix pom
* add license headers
---
pulsar-io/common/pom.xml | 49 ++++++++
.../org/apache/pulsar/io/common/IOConfigUtils.java | 60 +++++++++
.../apache/pulsar/io/common/IOConfigUtilsTest.java | 139 +++++++++++++++++++++
pulsar-io/pom.xml | 1 +
pulsar-io/twitter/pom.xml | 5 +
.../apache/pulsar/io/twitter/TwitterFireHose.java | 3 +-
6 files changed, 256 insertions(+), 1 deletion(-)
diff --git a/pulsar-io/common/pom.xml b/pulsar-io/common/pom.xml
new file mode 100644
index 0000000..1d463b3
--- /dev/null
+++ b/pulsar-io/common/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-common</artifactId>
+ <name>Pulsar IO :: IO Common</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
new file mode 100644
index 0000000..f89e0e5
--- /dev/null
+++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.common;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utilities for turning the raw configuration map handed to an IO connector into a typed config object.
+ */
+@Slf4j
+public class IOConfigUtils {
+    /**
+     * Builds a {@code clazz} instance from {@code map}, filling sensitive fields from the secrets provider.
+     *
+     * <p>For every field declared directly on {@code clazz} that carries {@code @FieldDoc(sensitive = true)},
+     * the secret registered under the field's name is looked up through
+     * {@link SourceContext#getSecret(String)}. A non-null secret overrides any value stored under the same
+     * key in {@code map}; when no secret is found, or the lookup throws, the value from {@code map} (if any)
+     * is kept. The caller's map is never modified.
+     *
+     * @param map raw connector configuration, keyed by field name
+     * @param clazz config class to instantiate
+     * @param sourceContext context used to resolve secrets
+     * @param <T> type of the config object
+     * @return the config object converted from the merged configuration
+     */
+    public static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, SourceContext sourceContext) {
+        // Work on a copy so secrets are never written back into the caller's map.
+        Map<String, Object> configs = new HashMap<>(map);
+
+        for (Field field : clazz.getDeclaredFields()) {
+            // Only annotation metadata and the field name are needed, so the field is not made accessible.
+            FieldDoc fieldDoc = field.getAnnotation(FieldDoc.class);
+            if (fieldDoc == null || !fieldDoc.sensitive()) {
+                continue;
+            }
+
+            String fieldName = field.getName();
+            String secret;
+            try {
+                secret = sourceContext.getSecret(fieldName);
+            } catch (Exception e) {
+                // Best effort: a failing lookup must not prevent the remaining fields from loading.
+                log.warn("Failed to read secret {}", fieldName, e);
+                continue;
+            }
+
+            if (secret != null) {
+                configs.put(fieldName, secret);
+            }
+        }
+        return ObjectMapperFactory.getThreadLocal().convertValue(configs, clazz);
+    }
+}
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
new file mode 100644
index 0000000..7584d8b
--- /dev/null
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.common;
+
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Exercises {@link IOConfigUtils#loadWithSecrets} against a small config class and a stub context.
+ */
+@Slf4j
+public class IOConfigUtilsTest {
+
+    /** Sample config: one String secret, one plain field and one non-String field flagged as sensitive. */
+    @Data
+    static class TestConfig {
+        @FieldDoc(required = true, defaultValue = "", sensitive = true, help = "password")
+        private String password;
+
+        @FieldDoc(required = true, defaultValue = "", sensitive = false, help = "")
+        private String notSensitive;
+
+        /**
+         * Non-string secrets are not supported at this moment
+         */
+        @FieldDoc(required = true, defaultValue = "", sensitive = true, help = "")
+        private long sensitiveLong;
+    }
+
+    /** Stub context whose only meaningful behavior is serving secrets from a static map. */
+    static class TestSourceContext implements SourceContext {
+
+        static Map<String, String> secretsMap = seedSecrets();
+
+        /** Creates the secret store containing the single "password" entry used by the test. */
+        private static Map<String, String> seedSecrets() {
+            Map<String, String> secrets = new HashMap<>();
+            secrets.put("password", "my-password");
+            return secrets;
+        }
+
+        @Override
+        public String getSecret(String secretName) {
+            return secretsMap.get(secretName);
+        }
+
+        // The remaining methods are inert placeholders required by the interface.
+
+        @Override
+        public int getInstanceId() {
+            return 0;
+        }
+
+        @Override
+        public int getNumInstances() {
+            return 0;
+        }
+
+        @Override
+        public void recordMetric(String metricName, double value) {
+            // intentionally empty
+        }
+
+        @Override
+        public String getOutputTopic() {
+            return null;
+        }
+
+        @Override
+        public String getTenant() {
+            return null;
+        }
+
+        @Override
+        public String getNamespace() {
+            return null;
+        }
+
+        @Override
+        public String getSourceName() {
+            return null;
+        }
+
+        @Override
+        public Logger getLogger() {
+            return null;
+        }
+    }
+
+    /** Runs the loader for {@link TestConfig} with a fresh stub context. */
+    private static TestConfig load(Map<String, Object> rawConfig) {
+        return IOConfigUtils.loadWithSecrets(rawConfig, TestConfig.class, new TestSourceContext());
+    }
+
+    @Test
+    public void loadWithSecrets() {
+        // Case 1: the secret is absent from the raw config and must come from the context.
+        Map<String, Object> withoutPassword = new HashMap<>();
+        withoutPassword.put("notSensitive", "foo");
+
+        TestConfig loaded = load(withoutPassword);
+        Assert.assertEquals(loaded.notSensitive, "foo");
+        Assert.assertEquals(loaded.password, "my-password");
+
+        // Case 2: the raw config carries its own password; the context's secret must win, while the
+        // sensitive long (for which the context has no secret) keeps the raw value.
+        Map<String, Object> withPassword = new HashMap<>();
+        withPassword.put("notSensitive", "foo");
+        withPassword.put("password", "another-password");
+        withPassword.put("sensitiveLong", 5L);
+
+        loaded = load(withPassword);
+        Assert.assertEquals(loaded.notSensitive, "foo");
+        Assert.assertEquals(loaded.password, "my-password");
+        Assert.assertEquals(loaded.sensitiveLong, 5L);
+    }
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 69be1289..8e10df8 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -33,6 +33,7 @@
<modules>
<module>core</module>
+ <module>common</module>
<module>docs</module>
<module>twitter</module>
<module>cassandra</module>
diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml
index af7c842..c4f3047 100644
--- a/pulsar-io/twitter/pom.xml
+++ b/pulsar-io/twitter/pom.xml
@@ -65,6 +65,11 @@
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 1219e0b..70fa7ea 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -39,6 +39,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
@@ -71,7 +72,7 @@ public class TwitterFireHose extends PushSource<TweetData> {
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws IOException {
- TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
+ TwitterFireHoseConfig hoseConfig = IOConfigUtils.loadWithSecrets(config, TwitterFireHoseConfig.class, sourceContext);
hoseConfig.validate();
waitObject = new Object();
startThread(hoseConfig);