You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/13 13:20:07 UTC

[pulsar] branch master updated: [improve][connector] Add getSinkConfig method to SinkContext (#15482)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 61b9c728a19 [improve][connector] Add getSinkConfig method to SinkContext (#15482)
61b9c728a19 is described below

commit 61b9c728a198029ec52ac292ebf7ae725bc84fbe
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Fri May 13 21:19:57 2022 +0800

    [improve][connector] Add getSinkConfig method to SinkContext (#15482)
---
 .../java/org/apache/pulsar/functions/instance/ContextImpl.java    | 7 +++++++
 .../org/apache/pulsar/functions/instance/ContextImplTest.java     | 8 ++++++++
 .../test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java  | 6 ++++++
 .../core/src/main/java/org/apache/pulsar/io/core/SinkContext.java | 8 ++++++++
 .../org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java    | 6 ++++++
 5 files changed, 35 insertions(+)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 0828a211fc0..79ff3531de1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
@@ -73,6 +74,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
@@ -251,6 +253,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         return config.getFunctionDetails().getSource().getInputSpecsMap().keySet();
     }
 
+    @Override
+    public SinkConfig getSinkConfig() {
+        return SinkConfigUtils.convertFromDetails(config.getFunctionDetails());
+    }
+
     @Override
     public String getOutputTopic() {
         return config.getFunctionDetails().getSink().getTopic();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4e0a380244c..41cb550250a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.BKStateStoreImpl;
@@ -138,6 +139,13 @@ public class ContextImplTest {
         context.getState("test-key");
     }
 
+    @Test
+    public void testGetSinkConfig() {
+        SinkContext sinkContext = context;
+        SinkConfig sinkConfig = sinkContext.getSinkConfig();
+        Assert.assertNotNull(sinkConfig);
+    }
+
     @Test
     public void testIncrCounterStateEnabled() throws Exception {
         context.defaultStateStore = mock(BKStateStoreImpl.class);
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index e2ec4918bb7..94d5f8472d6 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -284,6 +285,11 @@ public class IOConfigUtilsTest {
             return null;
         }
 
+        @Override
+        public SinkConfig getSinkConfig() {
+            return null;
+        }
+
         @Override
         public String getTenant() {
             return null;
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index 6e05818ac94..4718d64235d 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.functions.api.BaseContext;
 
 /**
@@ -46,6 +47,13 @@ public interface SinkContext extends BaseContext {
      */
     Collection<String> getInputTopics();
 
+    /**
+     * Get sink config at startup.
+     *
+     * @return sink config
+     */
+    SinkConfig getSinkConfig();
+
     /**
      * Get subscription type used by the source providing data for the sink.
      *
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index c04f654c8e3..b4ef9adbad7 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SinkContext;
@@ -97,6 +98,11 @@ public class KafkaAbstractSinkTest {
                 return null;
             }
 
+            @Override
+            public SinkConfig getSinkConfig() {
+                return null;
+            }
+
             @Override
             public String getTenant() {
                 return null;