You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/04/18 10:12:50 UTC
[incubator-streampipes-extensions] branch dev updated:
STREAMPIPES-105: Create Kafka Publisher Sink
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new d955152 STREAMPIPES-105: Create Kafka Publisher Sink
d955152 is described below
commit d955152806696e3f47c81afeb27d1250d7436226
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Apr 18 12:12:03 2020 +0200
STREAMPIPES-105: Create Kafka Publisher Sink
---
.../sinks/brokers/jvm/jms/JmsController.java | 24 +++++---
.../sinks/brokers/jvm/jms/JmsPublisher.java | 6 +-
.../sinks/brokers/jvm/kafka/KafkaController.java | 26 +++++---
.../sinks/brokers/jvm/mqtt/MqttController.java | 26 +++++---
.../brokers/jvm/rabbitmq/RabbitMqConsumer.java | 5 ++
.../brokers/jvm/rabbitmq/RabbitMqController.java | 72 ++++++++++++++--------
.../brokers/jvm/rabbitmq/RabbitMqPublisher.java | 6 ++
.../strings.en | 9 ++-
.../strings.en | 9 ++-
.../strings.en | 9 ++-
.../strings.en | 18 +++++-
11 files changed, 149 insertions(+), 61 deletions(-)
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
index cf4f276..81a45ba 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsController.java
@@ -36,9 +36,11 @@ public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
private static final String JMS_BROKER_SETTINGS_KEY = "broker-settings";
private static final String TOPIC_KEY = "topic";
+ private static final String HOST_KEY = "host";
+ private static final String PORT_KEY = "port";
- private static final String JMS_HOST_URI = "http://schema.org/jmsHost";
- private static final String JMS_PORT_URI = "http://schema.org/jmsPort";
+// private static final String JMS_HOST_URI = "http://schema.org/jmsHost";
+// private static final String JMS_PORT_URI = "http://schema.org/jmsPort";
@Override
public DataSinkDescription declareModel() {
@@ -50,9 +52,11 @@ public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
- .requiredOntologyConcept(Labels.withId(JMS_BROKER_SETTINGS_KEY),
- OntologyProperties.mandatory(JMS_HOST_URI),
- OntologyProperties.mandatory(JMS_PORT_URI))
+ .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 61616)
+// .requiredOntologyConcept(Labels.withId(JMS_BROKER_SETTINGS_KEY),
+// OntologyProperties.mandatory(JMS_HOST_URI),
+// OntologyProperties.mandatory(JMS_PORT_URI))
.build();
}
@@ -61,10 +65,12 @@ public class JmsController extends StandaloneEventSinkDeclarer<JmsParameters> {
String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
- String jmsHost = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_HOST_URI,
- String.class);
- Integer jmsPort = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_PORT_URI,
- Integer.class);
+ String jmsHost = extractor.singleValueParameter(HOST_KEY, String.class);
+ Integer jmsPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+// String jmsHost = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_HOST_URI,
+// String.class);
+// Integer jmsPort = extractor.supportedOntologyPropertyValue(JMS_BROKER_SETTINGS_KEY, JMS_PORT_URI,
+// Integer.class);
JmsParameters params = new JmsParameters(graph, jmsHost, jmsPort, topic);
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
index baf4539..2492412 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/jms/JmsPublisher.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sinks.brokers.jvm.jms;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
+import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -38,7 +39,10 @@ public class JmsPublisher implements EventSink<JmsParameters> {
@Override
public void onInvocation(JmsParameters params, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
- this.publisher = new ActiveMQPublisher(params.getJmsHost() + ":" + params.getJmsPort(), params.getTopic());
+// this.publisher = new ActiveMQPublisher(params.getJmsHost() + ":" + params.getJmsPort(), params.getTopic());
+ this.publisher = new ActiveMQPublisher();
+ JmsTransportProtocol jmsTransportProtocol = new JmsTransportProtocol(params.getJmsHost(), params.getJmsPort(), params.getTopic());
+ this.publisher.connect(jmsTransportProtocol);
if (!this.publisher.isConnected()) {
throw new SpRuntimeException("Could not connect to JMS server " + params.getJmsHost() + " on Port: " + params.getJmsPort() + " to topic: " + params.getTopic());
}
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
index be82dd5..be2e1e0 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
@@ -33,11 +33,13 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters> {
- private static final String KAFKA_BROKER_SETTINGS_KEY = "broker-settings";
+// private static final String KAFKA_BROKER_SETTINGS_KEY = "broker-settings";
private static final String TOPIC_KEY = "topic";
+ private static final String HOST_KEY = "host";
+ private static final String PORT_KEY = "port";
- private static final String KAFKA_HOST_URI = "http://schema.org/kafkaHost";
- private static final String KAFKA_PORT_URI = "http://schema.org/kafkaPort";
+// private static final String KAFKA_HOST_URI = "http://schema.org/kafkaHost";
+// private static final String KAFKA_PORT_URI = "http://schema.org/kafkaPort";
@Override
public DataSinkDescription declareModel() {
@@ -49,9 +51,11 @@ public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
- .requiredOntologyConcept(Labels.withId(KAFKA_BROKER_SETTINGS_KEY),
- OntologyProperties.mandatory(KAFKA_HOST_URI),
- OntologyProperties.mandatory(KAFKA_PORT_URI))
+ .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 9092)
+// .requiredOntologyConcept(Labels.withId(KAFKA_BROKER_SETTINGS_KEY),
+// OntologyProperties.mandatory(KAFKA_HOST_URI),
+// OntologyProperties.mandatory(KAFKA_PORT_URI))
.build();
}
@@ -60,10 +64,12 @@ public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters
DataSinkParameterExtractor extractor) {
String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
- String kafkaHost = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_HOST_URI,
- String.class);
- Integer kafkaPort = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_PORT_URI,
- Integer.class);
+ String kafkaHost = extractor.singleValueParameter(HOST_KEY, String.class);
+ Integer kafkaPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+// String kafkaHost = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_HOST_URI,
+// String.class);
+// Integer kafkaPort = extractor.supportedOntologyPropertyValue(KAFKA_BROKER_SETTINGS_KEY, KAFKA_PORT_URI,
+// Integer.class);
KafkaParameters params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic);
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java
index e6f6fa4..9976572 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/mqtt/MqttController.java
@@ -33,11 +33,13 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
public class MqttController extends StandaloneEventSinkDeclarer<MqttParameters> {
- private static final String MQTT_BROKER_SETTINGS_KEY = "broker-settings";
+// private static final String MQTT_BROKER_SETTINGS_KEY = "broker-settings";
private static final String TOPIC_KEY = "topic";
+ private static final String HOST_KEY = "host";
+ private static final String PORT_KEY = "port";
- private static final String MQTT_HOST_URI = "http://schema.org/mqttHost";
- private static final String MQTT_PORT_URI = "http://schema.org/mqttPort";
+// private static final String MQTT_HOST_URI = "http://schema.org/mqttHost";
+// private static final String MQTT_PORT_URI = "http://schema.org/mqttPort";
@Override
public DataSinkDescription declareModel() {
@@ -49,9 +51,11 @@ public class MqttController extends StandaloneEventSinkDeclarer<MqttParameters>
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
- .requiredOntologyConcept(Labels.withId(MQTT_BROKER_SETTINGS_KEY),
- OntologyProperties.mandatory(MQTT_HOST_URI),
- OntologyProperties.mandatory(MQTT_PORT_URI))
+ .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 1883)
+// .requiredOntologyConcept(Labels.withId(MQTT_BROKER_SETTINGS_KEY),,
+// OntologyProperties.mandatory(MQTT_HOST_URI),
+// OntologyProperties.mandatory(MQTT_PORT_URI))
.build();
}
@@ -60,10 +64,12 @@ public class MqttController extends StandaloneEventSinkDeclarer<MqttParameters>
DataSinkParameterExtractor extractor) {
String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
- String mqttHost = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_HOST_URI,
- String.class);
- Integer mqttPort = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_PORT_URI,
- Integer.class);
+ String mqttHost = extractor.singleValueParameter(HOST_KEY, String.class);
+ Integer mqttPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+// String mqttHost = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_HOST_URI,
+// String.class);
+// Integer mqttPort = extractor.supportedOntologyPropertyValue(MQTT_BROKER_SETTINGS_KEY, MQTT_PORT_URI,
+// Integer.class);
MqttParameters params = new MqttParameters(graph, mqttHost, mqttPort, topic);
return new ConfiguredEventSink<>(params, MqttPublisher::new);
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
index a1ef054..3dec7cb 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
@@ -45,6 +45,11 @@ public class RabbitMqConsumer implements EventSink<RabbitMqParameters> {
public void onInvocation(RabbitMqParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
this.publisher = new RabbitMqPublisher(parameters);
this.topic = parameters.getRabbitMqTopic();
+
+ if (!this.publisher.isConnected()) {
+ throw new SpRuntimeException("Could not establish conntection to RabbitMQ broker. Host: " +
+ parameters.getRabbitMqHost() + " Port: " + parameters.getRabbitMqPort());
+ }
}
@Override
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
index 26a9e45..01384dd 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqController.java
@@ -36,11 +36,18 @@ public class RabbitMqController extends StandaloneEventSinkDeclarer<RabbitMqPara
private static final String RABBITMQ_BROKER_SETTINGS_KEY = "broker-settings";
private static final String TOPIC_KEY = "topic";
- private static final String RABBITMQ_HOST_URI = "http://schema.org/rabbitMqHost";
- private static final String RABBITMQ_PORT_URI = "http://schema.org/rabbitMqPort";
- private static final String RABBITMQ_USER_URI = "http://schema.org/rabbitMqUser";
- private static final String RABBITMQ_PASSWORD_URI = "http://schema.org/rabbitMqPassword";
- private static final String EXCHANGE_NAME_URI = "http://schema.org/exchangeName";
+ private static final String HOST_KEY = "host";
+ private static final String PORT_KEY = "port";
+ private static final String USER_KEY = "user";
+ private static final String PASSWORD_KEY = "password";
+ private static final String EXCHANGE_NAME_KEY = "exchange-name";
+
+
+// private static final String RABBITMQ_HOST_URI = "http://schema.org/rabbitMqHost";
+// private static final String RABBITMQ_PORT_URI = "http://schema.org/rabbitMqPort";
+// private static final String RABBITMQ_USER_URI = "http://schema.org/rabbitMqUser";
+// private static final String RABBITMQ_PASSWORD_URI = "http://schema.org/rabbitMqPassword";
+// private static final String EXCHANGE_NAME_URI = "http://schema.org/exchangeName";
@Override
public DataSinkDescription declareModel() {
@@ -51,33 +58,44 @@ public class RabbitMqController extends StandaloneEventSinkDeclarer<RabbitMqPara
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
- .requiredTextParameter(Labels.withId(TOPIC_KEY), false, true)
- .requiredOntologyConcept(Labels.withId(RABBITMQ_BROKER_SETTINGS_KEY),
- OntologyProperties.mandatory(RABBITMQ_HOST_URI),
- OntologyProperties.mandatory(RABBITMQ_PORT_URI),
- OntologyProperties.mandatory(RABBITMQ_USER_URI),
- OntologyProperties.mandatory(RABBITMQ_PASSWORD_URI),
- OntologyProperties.optional(EXCHANGE_NAME_URI))
+ .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
+ .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(PORT_KEY), 5672)
+ .requiredTextParameter(Labels.withId(USER_KEY), false, false)
+ .requiredSecret(Labels.withId(PASSWORD_KEY))
+// .requiredTextParameter(Labels.withId(EXCHANGE_NAME_KEY), false, false)
+// .requiredOntologyConcept(Labels.withId(RABBITMQ_BROKER_SETTINGS_KEY),
+// OntologyProperties.mandatory(RABBITMQ_HOST_URI),
+// OntologyProperties.mandatory(RABBITMQ_PORT_URI),
+// OntologyProperties.mandatory(RABBITMQ_USER_URI),
+// OntologyProperties.mandatory(RABBITMQ_PASSWORD_URI),
+// OntologyProperties.optional(EXCHANGE_NAME_URI))
.build();
}
@Override
public ConfiguredEventSink<RabbitMqParameters> onInvocation(DataSinkInvocation graph, DataSinkParameterExtractor extractor) {
- String publisherTopic = extractor.singleValueParameter(TOPIC_KEY,
- String.class);
-
- String rabbitMqHost = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_HOST_URI,
- String.class);
- Integer rabbitMqPort = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_PORT_URI,
- Integer.class);
- String rabbitMqUser = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_USER_URI,
- String.class);
- String rabbitMqPassword = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
- RABBITMQ_PASSWORD_URI,
- String.class);
- String exchangeName = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
- EXCHANGE_NAME_URI,
- String.class);
+ String publisherTopic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+
+// String rabbitMqHost = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_HOST_URI,
+// String.class);
+// Integer rabbitMqPort = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_PORT_URI,
+// Integer.class);
+// String rabbitMqUser = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY, RABBITMQ_USER_URI,
+// String.class);
+// String rabbitMqPassword = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
+// RABBITMQ_PASSWORD_URI,
+// String.class);
+// String exchangeName = extractor.supportedOntologyPropertyValue(RABBITMQ_BROKER_SETTINGS_KEY,
+// EXCHANGE_NAME_URI,
+// String.class);
+
+ String rabbitMqHost = extractor.singleValueParameter(HOST_KEY, String.class);
+ Integer rabbitMqPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
+ String rabbitMqUser = extractor.singleValueParameter(USER_KEY, String.class);
+ String rabbitMqPassword = extractor.secretValue(PASSWORD_KEY);
+// String exchangeName = extractor.singleValueParameter(EXCHANGE_NAME_KEY, String.class);
+ String exchangeName = "logs";
RabbitMqParameters params = new RabbitMqParameters(graph, rabbitMqHost, rabbitMqPort, publisherTopic,
rabbitMqUser, rabbitMqPassword, exchangeName);
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java
index 3dd323c..3a0ed2d 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqPublisher.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ public class RabbitMqPublisher {
this.params = params;
this.exchangeName = params.getExchangeName();
setupConnection();
+
this.errorMode = false;
} catch (IOException e) {
LOG.error("Error (IOException) while connecting to RabbitMQ..entering error mode");
@@ -68,6 +70,10 @@ public class RabbitMqPublisher {
}
+ public boolean isConnected() {
+ return this.connection.isOpen();
+ }
+
public void fire(byte[] event, String topic) {
if (!channelActive(topic)) {
setupChannel(topic);
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en
index 1c27a06..98cfd24 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.jms/strings.en
@@ -5,4 +5,11 @@ topic.title=JMS Topic
topic.description=Select a JMS topic
broker-settings.title=JMS broker settings (use prefix tcp://)
-broker-settings.description=Provide settings of the JMS broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the JMS broker to connect with.
+
+host.title=JMS Endpoint
+host.description=IP address or hostname of JMS endpoint. (use prefix tcp://)
+
+port.title=Port
+port.description=Port of the JMS endpoint
+
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index d2a0a88..e6f914c 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -5,4 +5,11 @@ topic.title=Kafka Topic
topic.description=Select a Kafka topic
broker-settings.title=Kafka broker settings
-broker-settings.description=Provide settings of the Kafka broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the Kafka broker to connect with.
+
+host.title=Kafka Broker
+host.description=IP address or hostname of Kafka broker
+
+port.title=Port
+port.description=Port of the Kafka broker. Default port 9092
+
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
index bb6eaaf..e740419 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.mqtt/strings.en
@@ -5,4 +5,11 @@ topic.title= MQTT Topic
topic.description=Select a MQTT topic
broker-settings.title=MQTT broker settings (use prefix tcp://)
-broker-settings.description=Provide settings of the MQTT broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the MQTT broker to connect with.
+
+host.title=MQTT Broker
+host.description=IP address or hostname of MQTT broker. (use prefix tcp://)
+
+port.title=Port
+port.description=Port of the MQTT broker. Default port 1883
+
diff --git a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en
index 4539f98..72b6719 100644
--- a/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en
+++ b/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.rabbitmq/strings.en
@@ -5,4 +5,20 @@ topic.title=RabbitMQ Topic
topic.description=Select a RabbitMQ topic
broker-settings.title=RabbitMQ broker settings
-broker-settings.description=Provide settings of the RabbitMQ broker to connect with.
\ No newline at end of file
+broker-settings.description=Provide settings of the RabbitMQ broker to connect with.
+
+host.title=Host
+host.description=Host or IP of the broker
+
+port.title=Port
+port.description=Port of the broker (e.g. 5672)
+
+user.title=User
+user.description=User to log in to the broker
+
+password.title=Password
+password.description=Password of the user
+
+exchange-name.title=Exchange Name
+exchange-name.description=Leave empty for default exchange
+