You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/13 10:49:58 UTC
[pulsar] branch master updated: Added ability for sources to publish messages on their own just like their function counterparts (#6941)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7af5ebe Added ability for sources to publish messages on their own just like their function counterparts (#6941)
7af5ebe is described below
commit 7af5ebe167f30cb94ce7fd141746e9f736154c6e
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed May 13 03:49:46 2020 -0700
Added ability for sources to publish messages on their own just like their function counterparts (#6941)
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../org/apache/pulsar/io/common/IOConfigUtilsTest.java | 8 ++++++++
.../main/java/org/apache/pulsar/io/core/SourceContext.java | 14 ++++++++++++++
.../pulsar/io/kafka/source/KafkaAbstractSourceTest.java | 8 ++++++++
3 files changed, 30 insertions(+)
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 51f4e6e..e85f7be 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.io.common;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -179,6 +182,11 @@ public class IOConfigUtilsTest {
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
+
+ @Override
+ public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
+ return null; // test stub: no message builder is created, callers receive null
+ }
}
@Test
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index e87a4bc..78fe211 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.io.core;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@@ -160,4 +163,15 @@ public interface SourceContext {
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);
+
+ /**
+ * Creates a new output message builder that uses the given schema for serializing to the topic.
+ *
+ * @param topicName the name of the topic for the output message
+ * @param schema provides a way to convert between serialized data and domain objects
+ * @param <O> the type parameter shared by the schema and the returned message builder
+ * @return the message builder instance
+ * @throws PulsarClientException may be thrown by implementations; exact conditions are not specified here
+ */
+ <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
}
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 3bfd358..c4dde22 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -22,6 +22,9 @@ package org.apache.pulsar.io.kafka.source;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
@@ -158,6 +161,11 @@ public class KafkaAbstractSourceTest {
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
+
+ @Override
+ public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
+ return null; // test stub: no message builder is created, callers receive null
+ }
};
Map<String, Object> config = new HashMap<>();
ThrowingRunnable openAndClose = ()->{