You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/13 13:20:07 UTC
[pulsar] branch master updated: [improve][connector] Add getSinkConfig method to SinkContext (#15482)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 61b9c728a19 [improve][connector] Add getSinkConfig method to SinkContext (#15482)
61b9c728a19 is described below
commit 61b9c728a198029ec52ac292ebf7ae725bc84fbe
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Fri May 13 21:19:57 2022 +0800
[improve][connector] Add getSinkConfig method to SinkContext (#15482)
---
.../java/org/apache/pulsar/functions/instance/ContextImpl.java | 7 +++++++
.../org/apache/pulsar/functions/instance/ContextImplTest.java | 8 ++++++++
.../test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java | 6 ++++++
.../core/src/main/java/org/apache/pulsar/io/core/SinkContext.java | 8 ++++++++
.../org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java | 6 ++++++
5 files changed, 35 insertions(+)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 0828a211fc0..79ff3531de1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
@@ -73,6 +74,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
@@ -251,6 +253,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
return config.getFunctionDetails().getSource().getInputSpecsMap().keySet();
}
+ @Override
+ public SinkConfig getSinkConfig() {
+ return SinkConfigUtils.convertFromDetails(config.getFunctionDetails());
+ }
+
@Override
public String getOutputTopic() {
return config.getFunctionDetails().getSink().getTopic();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4e0a380244c..41cb550250a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.BKStateStoreImpl;
@@ -138,6 +139,13 @@ public class ContextImplTest {
context.getState("test-key");
}
+ @Test
+ public void testGetSinkConfig() {
+ SinkContext sinkContext = context;
+ SinkConfig sinkConfig = sinkContext.getSinkConfig();
+ Assert.assertNotNull(sinkConfig);
+ }
+
@Test
public void testIncrCounterStateEnabled() throws Exception {
context.defaultStateStore = mock(BKStateStoreImpl.class);
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index e2ec4918bb7..94d5f8472d6 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -284,6 +285,11 @@ public class IOConfigUtilsTest {
return null;
}
+ @Override
+ public SinkConfig getSinkConfig() {
+ return null;
+ }
+
@Override
public String getTenant() {
return null;
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index 6e05818ac94..4718d64235d 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.functions.api.BaseContext;
/**
@@ -46,6 +47,13 @@ public interface SinkContext extends BaseContext {
*/
Collection<String> getInputTopics();
+ /**
+ * Get sink config at startup.
+ *
+ * @return sink config
+ */
+ SinkConfig getSinkConfig();
+
/**
* Get subscription type used by the source providing data for the sink.
*
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index c04f654c8e3..b4ef9adbad7 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.SinkContext;
@@ -97,6 +98,11 @@ public class KafkaAbstractSinkTest {
return null;
}
+ @Override
+ public SinkConfig getSinkConfig() {
+ return null;
+ }
+
@Override
public String getTenant() {
return null;