You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/04/18 10:12:50 UTC

[incubator-streampipes-extensions] branch dev updated: STREAMPIPES-105: Create Kafka Publisher Sink

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new d955152   STREAMPIPES-105: Create Kafka Publisher Sink
d955152 is described below

commit d955152806696e3f47c81afeb27d1250d7436226
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Apr 18 12:12:03 2020 +0200

     STREAMPIPES-105: Create Kafka Publisher Sink
---
 .../sinks/brokers/jvm/jms/JmsController.java       | 24 +++++---
 .../sinks/brokers/jvm/jms/JmsPublisher.java        |  6 +-
 .../sinks/brokers/jvm/kafka/KafkaController.java   | 26 +++++---
 .../sinks/brokers/jvm/mqtt/MqttController.java     | 26 +++++---
 .../brokers/jvm/rabbitmq/RabbitMqConsumer.java     |  5 ++
 .../brokers/jvm/rabbitmq/RabbitMqController.java   | 72 ++++++++++++++--------
 .../brokers/jvm/rabbitmq/RabbitMqPublisher.java    |  6 ++
 .../strings.en                                     |  9 ++-
 .../strings.en                                     |  9 ++-
 .../strings.en                                     |  9 ++-
 .../strings.en                                     | 18 +++++-
 11 files changed, 149 insertions(+), 61 deletions(-)

diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
index cf4f276..81a45ba 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
@@ -36,9 +36,11 @@ public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
 
   private static final String JMS_BROKER_SETTINGS_KEY = "broker-settings";
   private static final String TOPIC_KEY = "topic";
+  private static final String HOST_KEY = "host";
+  private static final String PORT_KEY = "port";
 
-  private static final String JMS_HOST_URI = "http://schema.org/jmsHost";
-  private static final String JMS_PORT_URI = "http://schema.org/jmsPort";
+//  private static final String JMS_HOST_URI = "http://schema.org/jmsHost";
+//  private static final String JMS_PORT_URI = "http://schema.org/jmsPort";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -50,9 +52,11 @@ public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
                     .requiredProperty(EpRequirements.anyProperty())
                     .build())
             .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
-            .requiredOntologyConcept(Labels.withId(JMS_BROKER_SETTINGS_KEY),
-                    OntologyProperties.mandatory(JMS_HOST_URI),
-                    OntologyProperties.mandatory(JMS_PORT_URI))
+            .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+            .requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
+//            .requiredOntologyConcept(Labels.withId(JMS_BROKER_SETTINGS_KEY),
+//                    OntologyProperties.mandatory(JMS_HOST_URI),
+//                    OntologyProperties.mandatory(JMS_PORT_URI))
             .build();
   }
 
@@ -61,10 +65,12 @@ public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
 
     String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
 
-    String jmsHost = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_HOST_URI,
-            String.class);
-    Integer jmsPort = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_PORT_URI,
-            Integer.class);
+    String jmsHost = extractor.singleValueParameter(HOST_KEY, String.class);
+    Integer jmsPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+//    String jmsHost = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_HOST_URI,
+//            String.class);
+//    Integer jmsPort = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_PORT_URI,
+//            Integer.class);
 
     JmsParameters params = new JmsParameters(graph, jmsHost, jmsPort, topic);
 
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
index baf4539..2492412 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sinks.brokers.jvm.jms;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -38,7 +39,10 @@ public class JmsPublisher implements EventSink<JmsParameters> {
 
   @Override
   public void onInvocation(JmsParameters params, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
-    this.publisher = new ActiveMQPublisher(params.getJmsHost() + ":" + params.getJmsPort(), params.getTopic());
+//    this.publisher = new ActiveMQPublisher(params.getJmsHost() + ":" + params.getJmsPort(), params.getTopic());
+    this.publisher = new ActiveMQPublisher();
+    JmsTransportProtocol jmsTransportProtocol = new JmsTransportProtocol(params.getJmsHost(),  params.getJmsPort(), params.getTopic());
+    this.publisher.connect(jmsTransportProtocol);
     if (!this.publisher.isConnected()) {
       throw new SpRuntimeException("Could not connect to JMS server " + params.getJmsHost() + " on Port: " + params.getJmsPort() + " to topic: " + params.getTopic());
     }
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
index be82dd5..be2e1e0 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
@@ -33,11 +33,13 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
 
 public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters> {
 
-  private static final String KAFKA_BROKER_SETTINGS_KEY = "broker-settings";
+//  private static final String KAFKA_BROKER_SETTINGS_KEY = "broker-settings";
   private static final String TOPIC_KEY = "topic";
+  private static final String HOST_KEY = "host";
+  private static final String PORT_KEY = "port";
 
-  private static final String KAFKA_HOST_URI = "http://schema.org/kafkaHost";
-  private static final String KAFKA_PORT_URI = "http://schema.org/kafkaPort";
+//  private static final String KAFKA_HOST_URI = "http://schema.org/kafkaHost";
+//  private static final String KAFKA_PORT_URI = "http://schema.org/kafkaPort";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -49,9 +51,11 @@ public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters
                     .requiredProperty(EpRequirements.anyProperty())
                     .build())
             .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
-            .requiredOntologyConcept(Labels.withId(KAFKA_BROKER_SETTINGS_KEY),
-                    OntologyProperties.mandatory(KAFKA_HOST_URI),
-                    OntologyProperties.mandatory(KAFKA_PORT_URI))
+            .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+            .requiredIntegerParameter(Labels.withId(PORT_KEY), 9092)
+//            .requiredOntologyConcept(Labels.withId(KAFKA_BROKER_SETTINGS_KEY),
+//                    OntologyProperties.mandatory(KAFKA_HOST_URI),
+//                    OntologyProperties.mandatory(KAFKA_PORT_URI))
             .build();
   }
 
@@ -60,10 +64,12 @@ public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters
                                                            DataSinkParameterExtractor extractor) {
     String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
 
-    String kafkaHost = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_HOST_URI,
-            String.class);
-    Integer kafkaPort = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_PORT_URI,
-            Integer.class);
+    String kafkaHost = extractor.singleValueParameter(HOST_KEY, String.class);
+    Integer kafkaPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+//    String kafkaHost = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_HOST_URI,
+//            String.class);
+//    Integer kafkaPort = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_PORT_URI,
+//            Integer.class);
 
     KafkaParameters params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic);
 
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java
index e6f6fa4..9976572 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java
@@ -33,11 +33,13 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
 
 public class MqttController extends StandaloneEventSinkDeclarer<MqttParameters> {
 
-    private static final String MQTT_BROKER_SETTINGS_KEY = "broker-settings";
+//    private static final String MQTT_BROKER_SETTINGS_KEY = "broker-settings";
     private static final String TOPIC_KEY = "topic";
+    private static final String HOST_KEY = "host";
+    private static final String PORT_KEY = "port";
 
-    private static final String MQTT_HOST_URI = "http://schema.org/mqttHost";
-    private static final String MQTT_PORT_URI = "http://schema.org/mqttPort";
+//    private static final String MQTT_HOST_URI = "http://schema.org/mqttHost";
+//    private static final String MQTT_PORT_URI = "http://schema.org/mqttPort";
 
     @Override
     public DataSinkDescription declareModel() {
@@ -49,9 +51,11 @@ public class MqttController extends StandaloneEventSinkDeclarer<MqttParameters>
                         .requiredProperty(EpRequirements.anyProperty())
                         .build())
                 .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
-                .requiredOntologyConcept(Labels.withId(MQTT_BROKER_SETTINGS_KEY),
-                        OntologyProperties.mandatory(MQTT_HOST_URI),
-                        OntologyProperties.mandatory(MQTT_PORT_URI))
+                .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+                .requiredIntegerParameter(Labels.withId(PORT_KEY), 1883)
+//                .requiredOntologyConcept(Labels.withId(MQTT_BROKER_SETTINGS_KEY),
+//                        OntologyProperties.mandatory(MQTT_HOST_URI),
+//                        OntologyProperties.mandatory(MQTT_PORT_URI))
                 .build();
     }
 
@@ -60,10 +64,12 @@ public class MqttController extends StandaloneEventSinkDeclarer<MqttParameters>
                                                              DataSinkParameterExtractor extractor) {
 
         String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
-        String mqttHost = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_HOST_URI,
-                String.class);
-        Integer mqttPort = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_PORT_URI,
-                Integer.class);
+        String mqttHost = extractor.singleValueParameter(HOST_KEY, String.class);
+        Integer mqttPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+//        String mqttHost = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_HOST_URI,
+//                String.class);
+//        Integer mqttPort = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_PORT_URI,
+//                Integer.class);
 
         MqttParameters params = new MqttParameters(graph, mqttHost, mqttPort, topic);
         return new ConfiguredEventSink<>(params, MqttPublisher::new);
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
index a1ef054..3dec7cb 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
@@ -45,6 +45,11 @@ public class RabbitMqConsumer implements EventSink<RabbitMqParameters> {
   public void onInvocation(RabbitMqParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
     this.publisher = new RabbitMqPublisher(parameters);
     this.topic = parameters.getRabbitMqTopic();
+    // Fail the invocation right away when the publisher reports no open connection, naming host and port.
+    if (!this.publisher.isConnected()) {
+      throw new SpRuntimeException("Could not establish connection to RabbitMQ broker. Host: " +
+              parameters.getRabbitMqHost() + " Port: " + parameters.getRabbitMqPort());
+    }
   }
 
   @Override
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
index 26a9e45..01384dd 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
@@ -36,11 +36,18 @@ public class RabbitMqController extends StandaloneEventSinkDeclarer<RabbitMqPara
   private static final String RABBITMQ_BROKER_SETTINGS_KEY = "broker-settings";
   private static final String TOPIC_KEY = "topic";
 
-  private static final String RABBITMQ_HOST_URI = "http://schema.org/rabbitMqHost";
-  private static final String RABBITMQ_PORT_URI = "http://schema.org/rabbitMqPort";
-  private static final String RABBITMQ_USER_URI = "http://schema.org/rabbitMqUser";
-  private static final String RABBITMQ_PASSWORD_URI = "http://schema.org/rabbitMqPassword";
-  private static final String EXCHANGE_NAME_URI = "http://schema.org/exchangeName";
+  private static final String HOST_KEY = "host";
+  private static final String PORT_KEY = "port";
+  private static final String USER_KEY = "user";
+  private static final String PASSWORD_KEY = "password";
+  private static final String EXCHANGE_NAME_KEY = "exchange-name";
+
+
+//  private static final String RABBITMQ_HOST_URI = "http://schema.org/rabbitMqHost";
+//  private static final String RABBITMQ_PORT_URI = "http://schema.org/rabbitMqPort";
+//  private static final String RABBITMQ_USER_URI = "http://schema.org/rabbitMqUser";
+//  private static final String RABBITMQ_PASSWORD_URI = "http://schema.org/rabbitMqPassword";
+//  private static final String EXCHANGE_NAME_URI = "http://schema.org/exchangeName";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -51,33 +58,44 @@ public class RabbitMqController extends StandaloneEventSinkDeclarer<RabbitMqPara
                     .create()
                     .requiredProperty(EpRequirements.anyProperty())
                     .build())
-            .requiredTextParameter(Labels.withId(TOPIC_KEY), false, true)
-            .requiredOntologyConcept(Labels.withId(RABBITMQ_BROKER_SETTINGS_KEY),
-                    OntologyProperties.mandatory(RABBITMQ_HOST_URI),
-                    OntologyProperties.mandatory(RABBITMQ_PORT_URI),
-                    OntologyProperties.mandatory(RABBITMQ_USER_URI),
-                    OntologyProperties.mandatory(RABBITMQ_PASSWORD_URI),
-                    OntologyProperties.optional(EXCHANGE_NAME_URI))
+            .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+            .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+            .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
+            .requiredTextParameter(Labels.withId(USER_KEY), false, false)
+            .requiredSecret(Labels.withId(PASSWORD_KEY))
+//            .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, false)
+//            .requiredOntologyConcept(Labels.withId(RABBITMQ_BROKER_SETTINGS_KEY),
+//                    OntologyProperties.mandatory(RABBITMQ_HOST_URI),
+//                    OntologyProperties.mandatory(RABBITMQ_PORT_URI),
+//                    OntologyProperties.mandatory(RABBITMQ_USER_URI),
+//                    OntologyProperties.mandatory(RABBITMQ_PASSWORD_URI),
+//                    OntologyProperties.optional(EXCHANGE_NAME_URI))
             .build();
   }
 
   @Override
   public ConfiguredEventSink<RabbitMqParameters> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
-    String publisherTopic = extractor.singleValueParameter(TOPIC_KEY,
-            String.class);
-
-    String rabbitMqHost = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_HOST_URI,
-            String.class);
-    Integer rabbitMqPort = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_PORT_URI,
-            Integer.class);
-    String rabbitMqUser = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_USER_URI,
-            String.class);
-    String rabbitMqPassword = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
-            RABBITMQ_PASSWORD_URI,
-            String.class);
-    String exchangeName = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
-            EXCHANGE_NAME_URI,
-            String.class);
+    String publisherTopic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+
+//    String rabbitMqHost = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_HOST_URI,
+//            String.class);
+//    Integer rabbitMqPort = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_PORT_URI,
+//            Integer.class);
+//    String rabbitMqUser = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_USER_URI,
+//            String.class);
+//    String rabbitMqPassword = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
+//            RABBITMQ_PASSWORD_URI,
+//            String.class);
+//    String exchangeName = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
+//            EXCHANGE_NAME_URI,
+//            String.class);
+
+    String rabbitMqHost = extractor.singleValueParameter(HOST_KEY, String.class);
+    Integer rabbitMqPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+    String rabbitMqUser = extractor.singleValueParameter(USER_KEY, String.class);
+    String rabbitMqPassword = extractor.secretValue(PASSWORD_KEY);
+//    String exchangeName = extractor.singleValueParameter(EXCHANGE_NAME_KEY, String.class);
+    String exchangeName = "logs";
 
     RabbitMqParameters params = new RabbitMqParameters(graph, rabbitMqHost, rabbitMqPort, publisherTopic,
             rabbitMqUser, rabbitMqPassword, exchangeName);
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java
index 3dd323c..3a0ed2d 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +49,7 @@ public class RabbitMqPublisher {
       this.params = params;
       this.exchangeName = params.getExchangeName();
       setupConnection();
+
       this.errorMode = false;
     } catch (IOException e) {
       LOG.error("Error (IOException) while connecting to RabbitMQ..entering error mode");
@@ -68,6 +70,10 @@ public class RabbitMqPublisher {
 
   }
 
+  public boolean isConnected()  {
+    // Null-safe: the constructor swallows IOException (error mode), so the connection may never have been set.
+    return this.connection != null && this.connection.isOpen();
+  }
+
   public void fire(byte[] event, String topic) {
     if (!channelActive(topic)) {
       setupChannel(topic);
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en
index 1c27a06..98cfd24 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en
@@ -5,4 +5,11 @@ topic.title=JMS Topic
 topic.description=Select a JMS topic
 
 broker-settings.title=JMS broker settings (use prefix tcp://)
-broker-settings.description=Provide settings of the JMS broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the JMS broker to connect with.
+
+host.title=JMS Endpoint
+host.description=IP address or hostname of JMS endpoint. (use prefix tcp://)
+
+port.title=Port
+port.description=Port of the JMS endpoint
+
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index d2a0a88..e6f914c 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -5,4 +5,11 @@ topic.title=Kafka Topic
 topic.description=Select a Kafka topic
 
 broker-settings.title=Kafka broker settings
-broker-settings.description=Provide settings of the Kafka broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the Kafka broker to connect with.
+
+host.title=Kafka Broker
+host.description=IP address or hostname of Kafka broker
+
+port.title=Port
+port.description=Port of the Kafka broker. Default port 9092
+
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
index bb6eaaf..e740419 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
@@ -5,4 +5,11 @@ topic.title= MQTT Topic
 topic.description=Select a MQTT topic
 
 broker-settings.title=MQTT broker settings (use prefix tcp://)
-broker-settings.description=Provide settings of the MQTT broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the MQTT broker to connect with.
+
+host.title=MQTT Broker
+host.description=IP address or hostname of MQTT broker. (use prefix tcp://)
+
+port.title=Port
+port.description=Port of the MQTT broker. Default port 1883
+
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en
index 4539f98..72b6719 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en
@@ -5,4 +5,20 @@ topic.title=RabbitMQ Topic
 topic.description=Select a RabbitMQ topic
 
 broker-settings.title=RabbitMQ broker settings
-broker-settings.description=Provide settings of the RabbitMQ broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the RabbitMQ broker to connect with.
+
+host.title=Host
+host.description=Host or IP of the broker
+
+port.title=Port
+port.description=Port of the broker (e.g. 5672)
+
+user.title=User
+user.description=User to log in to the broker
+
+password.title=Password
+password.description=Password of the user
+
+exchange-name.title=Exchange Name
+exchange-name.description=Leave empty for default exchange
+