You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/07/01 17:45:30 UTC

[incubator-pulsar] branch master updated: Forward user-properties to sink (#2057)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cebe23  Forward user-properties to sink (#2057)
7cebe23 is described below

commit 7cebe2330bd7376df066a31c01231f0cb693deaf
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jul 1 10:45:27 2018 -0700

    Forward user-properties to sink (#2057)
---
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 38 +++++++++++++---------
 .../functions/instance/JavaInstanceRunnable.java   |  4 +--
 .../apache/pulsar/functions/sink/PulsarSink.java   | 11 ++++---
 .../pulsar/functions/source/PulsarRecord.java      |  4 +++
 .../pulsar/functions/source/PulsarSource.java      |  1 +
 5 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 7b08f6b..5d70525 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -32,10 +32,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.test.PortManager;
@@ -49,9 +45,12 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -80,7 +79,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
 import jersey.repackaged.com.google.common.collect.Lists;
 
 /**
@@ -240,22 +238,26 @@ public class PulsarSinkE2ETest {
     @Test(timeOut = 20000)
     public void testE2EPulsarSink() throws Exception {
 
-        final String namespacePortion = "myReplNs";
+        final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
         admin.namespaces().createNamespace(replNamespace);
         Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
         // create a producer that creates a topic at broker
-        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(sourceTopic);
-        Producer<byte[]> producer = producerBuilder.create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(sinkTopic).subscriptionName("sub").subscribe();
 
         String jarFilePathUrl = Utils.FILE + ":"
                 + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test");
+        FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test",
+                sinkTopic);
         admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
-        
+
         // try to update function to test: update-function functionality
         admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);
 
@@ -271,8 +273,8 @@ public class PulsarSinkE2ETest {
 
         int totalMsgs = 5;
         for (int i = 0; i < totalMsgs; i++) {
-            String message = "my-message-" + i;
-            producer.send(message.getBytes());
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, propertyValue).value(data.getBytes()).send();
         }
         retryStrategically((test) -> {
             try {
@@ -283,14 +285,20 @@ public class PulsarSinkE2ETest {
                 return false;
             }
         }, 5, 150);
+
+        Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+        String receivedPropertyValue = msg.getProperty(propertyKey);
+        Assert.assertEquals(propertyValue, receivedPropertyValue);
+
         // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages
         // due to publish failure
         Assert.assertNotEquals(
-                admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, totalMsgs);
+                admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+                totalMsgs);
 
     }
 
-    protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName) {
+    protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName, String sinkTopic) {
 
         File file = new File(jarFile);
         try {
@@ -322,7 +330,7 @@ public class PulsarSinkE2ETest {
         // set up sink spec
         SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
         // sinkSpecBuilder.setClassName(PulsarSink.class.getName());
-        sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s", tenant, namespace, "output"));
+        sinkSpecBuilder.setTopic(sinkTopic);
         Map<String, Object> sinkConfigMap = Maps.newHashMap();
         sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
         sinkSpecBuilder.setTypeClassName(typeArg.getName());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d538c8b..bd1433c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -184,8 +184,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
                 if (currentRecord instanceof PulsarRecord) {
                     PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
-                     messageId = pulsarRecord.getMessageId();
-                     topicName = pulsarRecord.getTopicName();
+                    messageId = pulsarRecord.getMessageId();
+                    topicName = pulsarRecord.getTopicName();
                 }
                 result = javaInstance.handleMessage(messageId, topicName, currentRecord.getValue());
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 590a586..a561096 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -221,10 +221,13 @@ public class PulsarSink<T> implements Sink<T> {
         msgBuilder.setContent(output);
         if (recordContext instanceof PulsarRecord) {
             PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
-            msgBuilder
-                    .setProperty("__pfn_input_topic__", pulsarRecord.getTopicName())
-                    .setProperty("__pfn_input_msg_id__", new String(
-                            Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+            // forward user properties to sink-topic
+            if (pulsarRecord.getProperties() != null) {
+                msgBuilder.setProperties(pulsarRecord.getProperties());
+            }
+            msgBuilder.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()).setProperty(
+                    "__pfn_input_msg_id__",
+                    new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
         }
 
         this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 3684e86..78a7c86 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -23,6 +23,9 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
+
+import java.util.Map;
+
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.io.core.Record;
 
@@ -38,6 +41,7 @@ public class PulsarRecord<T> implements Record<T> {
     private T value;
     private MessageId messageId;
     private String topicName;
+    private Map<String, String> properties;
     private Runnable failFunction;
     private Runnable ackFunction;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ae5d0d6..2190be1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -132,6 +132,7 @@ public class PulsarSource<T> implements Source<T> {
                 .partitionId(String.format("%s-%s", topicName, partitionId))
                 .recordSequence(Utils.getSequenceId(message.getMessageId()))
                 .topicName(topicName)
+                .properties(message.getProperties())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);