You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/25 19:20:43 UTC

[incubator-streampipes] branch rel/0.70.0 updated: [STREAMPIPES-580] Refactor the Pulsar sink element

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/rel/0.70.0 by this push:
     new 3764f9860 [STREAMPIPES-580] Refactor the Pulsar sink element
3764f9860 is described below

commit 3764f9860e59d30830edfe18800b3ff6a4a56f2e
Author: Zike Yang <zi...@apache.org>
AuthorDate: Thu Aug 25 23:01:30 2022 +0800

    [STREAMPIPES-580] Refactor the Pulsar sink element
---
 .../streampipes-sinks-brokers-jvm/pom.xml          |   6 +
 .../sinks/brokers/jvm/BrokersJvmInit.java          |   4 +-
 .../sinks/brokers/jvm/pulsar/Pulsar.java           |  85 ---------------
 .../sinks/brokers/jvm/pulsar/PulsarController.java |  68 ------------
 .../sinks/brokers/jvm/pulsar/PulsarParameters.java |  19 ++--
 .../brokers/jvm/pulsar/PulsarPublisherSink.java    | 121 +++++++++++++++++++++
 .../brokers/jvm/pulsar/TestPulsarParameters.java   |  50 +++++++++
 .../jvm/pulsar/TestPulsarPublisherSink.java        |  97 +++++++++++++++++
 8 files changed, 286 insertions(+), 164 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
index b75e1ae4b..65844ccd2 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/pom.xml
@@ -102,6 +102,12 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index 97dad40ad..9910a94de 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -33,7 +33,7 @@ import org.apache.streampipes.sinks.brokers.jvm.jms.JmsController;
 import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaController;
 import org.apache.streampipes.sinks.brokers.jvm.mqtt.MqttPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.nats.NatsController;
-import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarController;
+import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqController;
 import org.apache.streampipes.sinks.brokers.jvm.rest.RestController;
 import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink;
@@ -58,7 +58,7 @@ public class BrokersJvmInit extends StandaloneModelSubmitter {
                     new RabbitMqController(),
                     new MqttPublisherSink(),
                     new WebsocketServerSink(),
-                    new PulsarController(),
+                    new PulsarPublisherSink(),
                     new NatsController())
             .registerMessagingFormats(
                     new JsonDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/Pulsar.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/Pulsar.java
deleted file mode 100644
index c51ce9343..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/Pulsar.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.sinks.brokers.jvm.pulsar;
-
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.SpDataFormatDefinition;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Map;
-
-public class Pulsar implements EventSink<PulsarParameters> {
-
-  private static final String PulsarScheme = "pulsar://";
-  private static final String Colon = ":";
-
-  private Producer<byte[]> producer;
-  private PulsarClient pulsarClient;
-  private SpDataFormatDefinition spDataFormatDefinition;
-
-  public Pulsar() {
-    this.spDataFormatDefinition = new JsonDataFormatDefinition();
-  }
-
-  @Override
-  public void onInvocation(PulsarParameters params,
-                           EventSinkRuntimeContext eventSinkRuntimeContext) throws SpRuntimeException {
-    try {
-     this.pulsarClient = PulsarClient.builder()
-              .serviceUrl(makePulsarUrl(params.getPulsarHost(), params.getPulsarPort()))
-              .build();
-
-      this.producer = this.pulsarClient.newProducer()
-              .topic(params.getTopic())
-              .create();
-    } catch (PulsarClientException e) {
-      throw new SpRuntimeException(e);
-    }
-  }
-
-  @Override
-  public void onEvent(Event event) throws SpRuntimeException {
-    Map<String, Object> rawMap = event.getRaw();
-    byte[] jsonMessage = this.spDataFormatDefinition.fromMap(rawMap);
-
-    try {
-      this.producer.send(jsonMessage);
-    } catch (PulsarClientException e) {
-      throw new SpRuntimeException(e);
-    }
-  }
-
-  @Override
-  public void onDetach() throws SpRuntimeException {
-    try {
-      this.pulsarClient.close();
-    } catch (PulsarClientException e) {
-      throw new SpRuntimeException(e);
-    }
-  }
-
-  private String makePulsarUrl(String hostname, Integer port) {
-    return PulsarScheme + hostname + Colon + port;
-  }
-}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarController.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarController.java
deleted file mode 100644
index b79458c8d..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarController.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.sinks.brokers.jvm.pulsar;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class PulsarController extends StandaloneEventSinkDeclarer<PulsarParameters> {
-
-  private static final String TOPIC_KEY = "topic";
-  private static final String PULSAR_HOST_KEY = "pulsar-host";
-  private static final String PULSAR_PORT_KEY = "pulsar-port";
-
-  @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.pulsar")
-            .category(DataSinkType.MESSAGING)
-            .withLocales(Locales.EN)
-            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredProperty(EpRequirements.anyProperty())
-                    .build())
-            .requiredTextParameter(Labels.withId(PULSAR_HOST_KEY))
-            .requiredIntegerParameter(Labels.withId(PULSAR_PORT_KEY), 6650)
-            .requiredTextParameter(Labels.withId(TOPIC_KEY))
-            .build();
-  }
-
-  @Override
-  public ConfiguredEventSink<PulsarParameters> onInvocation(DataSinkInvocation graph,
-                                                          DataSinkParameterExtractor extractor) {
-    String pulsarHost = extractor.singleValueParameter(PULSAR_HOST_KEY, String.class);
-    Integer pulsarPort = extractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class);
-    String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
-
-    PulsarParameters params = new PulsarParameters(graph, pulsarHost, pulsarPort, topic);
-
-    return new ConfiguredEventSink<>(params, Pulsar::new);
-  }
-
-
-}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java
index 82351eead..0accba057 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarParameters.java
@@ -17,21 +17,22 @@
  */
 package org.apache.streampipes.sinks.brokers.jvm.pulsar;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
 
-public class PulsarParameters extends EventSinkBindingParams {
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.*;
 
+public class PulsarParameters {
   private String pulsarHost;
   private Integer pulsarPort;
   private String topic;
 
-  public PulsarParameters(DataSinkInvocation graph, String pulsarHost, Integer pulsarPort,
-                          String topic) {
-    super(graph);
-    this.pulsarHost = pulsarHost;
-    this.pulsarPort = pulsarPort;
-    this.topic = topic;
+  public PulsarParameters(SinkParams parameters) {
+    DataSinkParameterExtractor extractor = parameters.extractor();
+
+    this.pulsarHost = extractor.singleValueParameter(PULSAR_HOST_KEY, String.class);
+    this.pulsarPort = extractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class);
+    this.topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
   }
 
   public String getPulsarHost() {
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarPublisherSink.java
new file mode 100644
index 000000000..0eadc54f0
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/PulsarPublisherSink.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sinks.brokers.jvm.pulsar;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.SpDataFormatDefinition;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+public class PulsarPublisherSink extends StreamPipesDataSink {
+
+    public static final String TOPIC_KEY = "topic";
+    public static final String PULSAR_HOST_KEY = "pulsar-host";
+    public static final String PULSAR_PORT_KEY = "pulsar-port";
+    private static final String PulsarScheme = "pulsar://";
+    private static final String Colon = ":";
+    private final ClientBuilder clientBuilder;
+    private Producer<byte[]> producer;
+    private PulsarClient pulsarClient;
+    private SpDataFormatDefinition spDataFormatDefinition;
+    private PulsarParameters params;
+
+    public PulsarPublisherSink() {
+        this.clientBuilder = PulsarClient.builder();
+    }
+
+    @VisibleForTesting
+    public PulsarPublisherSink(ClientBuilder pulsarClientBuilder) {
+        this.clientBuilder = pulsarClientBuilder;
+    }
+
+    @Override
+    public DataSinkDescription declareModel() {
+        return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.pulsar")
+                .category(DataSinkType.MESSAGING)
+                .withLocales(Locales.EN)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .requiredStream(StreamRequirementsBuilder
+                        .create()
+                        .requiredProperty(EpRequirements.anyProperty())
+                        .build())
+                .requiredTextParameter(Labels.withId(PULSAR_HOST_KEY))
+                .requiredIntegerParameter(Labels.withId(PULSAR_PORT_KEY), 6650)
+                .requiredTextParameter(Labels.withId(TOPIC_KEY))
+                .build();
+    }
+
+    @Override
+    public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+        params = new PulsarParameters(parameters);
+
+        this.spDataFormatDefinition = new JsonDataFormatDefinition();
+        try {
+            this.pulsarClient = clientBuilder.serviceUrl(makePulsarUrl(params.getPulsarHost(), params.getPulsarPort()))
+                    .build();
+
+            this.producer = this.pulsarClient.newProducer()
+                    .topic(params.getTopic())
+                    .create();
+        } catch (PulsarClientException e) {
+            throw new SpRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void onEvent(Event event) throws SpRuntimeException {
+        Map<String, Object> rawMap = event.getRaw();
+        byte[] jsonMessage = this.spDataFormatDefinition.fromMap(rawMap);
+
+        try {
+            this.producer.send(jsonMessage);
+        } catch (PulsarClientException e) {
+            throw new SpRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        try {
+            this.pulsarClient.close();
+        } catch (PulsarClientException e) {
+            throw new SpRuntimeException(e);
+        }
+    }
+
+    private String makePulsarUrl(String hostname, Integer port) {
+        return PulsarScheme + hostname + Colon + port;
+    }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarParameters.java
new file mode 100644
index 000000000..aaf4fd2a7
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarParameters.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sinks.brokers.jvm.pulsar;
+
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_HOST_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_PORT_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.TOPIC_KEY;
+import static org.mockito.Mockito.mock;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.junit.Test;
+
+public class TestPulsarParameters {
+    @Test
+    public void testInitPulsarParameters() {
+        String pulsarHost = "localhost";
+        Integer pulsarPort = 6650;
+        String topic = "test";
+
+        SinkParams params = mock(SinkParams.class);
+        DataSinkParameterExtractor extractor = mock(DataSinkParameterExtractor.class);
+        Mockito.when(params.extractor()).thenReturn(extractor);
+        Mockito.when(extractor.singleValueParameter(PULSAR_HOST_KEY, String.class)).thenReturn(pulsarHost);
+        Mockito.when(extractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class)).thenReturn(pulsarPort);
+        Mockito.when(extractor.singleValueParameter(TOPIC_KEY, String.class)).thenReturn(topic);
+
+        PulsarParameters pulsarParameters = new PulsarParameters(params);
+
+        Assert.assertEquals(pulsarHost, pulsarParameters.getPulsarHost());
+        Assert.assertEquals(pulsarPort, pulsarParameters.getPulsarPort());
+        Assert.assertEquals(topic, pulsarParameters.getTopic());
+    }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarPublisherSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarPublisherSink.java
new file mode 100644
index 000000000..5e15a1391
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/test/java/org/apache/streampipes/sinks/brokers/jvm/pulsar/TestPulsarPublisherSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.sinks.brokers.jvm.pulsar;
+
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_HOST_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.PULSAR_PORT_KEY;
+import static org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink.TOPIC_KEY;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestPulsarPublisherSink {
+    @Test
+    public void testSimpleEventSink() throws PulsarClientException {
+        String pulsarHost = "localhost";
+        Integer pulsarPort = 6650;
+        String topic = "test";
+
+        Map<String, Object> rawMap = new HashMap<>(2);
+
+        rawMap.put("key1", "value1");
+        rawMap.put("key2", "value2");
+
+        SinkParams params = mock(SinkParams.class);
+        DataSinkParameterExtractor extractor = mock(DataSinkParameterExtractor.class);
+        when(params.extractor()).thenReturn(extractor);
+        when(extractor.singleValueParameter(PULSAR_HOST_KEY, String.class)).thenReturn(pulsarHost);
+        when(extractor.singleValueParameter(PULSAR_PORT_KEY, Integer.class)).thenReturn(pulsarPort);
+        when(extractor.singleValueParameter(TOPIC_KEY, String.class)).thenReturn(topic);
+
+        ClientBuilder clientBuilder = mock(ClientBuilder.class);
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+        ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class);
+        Producer<byte[]> producer = mock(Producer.class);
+        when(clientBuilder.serviceUrl(anyString())).thenReturn(clientBuilder);
+        when(clientBuilder.build()).thenReturn(pulsarClient);
+        when(pulsarClient.newProducer()).thenReturn(producerBuilder);
+        when(producerBuilder.topic(topic)).thenReturn(producerBuilder);
+        when(producerBuilder.create()).thenReturn(producer);
+        when(producer.send(Mockito.any(byte[].class))).thenAnswer(data -> {
+            HashMap<String, String> map;
+            ObjectMapper mapper = new ObjectMapper();
+            String json = new String((byte[]) data.getArgument(0));
+            map = mapper.readValue(json, new TypeReference<>() {
+            });
+            Assert.assertEquals(map, rawMap);
+            return null;
+        });
+
+        PulsarPublisherSink pulsarPublisherSink = new PulsarPublisherSink(clientBuilder);
+
+        // Test invocation
+        pulsarPublisherSink.onInvocation(params, null);
+
+        verify(clientBuilder).serviceUrl(String.format("pulsar://%s:%d", pulsarHost, pulsarPort));
+
+        // Test publish event
+        Event event = mock(Event.class);
+        when(event.getRaw()).thenReturn(rawMap);
+
+        pulsarPublisherSink.onEvent(event);
+
+        verify(producer, times(1)).send(Mockito.any(byte[].class));
+    }
+}