You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/07/01 17:45:30 UTC
[incubator-pulsar] branch master updated: Forward user-properties to sink (#2057)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7cebe23 Forward user-properties to sink (#2057)
7cebe23 is described below
commit 7cebe2330bd7376df066a31c01231f0cb693deaf
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jul 1 10:45:27 2018 -0700
Forward user-properties to sink (#2057)
---
.../org/apache/pulsar/io/PulsarSinkE2ETest.java | 38 +++++++++++++---------
.../functions/instance/JavaInstanceRunnable.java | 4 +--
.../apache/pulsar/functions/sink/PulsarSink.java | 11 ++++---
.../pulsar/functions/source/PulsarRecord.java | 4 +++
.../pulsar/functions/source/PulsarSource.java | 1 +
5 files changed, 37 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 7b08f6b..5d70525 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -32,10 +32,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
@@ -49,9 +45,12 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -80,7 +79,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
-import io.netty.util.concurrent.DefaultThreadFactory;
import jersey.repackaged.com.google.common.collect.Lists;
/**
@@ -240,22 +238,26 @@ public class PulsarSinkE2ETest {
@Test(timeOut = 20000)
public void testE2EPulsarSink() throws Exception {
- final String namespacePortion = "myReplNs";
+ final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
// create a producer that creates a topic at broker
- ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(sourceTopic);
- Producer<byte[]> producer = producerBuilder.create();
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(sinkTopic).subscriptionName("sub").subscribe();
String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test");
+ FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, "PulsarSink-test",
+ sinkTopic);
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
-
+
// try to update function to test: update-function functionality
admin.functions().updateFunctionWithUrl(functionDetails, jarFilePathUrl);
@@ -271,8 +273,8 @@ public class PulsarSinkE2ETest {
int totalMsgs = 5;
for (int i = 0; i < totalMsgs; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey, propertyValue).value(data.getBytes()).send();
}
retryStrategically((test) -> {
try {
@@ -283,14 +285,20 @@ public class PulsarSinkE2ETest {
return false;
}
}, 5, 150);
+
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedPropertyValue = msg.getProperty(propertyKey);
+ Assert.assertEquals(propertyValue, receivedPropertyValue);
+
// validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages
// due to publish failure
Assert.assertNotEquals(
- admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, totalMsgs);
+ admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+ totalMsgs);
}
- protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName) {
+ protected FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String sinkName, String sinkTopic) {
File file = new File(jarFile);
try {
@@ -322,7 +330,7 @@ public class PulsarSinkE2ETest {
// set up sink spec
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
// sinkSpecBuilder.setClassName(PulsarSink.class.getName());
- sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s", tenant, namespace, "output"));
+ sinkSpecBuilder.setTopic(sinkTopic);
Map<String, Object> sinkConfigMap = Maps.newHashMap();
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
sinkSpecBuilder.setTypeClassName(typeArg.getName());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d538c8b..bd1433c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -184,8 +184,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (currentRecord instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
- messageId = pulsarRecord.getMessageId();
- topicName = pulsarRecord.getTopicName();
+ messageId = pulsarRecord.getMessageId();
+ topicName = pulsarRecord.getTopicName();
}
result = javaInstance.handleMessage(messageId, topicName, currentRecord.getValue());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 590a586..a561096 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -221,10 +221,13 @@ public class PulsarSink<T> implements Sink<T> {
msgBuilder.setContent(output);
if (recordContext instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
- msgBuilder
- .setProperty("__pfn_input_topic__", pulsarRecord.getTopicName())
- .setProperty("__pfn_input_msg_id__", new String(
- Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+ // forward user properties to sink-topic
+ if (pulsarRecord.getProperties() != null) {
+ msgBuilder.setProperties(pulsarRecord.getProperties());
+ }
+ msgBuilder.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()).setProperty(
+ "__pfn_input_msg_id__",
+ new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
}
this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 3684e86..78a7c86 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -23,6 +23,9 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
+
+import java.util.Map;
+
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.io.core.Record;
@@ -38,6 +41,7 @@ public class PulsarRecord<T> implements Record<T> {
private T value;
private MessageId messageId;
private String topicName;
+ private Map<String, String> properties;
private Runnable failFunction;
private Runnable ackFunction;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ae5d0d6..2190be1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -132,6 +132,7 @@ public class PulsarSource<T> implements Source<T> {
.partitionId(String.format("%s-%s", topicName, partitionId))
.recordSequence(Utils.getSequenceId(message.getMessageId()))
.topicName(topicName)
+ .properties(message.getProperties())
.ackFunction(() -> {
if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message);