You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/13 15:55:33 UTC

[pulsar] branch master updated: [improve][connector] Add getSourceConfig method on SourceContext (#16305)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c122928c585  [improve][connector] Add getSourceConfig method on SourceContext (#16305)
c122928c585 is described below

commit c122928c585b7302db49f427c984fe55aa9e871f
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Wed Jul 13 23:55:27 2022 +0800

     [improve][connector] Add getSourceConfig method on SourceContext (#16305)
---
 .../java/org/apache/pulsar/functions/instance/ContextImpl.java    | 7 +++++++
 .../org/apache/pulsar/functions/instance/ContextImplTest.java     | 7 +++++++
 .../test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java  | 6 ++++++
 .../src/main/java/org/apache/pulsar/io/core/SourceContext.java    | 8 ++++++++
 4 files changed, 28 insertions(+)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 336c2bd0be3..b3b165ea536 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
@@ -76,6 +77,7 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
@@ -264,6 +266,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         return config.getFunctionDetails().getSink().getTopic();
     }
 
+    @Override
+    public SourceConfig getSourceConfig() {
+        return SourceConfigUtils.convertFromDetails(config.getFunctionDetails());
+    }
+
     @Override
     public String getOutputSchemaType() {
         SinkSpec sink = config.getFunctionDetails().getSink();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e2e34f4ff91..13c467e1e50 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -150,6 +150,13 @@ public class ContextImplTest {
         Assert.assertNotNull(sinkConfig);
     }
 
+    @Test
+    public void testGetSourceConfig() {
+        SinkContext sourceContext = context;
+        SinkConfig sinkConfig = sourceContext.getSinkConfig();
+        Assert.assertNotNull(sinkConfig);
+    }
+
     @Test
     public void testIncrCounterStateEnabled() throws Exception {
         context.defaultStateStore = mock(BKStateStoreImpl.class);
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 94d5f8472d6..7317e9296c6 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -123,6 +124,11 @@ public class IOConfigUtilsTest {
             return null;
         }
 
+        @Override
+        public SourceConfig getSourceConfig() {
+            return null;
+        }
+
         @Override
         public String getTenant() {
             return null;
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index c4d756857eb..3f1665ab48c 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.functions.api.BaseContext;
 
 /**
@@ -47,6 +48,13 @@ public interface SourceContext extends BaseContext {
      */
     String getOutputTopic();
 
+    /**
+     * Get the source config.
+     *
+     * @return source config
+     */
+    SourceConfig getSourceConfig();
+
     /**
      * New output message using schema for serializing to the topic.
      *