You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/13 10:49:58 UTC

[pulsar] branch master updated: Added ability for sources to publish messages on their own just like their function counterparts (#6941)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7af5ebe  Added ability for sources to publish messages on their own just like their function counterparts (#6941)
7af5ebe is described below

commit 7af5ebe167f30cb94ce7fd141746e9f736154c6e
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed May 13 03:49:46 2020 -0700

    Added ability for sources to publish messages on their own just like their function counterparts (#6941)
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../org/apache/pulsar/io/common/IOConfigUtilsTest.java     |  8 ++++++++
 .../main/java/org/apache/pulsar/io/core/SourceContext.java | 14 ++++++++++++++
 .../pulsar/io/kafka/source/KafkaAbstractSourceTest.java    |  8 ++++++++
 3 files changed, 30 insertions(+)

diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index 51f4e6e..e85f7be 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.io.common;
 
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
@@ -179,6 +182,11 @@ public class IOConfigUtilsTest {
         public CompletableFuture<ByteBuffer> getStateAsync(String key) {
             return null;
         }
+
+        @Override
+        public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
+            return null; // stub implementation: always returns null, no message builder is created
+        }
     }
 
     @Test
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index e87a4bc..78fe211 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.io.core;
 
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
@@ -160,4 +163,15 @@ public interface SourceContext {
      * @return the state value for the key.
      */
     CompletableFuture<ByteBuffer> getStateAsync(String key);
+
+    /**
+     * Returns a builder for a new output message, using the given schema for serializing to the topic.
+     *
+     * @param topicName The name of the topic for output message
+     * @param schema provide a way to convert between serialized data and domain objects
+     * @param <O> the message value type shared by the schema and the returned builder
+     * @return the message builder instance
+     * @throws PulsarClientException if obtaining the message builder fails
+     */
+    <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
 }
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 3bfd358..c4dde22 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -22,6 +22,9 @@ package org.apache.pulsar.io.kafka.source;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
 import org.apache.pulsar.io.kafka.KafkaSourceConfig;
@@ -158,6 +161,11 @@ public class KafkaAbstractSourceTest {
             public CompletableFuture<ByteBuffer> getStateAsync(String key) {
                 return null;
             }
+
+            @Override
+            public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
+                return null; // stub implementation: always returns null, no message builder is created
+            }
         };
         Map<String, Object> config = new HashMap<>();
         ThrowingRunnable openAndClose = ()->{