You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/25 19:20:43 UTC
[incubator-streampipes] branch rel/0.70.0 updated: [STREAMPIPES-580] Refactor the Pulsar sink element
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/rel/0.70.0 by this push:
new 3764f9860 [STREAMPIPES-580] Refactor the Pulsar sink element
3764f9860 is described below
commit 3764f9860e59d30830edfe18800b3ff6a4a56f2e
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Aug 25 23:01:30 2022 +0800
[STREAMPIPES-580] Refactor the Pulsar sink element
---
.../streampipes-sinks-brokers-jvm/pom.xml | 6 +
.../sinks/brokers/jvm/BrokersJvmInit.java | 4 +-
.../sinks/brokers/jvm/pulsar/Pulsar.java | 85 ---------------
.../sinks/brokers/jvm/pulsar/PulsarController.java | 68 ------------
.../sinks/brokers/jvm/pulsar/PulsarParameters.java | 19 ++--
.../brokers/jvm/pulsar/PulsarPublisherSink.java | 121 +++++++++++++++++++++
.../brokers/jvm/pulsar/TestPulsarParameters.java | 50 +++++++++
.../jvm/pulsar/TestPulsarPublisherSink.java | 97 +++++++++++++++++
8 files changed, 286 insertions(+), 164 deletions(-)
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
index b75e1ae4b..65844ccd2 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
@@ -102,6 +102,12 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index 97dad40ad..9910a94de 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -33,7 +33,7 @@ import org.apache.streampipes.sinks.brokers.jvm.jms.JmsController;
import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaController;
import org.apache.streampipes.sinks.brokers.jvm.mqtt.MqttPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.nats.NatsController;
-import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarController;
+import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
import org.apache.streampipes.sinks.brokers.jvm.rest.RestController;
import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
@@ -58,7 +58,7 @@ public class BrokersJvmInit extends StandaloneModelSubmitter {
new RabbitMqController(),
new MqttPublisherSink(),
new WebsocketServerSink(),
- new PulsarController(),
+ new PulsarPublisherSink(),
new NatsController())
.registerMessagingFormats(
new JsonDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/Pulsar.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/Pulsar.java
deleted file mode 100644
index c51ce9343..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/Pulsar.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.sinks.brokers.jvm.pulsar;
-
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Map;
-
-public class Pulsar implements EventSink<PulsarParameters> {
-
- private static final String PulsarScheme = "pulsar://";
- private static final String Colon = ":";
-
- private Producer<byte[]> producer;
- private PulsarClient pulsarClient;
- private SpDataFormatDefinition spDataFormatDefinition;
-
- public Pulsar() {
- this.spDataFormatDefinition = new JsonDataFormatDefinition();
- }
-
- @Override
- public void onInvocation(PulsarParameters params,
- EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
- try {
- this.pulsarClient = PulsarClient.builder()
- .serviceUrl(makePulsarUrl(params.getPulsarHost(), params.getPulsarPort()))
- .build();
-
- this.producer = this.pulsarClient.newProducer()
- .topic(params.getTopic())
- .create();
- } catch (PulsarClientException e) {
- throw new SpRuntimeException(e);
- }
- }
-
- @Override
- public void onEvent(Event event) throws SpRuntimeException {
- Map<String, Object> rawMap = event.getRaw();
- byte[] jsonMessage = this.spDataFormatDefinition.fromMap(rawMap);
-
- try {
- this.producer.send(jsonMessage);
- } catch (PulsarClientException e) {
- throw new SpRuntimeException(e);
- }
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- try {
- this.pulsarClient.close();
- } catch (PulsarClientException e) {
- throw new SpRuntimeException(e);
- }
- }
-
- private String makePulsarUrl(String hostname, Integer port) {
- return PulsarScheme + hostname + Colon + port;
- }
-}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarController.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarController.java
deleted file mode 100644
index b79458c8d..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarController.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.sinks.brokers.jvm.pulsar;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class PulsarController extends StandaloneEventSinkDeclarer<PulsarParameters> {
-
- private static final String TOPIC_KEY = "topic";
- private static final String PULSAR_HOST_KEY = "pulsar-host";
- private static final String PULSAR_PORT_KEY = "pulsar-port";
-
- @Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.pulsar")
- .category(DataSinkType.MESSAGING)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredTextParameter(Labels.withId(PULSAR_HOST_KEY))
- .requiredIntegerParameter(Labels.withId(PULSAR_PORT_KEY), 6650)
- .requiredTextParameter(Labels.withId(TOPIC_KEY))
- .build();
- }
-
- @Override
- public ConfiguredEventSink<PulsarParameters> onInvocation(DataSinkInvocation graph,
- DataSinkParameterExtractor extractor) {
- String pulsarHost = extractor.singleValueParameter(PULSAR_HOST_KEY, String.class);
- Integer pulsarPort = extractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class);
- String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
-
- PulsarParameters params = new PulsarParameters(graph, pulsarHost, pulsarPort, topic);
-
- return new ConfiguredEventSink<>(params, Pulsar::new);
- }
-
-
-}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java
index 82351eead..0accba057 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java
@@ -17,21 +17,22 @@
*/
package org.apache.streampipes.sinks.brokers.jvm.pulsar;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
-public class PulsarParameters extends EventSinkBindingParams {
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.*;
+public class PulsarParameters {
private String pulsarHost;
private Integer pulsarPort;
private String topic;
- public PulsarParameters(DataSinkInvocation graph, String pulsarHost, Integer pulsarPort,
- String topic) {
- super(graph);
- this.pulsarHost = pulsarHost;
- this.pulsarPort = pulsarPort;
- this.topic = topic;
+ public PulsarParameters(SinkParams parameters) {
+ DataSinkParameterExtractor extractor = parameters.extractor();
+
+ this.pulsarHost = extractor.singleValueParameter(PULSAR_HOST_KEY, String.class);
+ this.pulsarPort = extractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class);
+ this.topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
}
public String getPulsarHost() {
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarPublisherSink.java
new file mode 100644
index 000000000..0eadc54f0
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarPublisherSink.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sinks.brokers.jvm.pulsar;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+public class PulsarPublisherSink extends StreamPipesDataSink {
+  // Data sink that serializes each incoming event to JSON and publishes it to one Pulsar topic.
+  public static final String TOPIC_KEY = "topic";
+  public static final String PULSAR_HOST_KEY = "pulsar-host";
+  public static final String PULSAR_PORT_KEY = "pulsar-port";
+  private static final String PULSAR_SCHEME = "pulsar://";
+  private final ClientBuilder clientBuilder;
+  private final SpDataFormatDefinition spDataFormatDefinition = new JsonDataFormatDefinition();
+  private Producer<byte[]> producer;
+  private PulsarClient pulsarClient;
+
+  public PulsarPublisherSink() {
+    this.clientBuilder = PulsarClient.builder();
+  }
+
+  @VisibleForTesting // test seam: accepts a (mocked) ClientBuilder so no real broker is needed
+  public PulsarPublisherSink(ClientBuilder pulsarClientBuilder) {
+    this.clientBuilder = pulsarClientBuilder;
+  }
+
+  @Override // describes the sink: any input property, plus required host, port and topic parameters
+  public DataSinkDescription declareModel() {
+    return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.pulsar")
+        .category(DataSinkType.MESSAGING)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+        .requiredTextParameter(Labels.withId(PULSAR_HOST_KEY))
+        .requiredIntegerParameter(Labels.withId(PULSAR_PORT_KEY), 6650)
+        .requiredTextParameter(Labels.withId(TOPIC_KEY))
+        .build();
+  }
+
+  @Override // builds the client and the topic producer from the configured host, port and topic
+  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+    PulsarParameters params = new PulsarParameters(parameters);
+    try {
+      this.pulsarClient = clientBuilder.serviceUrl(makePulsarUrl(params)).build();
+      this.producer = this.pulsarClient.newProducer().topic(params.getTopic()).create();
+    } catch (PulsarClientException e) {
+      try {
+        closeClient(); // a client built before the failure would otherwise never be closed
+      } catch (PulsarClientException closeFailure) {
+        e.addSuppressed(closeFailure); // keep the original failure as the primary cause
+      }
+      throw new SpRuntimeException(e);
+    }
+  }
+
+  @Override // serializes the raw event map to JSON bytes and hands them to the producer
+  public void onEvent(Event event) throws SpRuntimeException {
+    Map<String, Object> rawMap = event.getRaw();
+    byte[] jsonMessage = this.spDataFormatDefinition.fromMap(rawMap);
+    try {
+      this.producer.send(jsonMessage);
+    } catch (PulsarClientException e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+
+  @Override // closes the client; a no-op when no client is currently held
+  public void onDetach() throws SpRuntimeException {
+    try {
+      closeClient();
+    } catch (PulsarClientException e) {
+      throw new SpRuntimeException(e);
+    }
+  }
+
+  private void closeClient() throws PulsarClientException {
+    PulsarClient client = this.pulsarClient;
+    this.pulsarClient = null; // cleared first so this sink closes a given client at most once
+    if (client != null) {
+      client.close();
+    }
+  }
+  // Joins scheme, host and port into the service URL, e.g. pulsar://localhost:6650.
+  private String makePulsarUrl(PulsarParameters params) {
+    return PULSAR_SCHEME + params.getPulsarHost() + ':' + params.getPulsarPort();
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarParameters.java
new file mode 100644
index 000000000..aaf4fd2a7
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarParameters.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sinks.brokers.jvm.pulsar;
+
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_HOST_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_PORT_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.TOPIC_KEY;
+import static org.mockito.Mockito.mock;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.junit.Test;
+
+public class TestPulsarParameters {
+  @Test
+  public void testInitPulsarParameters() {
+    final String expectedHost = "localhost";
+    final Integer expectedPort = 6650;
+    final String expectedTopic = "test";
+    // Stub the extractor so each configuration key resolves to its expected value.
+    DataSinkParameterExtractor stubbedExtractor = mock(DataSinkParameterExtractor.class);
+    Mockito.when(stubbedExtractor.singleValueParameter(TOPIC_KEY, String.class)).thenReturn(expectedTopic);
+    Mockito.when(stubbedExtractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class)).thenReturn(expectedPort);
+    Mockito.when(stubbedExtractor.singleValueParameter(PULSAR_HOST_KEY, String.class)).thenReturn(expectedHost);
+    SinkParams sinkParams = mock(SinkParams.class);
+    Mockito.when(sinkParams.extractor()).thenReturn(stubbedExtractor);
+    // Constructing the parameters reads all three values through the extractor.
+    PulsarParameters parsed = new PulsarParameters(sinkParams);
+    // Each getter must hand back exactly what the extractor supplied.
+    Assert.assertEquals(expectedHost, parsed.getPulsarHost());
+    Assert.assertEquals(expectedPort, parsed.getPulsarPort());
+    Assert.assertEquals(expectedTopic, parsed.getTopic());
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarPublisherSink.java
new file mode 100644
index 000000000..5e15a1391
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarPublisherSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sinks.brokers.jvm.pulsar;
+
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_HOST_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_PORT_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.TOPIC_KEY;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestPulsarPublisherSink {
+  @Test
+  public void testSimpleEventSink() throws PulsarClientException {
+    String pulsarHost = "localhost";
+    Integer pulsarPort = 6650;
+    String topic = "test";
+
+    Map<String, Object> rawMap = new HashMap<>(2);
+    rawMap.put("key1", "value1");
+    rawMap.put("key2", "value2");
+
+    // Sink configuration as the parameter extractor would supply it.
+    SinkParams params = mock(SinkParams.class);
+    DataSinkParameterExtractor extractor = mock(DataSinkParameterExtractor.class);
+    when(params.extractor()).thenReturn(extractor);
+    when(extractor.singleValueParameter(PULSAR_HOST_KEY, String.class)).thenReturn(pulsarHost);
+    when(extractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class)).thenReturn(pulsarPort);
+    when(extractor.singleValueParameter(TOPIC_KEY, String.class)).thenReturn(topic);
+
+    // Mocked Pulsar chain: client builder -> client -> producer builder -> producer.
+    ClientBuilder clientBuilder = mock(ClientBuilder.class);
+    PulsarClient pulsarClient = mock(PulsarClient.class);
+    ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class);
+    Producer<byte[]> producer = mock(Producer.class);
+    when(clientBuilder.serviceUrl(anyString())).thenReturn(clientBuilder);
+    when(clientBuilder.build()).thenReturn(pulsarClient);
+    when(pulsarClient.newProducer()).thenReturn(producerBuilder);
+    when(producerBuilder.topic(topic)).thenReturn(producerBuilder);
+    when(producerBuilder.create()).thenReturn(producer);
+    when(producer.send(Mockito.any(byte[].class))).thenAnswer(data -> {
+      // Decode explicitly as UTF-8 so the check does not depend on the platform default charset.
+      String json = new String((byte[]) data.getArgument(0), java.nio.charset.StandardCharsets.UTF_8);
+      HashMap<String, String> map =
+          new ObjectMapper().readValue(json, new TypeReference<HashMap<String, String>>() {});
+      Assert.assertEquals(rawMap, map);
+      return null;
+    });
+
+    // Test invocation
+    PulsarPublisherSink pulsarPublisherSink = new PulsarPublisherSink(clientBuilder);
+    pulsarPublisherSink.onInvocation(params, null);
+    verify(clientBuilder).serviceUrl(String.format("pulsar://%s:%d", pulsarHost, pulsarPort));
+
+    // Test publish event
+    Event event = mock(Event.class);
+    when(event.getRaw()).thenReturn(rawMap);
+    pulsarPublisherSink.onEvent(event);
+    verify(producer, times(1)).send(Mockito.any(byte[].class));
+
+    // Test detach: the sink must close the client it built
+    pulsarPublisherSink.onDetach();
+    verify(pulsarClient).close();
+  }
+}