You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/13 15:55:33 UTC
[pulsar] branch master updated: [improve][connector] Add getSourceConfig method on SourceContext (#16305)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c122928c585 [improve][connector] Add getSourceConfig method on SourceContext (#16305)
c122928c585 is described below
commit c122928c585b7302db49f427c984fe55aa9e871f
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Wed Jul 13 23:55:27 2022 +0800
[improve][connector] Add getSourceConfig method on SourceContext (#16305)
---
.../java/org/apache/pulsar/functions/instance/ContextImpl.java | 7 +++++++
.../org/apache/pulsar/functions/instance/ContextImplTest.java | 7 +++++++
.../test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java | 6 ++++++
.../src/main/java/org/apache/pulsar/io/core/SourceContext.java | 8 ++++++++
4 files changed, 28 insertions(+)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 336c2bd0be3..b3b165ea536 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
@@ -76,6 +77,7 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
@@ -264,6 +266,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
return config.getFunctionDetails().getSink().getTopic();
}
+ @Override
+ public SourceConfig getSourceConfig() {
+ return SourceConfigUtils.convertFromDetails(config.getFunctionDetails());
+ }
+
@Override
public String getOutputSchemaType() {
SinkSpec sink = config.getFunctionDetails().getSink();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index e2e34f4ff91..13c467e1e50 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -150,6 +150,13 @@ public class ContextImplTest {
Assert.assertNotNull(sinkConfig);
}
+ @Test
+ public void testGetSourceConfig() {
+ SinkContext sourceContext = context;
+ SinkConfig sinkConfig = sourceContext.getSinkConfig();
+ Assert.assertNotNull(sinkConfig);
+ }
+
@Test
public void testIncrCounterStateEnabled() throws Exception {
context.defaultStateStore = mock(BKStateStoreImpl.class);
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 94d5f8472d6..7317e9296c6 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -123,6 +124,11 @@ public class IOConfigUtilsTest {
return null;
}
+ @Override
+ public SourceConfig getSourceConfig() {
+ return null;
+ }
+
@Override
public String getTenant() {
return null;
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index c4d756857eb..3f1665ab48c 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.functions.api.BaseContext;
/**
@@ -47,6 +48,13 @@ public interface SourceContext extends BaseContext {
*/
String getOutputTopic();
+ /**
+ * Get the source config.
+ *
+ * @return source config
+ */
+ SourceConfig getSourceConfig();
+
/**
* New output message using schema for serializing to the topic.
*