You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/05/17 03:39:26 UTC

[pulsar] branch master updated: add getting secrets utils method for sinks (#4291)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 39004c8  add getting secrets utils method for sinks (#4291)
39004c8 is described below

commit 39004c83d5583237f34f75cd078c557421f0560a
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 16 20:39:22 2019 -0700

    add getting secrets utils method for sinks (#4291)
---
 .../org/apache/pulsar/io/common/IOConfigUtils.java | 12 +++-
 .../apache/pulsar/io/common/IOConfigUtilsTest.java | 84 ++++++++++++++++++++--
 2 files changed, 91 insertions(+), 5 deletions(-)

diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
index f89e0e5..f5b8870 100644
--- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
+++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.common;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
@@ -27,10 +28,19 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
 
 @Slf4j
 public class IOConfigUtils {
     public static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, SourceContext sourceContext) {
+        return loadWithSecrets(map, clazz, secretName -> sourceContext.getSecret(secretName));
+    }
+
+    public static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, SinkContext sinkContext) {
+        return loadWithSecrets(map, clazz, secretName -> sinkContext.getSecret(secretName));
+    }
+
+    private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Function<String, String> secretsGetter) {
         Map<String, Object> configs = new HashMap<>(map);
 
         for (Field field : clazz.getDeclaredFields()) {
@@ -41,7 +51,7 @@ public class IOConfigUtils {
                     if (((FieldDoc) annotation).sensitive()) {
                         String secret = null;
                         try {
-                            secret = sourceContext.getSecret(field.getName());
+                            secret = secretsGetter.apply(field.getName());
                         } catch (Exception e) {
                             log.warn("Failed to read secret {}", field.getName(), e);
                             break;
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 7584d8b..af62c7f 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.io.common;
 
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 import org.slf4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -66,7 +68,7 @@ public class IOConfigUtilsTest {
 
         static Map<String, String> secretsMap = new HashMap<>();
         static {
-            secretsMap.put("password", "my-password");
+            secretsMap.put("password", "my-source-password");
         }
 
         @Override
@@ -116,14 +118,14 @@ public class IOConfigUtilsTest {
     }
 
     @Test
-    public void loadWithSecrets() {
+    public void testSourceLoadWithSecrets() {
 
         Map<String, Object> configMap = new HashMap<>();
         configMap.put("notSensitive", "foo");
         TestConfig testConfig = IOConfigUtils.loadWithSecrets(configMap, TestConfig.class, new TestSourceContext());
 
         Assert.assertEquals(testConfig.notSensitive, "foo");
-        Assert.assertEquals(testConfig.password, "my-password");
+        Assert.assertEquals(testConfig.password, "my-source-password");
 
         configMap = new HashMap<>();
         configMap.put("notSensitive", "foo");
@@ -133,7 +135,81 @@ public class IOConfigUtilsTest {
         testConfig = IOConfigUtils.loadWithSecrets(configMap, TestConfig.class, new TestSourceContext());
 
         Assert.assertEquals(testConfig.notSensitive, "foo");
-        Assert.assertEquals(testConfig.password, "my-password");
+        Assert.assertEquals(testConfig.password, "my-source-password");
+        Assert.assertEquals(testConfig.sensitiveLong, 5L);
+    }
+
+    static class TestSinkContext implements SinkContext {
+        static Map<String, String> secretsMap = new HashMap<>();
+        static {
+            secretsMap.put("password", "my-sink-password");
+        }
+
+        @Override
+        public int getInstanceId() {
+            return 0;
+        }
+
+        @Override
+        public int getNumInstances() {
+            return 0;
+        }
+
+        @Override
+        public void recordMetric(String metricName, double value) {
+
+        }
+
+        @Override
+        public Collection<String> getInputTopics() {
+            return null;
+        }
+
+        @Override
+        public String getTenant() {
+            return null;
+        }
+
+        @Override
+        public String getNamespace() {
+            return null;
+        }
+
+        @Override
+        public String getSinkName() {
+            return null;
+        }
+
+        @Override
+        public Logger getLogger() {
+            return null;
+        }
+
+        @Override
+        public String getSecret(String secretName) {
+            return secretsMap.get(secretName);
+        }
+    }
+
+    @Test
+    public void testSinkLoadWithSecrets() {
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("notSensitive", "foo");
+        TestConfig testConfig = IOConfigUtils.loadWithSecrets(configMap, TestConfig.class, new TestSinkContext());
+
+        Assert.assertEquals(testConfig.notSensitive, "foo");
+        Assert.assertEquals(testConfig.password, "my-sink-password");
+
+        configMap = new HashMap<>();
+        configMap.put("notSensitive", "foo");
+        configMap.put("password", "another-password");
+        configMap.put("sensitiveLong", 5L);
+
+        testConfig = IOConfigUtils.loadWithSecrets(configMap, TestConfig.class, new TestSinkContext());
+
+        Assert.assertEquals(testConfig.notSensitive, "foo");
+        Assert.assertEquals(testConfig.password, "my-sink-password");
         Assert.assertEquals(testConfig.sensitiveLong, 5L);
     }
 }