Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/02/24 21:46:45 UTC

[incubator-streampipes] branch dev updated (a3fc041 -> 1f87304)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from a3fc041  [hotfix] Add missing header
     new 7a1b562  [STREAMPIPES-512] Add SASL and SSL support to kafka adapter and sink
     new 1f87304  [STREAMPIPES-512] Clean up some code

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iiot/protocol/stream/KafkaProtocol.java        | 120 ++++++-----------
 .../strings.en                                     |  33 +++--
 .../streampipes-pipeline-elements-shared/pom.xml   |   5 +
 .../pe/shared/config/kafka/KafkaConfig.java        |  38 ++++--
 .../pe/shared/config/kafka/KafkaConnectUtils.java  | 118 +++++++++--------
 .../config/kafka/{ => kafka}/KafkaConfig.java      |  39 ++++--
 .../config/kafka/kafka/KafkaConnectUtils.java      | 143 +++++++++++++++++++++
 .../sinks/brokers/jvm/kafka/KafkaController.java   |  78 ++++++-----
 .../sinks/brokers/jvm/kafka/KafkaParameters.java   |   8 +-
 .../sinks/brokers/jvm/kafka/KafkaPublisher.java    |  29 +++--
 .../strings.en                                     |  14 +-
 .../messaging/kafka/SpKafkaConsumer.java           |  60 +++------
 .../messaging/kafka/SpKafkaProducer.java           |  34 ++---
 .../kafka/config/AbstractConfigFactory.java        |  20 +--
 .../kafka/config/ConsumerConfigFactory.java        |  15 ++-
 .../kafka/config/KafkaConfigAppender.java          |   8 ++
 .../kafka/config/ProducerConfigFactory.java        |  15 ++-
 .../kafka/security/KafkaSecurityConfig.java        |   7 +
 .../security/KafkaSecuritySaslPlainConfig.java     |  28 ++++
 .../kafka/security/KafkaSecuritySaslSSLConfig.java |  28 ++++
 .../KafkaSecurityUnauthenticatedPlainConfig.java   |  12 ++
 .../KafkaSecurityUnauthenticatedSSLConfig.java     |  14 ++
 .../performance/producer/DataSimulator.java        |   3 +-
 ...AbstractConfigurablePipelineElementBuilder.java |  20 +++
 .../distributed/runtime/DistributedRuntime.java    |   4 +-
 ui/cypress/support/utils/ParameterUtils.ts         |   1 +
 ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts    |   4 +-
 27 files changed, 583 insertions(+), 315 deletions(-)
 copy streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/{ => kafka}/KafkaConfig.java (58%)
 create mode 100644 streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
 create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
 create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
 create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
 create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
 create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
 create mode 100644 streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
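
Note on the new security classes listed above: they follow an appender pattern, where a KafkaSecurityConfig writes its SASL/SSL settings into a java.util.Properties object via appendConfig(Properties) before the consumer or producer is created (see kafkaConfig.getSecurityConfig().appendConfig(props) in the diffs below). A minimal, hypothetical wiring sketch; the constructor arguments of KafkaSecuritySaslPlainConfig are an assumption, not taken from the commit:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
    import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;

    public class SecureConsumerSketch {

      // Illustrative wiring, not part of the commit: the chosen security config
      // appends its SASL/SSL properties first, then the usual consumer settings follow.
      public static KafkaConsumer<byte[], byte[]> create(String bootstrapServers,
                                                         String username,
                                                         String password) {
        // Constructor signature of KafkaSecuritySaslPlainConfig is an assumption.
        KafkaSecurityConfig security = new KafkaSecuritySaslPlainConfig(username, password);

        Properties props = new Properties();
        security.appendConfig(props);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        return new KafkaConsumer<>(props);
      }
    }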

[incubator-streampipes] 02/02: [STREAMPIPES-512] Clean up some code

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 1f87304d4bf92cc0b272964683ce2b1987cd23c6
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Feb 24 22:45:40 2022 +0100

    [STREAMPIPES-512] Clean up some code
---
 .../iiot/protocol/stream/KafkaProtocol.java        | 37 +++++++---------------
 .../strings.en                                     |  6 ++++
 .../pe/shared/config/kafka/KafkaConfig.java        |  7 +++-
 .../pe/shared/config/kafka/KafkaConnectUtils.java  | 21 ------------
 .../config/kafka/{ => kafka}/KafkaConfig.java      |  8 +++--
 .../kafka/{ => kafka}/KafkaConnectUtils.java       | 25 ++++-----------
 .../sinks/brokers/jvm/kafka/KafkaPublisher.java    |  8 -----
 .../messaging/kafka/SpKafkaConsumer.java           | 20 ++----------
 .../messaging/kafka/SpKafkaProducer.java           | 15 ---------
 .../kafka/config/AbstractConfigFactory.java        | 15 ---------
 .../kafka/config/ConsumerConfigFactory.java        | 13 ++++----
 .../kafka/config/ProducerConfigFactory.java        | 15 +++++----
 .../serializer/KafkaSerializerByteArrayConfig.java |  4 ---
 .../kafka/serializer/KafkaSerializerConfig.java    | 13 --------
 ...AbstractConfigurablePipelineElementBuilder.java | 20 ++++++++++++
 .../distributed/runtime/DistributedRuntime.java    |  4 +--
 16 files changed, 76 insertions(+), 155 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index e94e9a0..502aa7c 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -21,8 +21,7 @@ package org.apache.streampipes.connect.iiot.protocol.stream;
 import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.connect.SendToPipeline;
@@ -34,7 +33,6 @@ import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.messaging.kafka.security.*;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
@@ -47,10 +45,12 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.AdapterSourceType;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -109,7 +109,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
 
     @Override
     protected List<byte[]> getNByteElements(int n) throws ParseException {
-        final Consumer<Long, String> consumer;
+        final Consumer<byte[], byte[]> consumer;
 
         consumer = createConsumer(this.config);
         consumer.subscribe(Arrays.asList(this.topic), new ConsumerRebalanceListener() {
@@ -129,11 +129,11 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
 
 
         while (true) {
-            final ConsumerRecords<Long, String> consumerRecords =
+            final ConsumerRecords<byte[], byte[]> consumerRecords =
                     consumer.poll(1000);
 
             consumerRecords.forEach(record -> {
-                InputStream inputStream = IOUtils.toInputStream(record.value(), "UTF-8");
+                InputStream inputStream = new ByteArrayInputStream(record.value());
 
                 nEventsByte.addAll(parser.parseNEvents(inputStream, n));
             });
@@ -154,7 +154,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         return resultEventsByte;
     }
 
-    private static Consumer<Long, String> createConsumer(KafkaConfig kafkaConfig) {
+    private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) {
         final Properties props = new Properties();
 
         kafkaConfig.getSecurityConfig().appendConfig(props);
@@ -162,21 +162,18 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
 
-        // TODO make serializer configurable
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
                 "KafkaExampleConsumer" + System.currentTimeMillis());
+
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                LongDeserializer.class.getName());
+                ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class.getName());
+                ByteArrayDeserializer.class.getName());
 
         // Create the consumer using props.
-        final Consumer<Long, String> consumer =
+        final Consumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props);
 
-        // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(kafkaConfig.getTopic()));
-
         return consumer;
     }
 
@@ -221,18 +218,8 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
         boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
 
-        String kafkaAddress = kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort();
-        Properties props = new Properties();
-
-        // add security properties to kafka configuration
-        kafkaConfig.getSecurityConfig().appendConfig(props);
-
-        props.put("bootstrap.servers", kafkaAddress);
-        props.put("group.id", "test-consumer-group");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
 
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
         Set<String> topics = consumer.listTopics().keySet();
         consumer.close();
 
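Aside on the consumer change above: with ByteArrayDeserializer on both key and value, record payloads can be handed to the parser without the former UTF-8 round trip. A self-contained sketch of the same polling pattern outside of StreamPipes (broker address and topic are placeholders):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ByteArrayPollSketch {

      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer" + System.currentTimeMillis());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Collections.singletonList("test.topic"));
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
          records.forEach(record -> {
            // The raw bytes go straight into a stream; no String conversion needed.
            InputStream in = new ByteArrayInputStream(record.value());
            System.out.println("received " + record.value().length + " bytes");
          });
        }
      }
    }
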
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index abf3588..027f512 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -33,5 +33,11 @@ sasl-ssl.description=Username and password, with ssl encryption
 
 username-group.title=Username and password
 
+key-deserialization.title=Key Deserializer
+key-deserialization.description=
+
+value-deserialization.title=Value Deserializer
+value-deserialization.description=
+
 hide-internal-topics.title=Hide internal topics
 hide-internal-topics.description=Do not show topics that are only used internally
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
index 3b55309..a104c4a 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
@@ -28,7 +28,11 @@ public class KafkaConfig {
 
     KafkaSecurityConfig securityConfig;
 
-    public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, KafkaSecurityConfig securityConfig) {
+
+    public KafkaConfig(String kafkaHost,
+                       Integer kafkaPort,
+                       String topic,
+                       KafkaSecurityConfig securityConfig) {
         this.kafkaHost = kafkaHost;
         this.kafkaPort = kafkaPort;
         this.topic = topic;
@@ -66,4 +70,5 @@ public class KafkaConfig {
     public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
         this.securityConfig = securityConfig;
     }
+
 }
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
index 51b41c9..a8f1d2f 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Label;
 import org.apache.streampipes.sdk.helpers.Labels;
-import org.checkerframework.checker.units.qual.K;
 
 public class KafkaConnectUtils {
 
@@ -33,13 +32,6 @@ public class KafkaConnectUtils {
     public static final String HOST_KEY = "host";
     public static final String PORT_KEY = "port";
 
-//    private static final String ACCESS_MODE = "access-mode";
-//    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-//    private static final String USERNAME_ACCESS = "username-alternative";
-//    private static final String USERNAME_GROUP = "username-group";
-//    private static final String USERNAME_KEY = "username";
-//    private static final String PASSWORD_KEY = "password";
-
     public static final String ACCESS_MODE = "access-mode";
     public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
     public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
@@ -77,17 +69,6 @@ public class KafkaConnectUtils {
         return Labels.withId(ACCESS_MODE);
     }
 
-//    public static StaticPropertyAlternative getAlternativesOne() {
-//        return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
-//    }
-//
-//    public static StaticPropertyAlternative getAlternativesTwo() {
-//        return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-//                StaticProperties.group(Labels.withId(USERNAME_GROUP),
-//                        StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-//                        StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
-//    }
-
     public static KafkaConfig getConfig(StaticPropertyExtractor extractor, boolean containsTopic) {
         String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
         String topic = "";
@@ -102,8 +83,6 @@ public class KafkaConnectUtils {
 
         KafkaSecurityConfig securityConfig;
 
-        //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig();
-
         // check if a user for the authentication is defined
         if (authentication.equals(KafkaConnectUtils.SASL_SSL) || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
             String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
similarity index 88%
copy from streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
copy to streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
index 3b55309..b2df331 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.pe.shared.config.kafka;
+package org.apache.streampipes.pe.shared.config.kafka.kafka;
 
 import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
 
@@ -28,7 +28,10 @@ public class KafkaConfig {
 
     KafkaSecurityConfig securityConfig;
 
-    public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, KafkaSecurityConfig securityConfig) {
+    public KafkaConfig(String kafkaHost,
+                       Integer kafkaPort,
+                       String topic,
+                       KafkaSecurityConfig securityConfig) {
         this.kafkaHost = kafkaHost;
         this.kafkaPort = kafkaPort;
         this.topic = topic;
@@ -66,4 +69,5 @@ public class KafkaConfig {
     public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
         this.securityConfig = securityConfig;
     }
+
 }
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
similarity index 84%
copy from streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
copy to streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
index 51b41c9..147b2dc 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.pe.shared.config.kafka;
+package org.apache.streampipes.pe.shared.config.kafka.kafka;
 
 import org.apache.streampipes.messaging.kafka.security.*;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
@@ -25,7 +25,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Label;
 import org.apache.streampipes.sdk.helpers.Labels;
-import org.checkerframework.checker.units.qual.K;
 
 public class KafkaConnectUtils {
 
@@ -33,12 +32,11 @@ public class KafkaConnectUtils {
     public static final String HOST_KEY = "host";
     public static final String PORT_KEY = "port";
 
-//    private static final String ACCESS_MODE = "access-mode";
-//    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-//    private static final String USERNAME_ACCESS = "username-alternative";
-//    private static final String USERNAME_GROUP = "username-group";
-//    private static final String USERNAME_KEY = "username";
-//    private static final String PASSWORD_KEY = "password";
+    public static final String KEY_SERIALIZATION = "key-serialization";
+    public static final String VALUE_SERIALIZATION = "value-serialization";
+
+    public static final String KEY_DESERIALIZATION = "key-deserialization";
+    public static final String VALUE_DESERIALIZATION = "value-deserialization";
 
     public static final String ACCESS_MODE = "access-mode";
     public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
@@ -77,17 +75,6 @@ public class KafkaConnectUtils {
         return Labels.withId(ACCESS_MODE);
     }
 
-//    public static StaticPropertyAlternative getAlternativesOne() {
-//        return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
-//    }
-//
-//    public static StaticPropertyAlternative getAlternativesTwo() {
-//        return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-//                StaticProperties.group(Labels.withId(USERNAME_GROUP),
-//                        StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-//                        StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
-//    }
-
     public static KafkaConfig getConfig(StaticPropertyExtractor extractor, boolean containsTopic) {
         String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
         String topic = "";
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
index bd72731..52a0ed0 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
@@ -22,8 +22,6 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 import org.apache.streampipes.messaging.kafka.security.*;
-import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerByteArrayConfig;
-import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerConfig;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -40,12 +38,6 @@ public class KafkaPublisher implements EventSink<KafkaParameters> {
     this.dataFormatDefinition = new JsonDataFormatDefinition();
   }
 
-  // Serialization
-  // Key
-  // - StringSerializer
-  // Value
-  // - ByteArraySerializer
-
   @Override
   public void onInvocation(KafkaParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
     boolean useAuthentication = parameters.getAuthentication().equals(KafkaController.getSaslAccessKey());
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index ed4f12a..449bf1c 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -71,28 +71,12 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
     this.appenders = appenders;
   }
 
-  // TODO backwards compatibility, remove later
-  public SpKafkaConsumer(String kafkaUrl,String topic,InternalEventProcessor<byte[]> callback,
-                         KafkaConfigAppender... appenders) {
-    KafkaTransportProtocol protocol = new KafkaTransportProtocol();
-    protocol.setKafkaPort(Integer.parseInt(kafkaUrl.split(":")[1]));
-    protocol.setBrokerHostname(kafkaUrl.split(":")[0]);
-    protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
-    this.appenders = Arrays.asList(appenders);
-
-    try {
-      this.connect(protocol, callback);
-    } catch (SpRuntimeException e) {
-      e.printStackTrace();
-    }
-  }
-
   @Override
   public void run() {
 
     Properties props = makeProperties(protocol, appenders);
 
-    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
+    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
     if (!patternTopic) {
       consumer.subscribe(Collections.singletonList(topic));
     } else {
@@ -111,7 +95,7 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
     }
     Duration duration = Duration.of(100, ChronoUnit.MILLIS);
     while (isRunning) {
-      ConsumerRecords<String, byte[]> records = consumer.poll(duration);
+      ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
       records.forEach(record -> {
         eventProcessor.onEvent(record.value());
       });
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 1de82dc..7c46bb9 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -65,17 +65,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     this.connected = true;
   }
 
-  // TODO backwards compatibility, remove later
-//  public SpKafkaProducer(String url, String topic, String username, String password, boolean ssl) {
-//    String[] urlParts = url.split(COLON);
-//    KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
-//            Integer.parseInt(urlParts[1]), topic);
-//    this.brokerUrl = url;
-//    this.topic = topic;
-//    this.producer = new KafkaProducer<>(makePropertiesSaslPlain(protocol, username, password));
-//    this.connected = true;
-//  }
-
   public void publish(String message) {
     publish(message.getBytes());
   }
@@ -91,10 +80,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     return new ProducerConfigFactory(protocol).buildProperties(appenders);
   }
 
-//  private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
-//    return new ProducerConfigFactory(protocol).makePropertiesSaslPlain(username, password);
-//  }
-
   @Override
   public void connect(KafkaTransportProtocol protocol) {
     LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
index bac2870..8225986 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
@@ -26,7 +26,6 @@ import java.util.function.Supplier;
 public abstract class AbstractConfigFactory {
 
   private static final String COLON = ":";
-  private static final String SASL_MECHANISM = "PLAIN";
 
   protected KafkaTransportProtocol protocol;
 
@@ -50,19 +49,5 @@ public abstract class AbstractConfigFactory {
     appenders.forEach(appender -> appender.appendConfig(props));
 
     return  props;
-    // TODO check Kafka Security
   }
-
-//  public Properties makePropertiesSaslPlain(String username,
-//                                            String password) {
-//    Properties props = makeProperties();
-//    props.put(SaslConfigs.SASL_MECHANISM,SASL_MECHANISM);
-//    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
-//
-////    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
-//
-//    String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
-//    props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
-//    return props;
-//  }
 }
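
The appenders passed to buildProperties(...) are KafkaConfigAppender instances (a new interface in this commit; its appendConfig(Properties) signature is visible in the deleted serializer classes below). A hypothetical custom appender, purely for illustration and not part of the commit:

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;

    // Hypothetical appender: contributes one extra client setting to the
    // Properties built by ConsumerConfigFactory/ProducerConfigFactory.
    public class ClientIdAppender implements KafkaConfigAppender {

      private final String clientId;

      public ClientIdAppender(String clientId) {
        this.clientId = clientId;
      }

      @Override
      public void appendConfig(Properties props) {
        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
      }
    }
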
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 21550d7..30dfbdd 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -18,6 +18,8 @@
 package org.apache.streampipes.messaging.kafka.config;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 import java.util.Properties;
@@ -29,10 +31,8 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "5000";
   private static final String SESSION_TIMEOUT_MS_CONFIG_DEFAULT = "30000";
   private static final Integer FETCH_MAX_BYTES_CONFIG_DEFAULT = 52428800;
-  private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
-          ".serialization.StringDeserializer";
-  private static final String VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
-          ".serialization.ByteArrayDeserializer";
+  private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = ByteArrayDeserializer.class.getName();
+  private static final String VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT = ByteArrayDeserializer.class.getName();
 
   public ConsumerConfigFactory(KafkaTransportProtocol protocol) {
     super(protocol);
@@ -42,8 +42,6 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   public Properties makeDefaultProperties() {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
-//    props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId,
-//            UUID.randomUUID().toString()));
     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
@@ -51,8 +49,11 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG_DEFAULT);
     props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
             getConfigOrDefault(protocol::getMessageMaxBytes, FETCH_MAX_BYTES_CONFIG_DEFAULT));
+
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT);
+
+
     props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
 
     return props;
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
index bf76d57..23426d1 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
@@ -17,9 +17,9 @@
  */
 package org.apache.streampipes.messaging.kafka.config;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 import java.util.Properties;
@@ -33,14 +33,15 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
   private static final Integer BUFFER_MEMORY_CONFIG_DEFAULT = 33554432;
   private static final Integer MAX_REQUEST_SIZE_CONFIG_DEFAULT = 5000012;
 
-  private static final String KEY_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
-          ".StringSerializer";
-  private static final String VALUE_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
-          ".ByteArraySerializer";
+  private static final String KEY_SERIALIZER_DEFAULT = StringSerializer.class.getName();
+  private static final String VALUE_SERIALIZER_DEFAULT = ByteArraySerializer.class.getName();
+
 
 
   public ProducerConfigFactory(KafkaTransportProtocol protocol) {
     super(protocol);
+
+
   }
 
   @Override
@@ -57,8 +58,10 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
     props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getConfigOrDefault(protocol::getMaxRequestSize,
             MAX_REQUEST_SIZE_CONFIG_DEFAULT));
     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG_DEFAULT);
+
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_DEFAULT);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_DEFAULT);
+
     return props;
   }
 
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
deleted file mode 100644
index 40d4bb2..0000000
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.streampipes.messaging.kafka.serializer;
-
-public class KafkaSerializerByteArrayConfig extends KafkaSerializerConfig {
-}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
deleted file mode 100644
index a317c75..0000000
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.streampipes.messaging.kafka.serializer;
-
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-
-import java.util.Properties;
-
-public class KafkaSerializerConfig implements KafkaConfigAppender {
-
-    @Override
-    public void appendConfig(Properties props) {
-
-    }
-}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
index af6df4e..b3b4cfb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
@@ -696,9 +696,29 @@ public abstract class AbstractConfigurablePipelineElementBuilder<BU extends
 
     this.staticProperties.add(osp);
     return me();
+  }
+
+  /**
+   * Defines a configuration parameter that lets preprocessing developers select from a list of pre-defined configuration
+   * options. The parameter will be rendered as a RadioGroup in the StreamPipes UI.
+   * @param label The {@link org.apache.streampipes.sdk.helpers.Label} that describes why this parameter is needed in a
+   *              user-friendly manner.
+   * @param options A list of {@link org.apache.streampipes.model.staticproperty.Option} elements. Use
+   * {@link org.apache.streampipes.sdk.helpers.Options} to create option elements from string values.
+   * @param horizontalRendering when set to true, the options are rendered horizontally in the UI
+   * @return this
+   */
+  public BU requiredSingleValueSelection(Label label,
+                                         List<Option> options,
+                                         boolean horizontalRendering) {
+    OneOfStaticProperty osp = new OneOfStaticProperty(label.getInternalId(), label.getLabel(), label.getDescription(), horizontalRendering);
+    osp.setOptions(options);
 
+    this.staticProperties.add(osp);
+    return me();
   }
 
+
   /**
    * @deprecated Use {@link #requiredMultiValueSelection(Label, Option...)} instead.
    * @param internalId
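
Usage of the new overload is not shown in this diff; the following fragment is a hypothetical call, assuming a builder variable of any concrete AbstractConfigurablePipelineElementBuilder subtype and placeholder ids and option values (Labels and Options come from org.apache.streampipes.sdk.helpers):

    // Fragment for illustration only; "builder" stands for any pipeline element
    // or adapter description builder mid-chain.
    builder.requiredSingleValueSelection(
        Labels.withId("key-deserialization"),
        Options.from("ByteArrayDeserializer", "StringDeserializer"),
        true);  // horizontalRendering: lay the radio options out side by side
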
diff --git a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
index 70f61f8..4afc4fa 100644
--- a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
+++ b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
@@ -64,11 +64,11 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
   }
 
   protected Properties getProperties(KafkaTransportProtocol protocol) {
-    return new ConsumerConfigFactory(protocol).makeProperties();
+    return new ConsumerConfigFactory(protocol).makeDefaultProperties();
   }
 
   protected Properties getProducerProperties(KafkaTransportProtocol protocol) {
-    return new ProducerConfigFactory(protocol).makeProperties();
+    return new ProducerConfigFactory(protocol).makeDefaultProperties();
   }
 
   protected SpDataFormatDefinition getDataFormatDefinition(TransportFormat transportFormat) {

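Related note: callers that need the new security settings on top of these defaults go through buildProperties(...) with a list of appenders, as SpKafkaProducer and SpKafkaConsumer do above. A rough sketch, assuming buildProperties(List<KafkaConfigAppender>) lives on the shared AbstractConfigFactory and that KafkaSecurityUnauthenticatedPlainConfig has a no-argument constructor; both are assumptions, not confirmed by this diff:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.streampipes.messaging.kafka.config.ConsumerConfigFactory;
    import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
    import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

    public class RuntimePropertiesSketch {

      // Illustrative only: defaults (deserializers, group id, timeouts, ...) plus
      // one security appender that contributes its settings via appendConfig.
      public static Properties consumerProps(KafkaTransportProtocol protocol) {
        return new ConsumerConfigFactory(protocol)
            .buildProperties(Collections.singletonList(new KafkaSecurityUnauthenticatedPlainConfig()));
      }
    }
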
[incubator-streampipes] 01/02: [STREAMPIPES-512] Add SASL and SSL support to kafka adapter and sink

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 7a1b5628b50123695aa9527879f416e42c1c5efc
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Feb 24 13:44:51 2022 +0100

    [STREAMPIPES-512] Add SASL and SSL support to kafka adapter and sink
---
 .../iiot/protocol/stream/KafkaProtocol.java        |  99 +++++----------
 .../strings.en                                     |  27 ++--
 .../streampipes-pipeline-elements-shared/pom.xml   |   5 +
 .../pe/shared/config/kafka/KafkaConfig.java        |  35 ++++--
 .../pe/shared/config/kafka/KafkaConnectUtils.java  | 139 +++++++++++++--------
 .../sinks/brokers/jvm/kafka/KafkaController.java   |  78 ++++++------
 .../sinks/brokers/jvm/kafka/KafkaParameters.java   |   8 +-
 .../sinks/brokers/jvm/kafka/KafkaPublisher.java    |  37 ++++--
 .../strings.en                                     |  14 ++-
 .../messaging/kafka/SpKafkaConsumer.java           |  50 ++++----
 .../messaging/kafka/SpKafkaProducer.java           |  43 +++----
 .../kafka/config/AbstractConfigFactory.java        |  33 +++--
 .../kafka/config/ConsumerConfigFactory.java        |   2 +-
 .../kafka/config/KafkaConfigAppender.java          |   8 ++
 .../kafka/config/ProducerConfigFactory.java        |   4 +-
 .../kafka/security/KafkaSecurityConfig.java        |   7 ++
 .../security/KafkaSecuritySaslPlainConfig.java     |  28 +++++
 .../kafka/security/KafkaSecuritySaslSSLConfig.java |  28 +++++
 .../KafkaSecurityUnauthenticatedPlainConfig.java   |  12 ++
 .../KafkaSecurityUnauthenticatedSSLConfig.java     |  14 +++
 .../serializer/KafkaSerializerByteArrayConfig.java |   4 +
 .../kafka/serializer/KafkaSerializerConfig.java    |  13 ++
 .../performance/producer/DataSimulator.java        |   3 +-
 ui/cypress/support/utils/ParameterUtils.ts         |   1 +
 ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts    |   4 +-
 25 files changed, 438 insertions(+), 258 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index b198664..e94e9a0 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -19,11 +19,8 @@
 package org.apache.streampipes.connect.iiot.protocol.stream;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
@@ -37,6 +34,7 @@ import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
+import org.apache.streampipes.messaging.kafka.security.*;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
@@ -47,6 +45,7 @@ import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
 import org.apache.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.AdapterSourceType;
+import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.slf4j.Logger;
@@ -78,7 +77,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
     public Protocol getInstance(ProtocolDescription protocolDescription, IParser parser, IFormat format) {
         StaticPropertyExtractor extractor = StaticPropertyExtractor
                 .from(protocolDescription.getConfig(), new ArrayList<>());
-        this.config = KafkaConnectUtils.getConfig(extractor);
+        this.config = KafkaConnectUtils.getConfig(extractor, true);
 
         return new KafkaProtocol(parser, format, config);
     }
@@ -90,27 +89,29 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
                 .withLocales(Locales.EN)
                 .category(AdapterType.Generic, AdapterType.Manufacturing)
                 .sourceType(AdapterSourceType.STREAM)
-                .requiredAlternatives(KafkaConnectUtils.getAccessModeLabel(),
-                        KafkaConnectUtils.getAlternativesOne(), KafkaConnectUtils.getAlternativesTwo())
+
+                .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
+                        KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
+                        KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
+                        KafkaConnectUtils.getAlternativesSaslPlain(),
+                        KafkaConnectUtils.getAlternativesSaslSSL())
+
                 .requiredTextParameter(KafkaConnectUtils.getHostLabel())
                 .requiredIntegerParameter(KafkaConnectUtils.getPortLabel())
+
                 .requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true)
+
                 .requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList(
-                        KafkaConnectUtils.getHostKey(),
-                        KafkaConnectUtils.getPortKey()))
+                        KafkaConnectUtils.HOST_KEY,
+                        KafkaConnectUtils.PORT_KEY))
                 .build();
     }
 
     @Override
     protected List<byte[]> getNByteElements(int n) throws ParseException {
         final Consumer<Long, String> consumer;
-        if (authenticationRequired()) {
-            consumer = createConsumer(this.brokerUrl, this.topic, config.getUsername(), config.getPassword());
-        }
-        else {
-            consumer = createConsumer(this.brokerUrl, this.topic);
-        }
 
+        consumer = createConsumer(this.config);
         consumer.subscribe(Arrays.asList(this.topic), new ConsumerRebalanceListener() {
             @Override
             public void onPartitionsRevoked(Collection<TopicPartition> collection) {
@@ -124,7 +125,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         });
 
         List<byte[]> nEventsByte = new ArrayList<>();
-        List<byte[]> resultEventsByte = new ArrayList<>();
+        List<byte[]> resultEventsByte;
 
 
         while (true) {
@@ -153,53 +154,33 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         return resultEventsByte;
     }
 
-    private static Consumer<Long, String> createConsumer(String broker, String topic) {
+    private static Consumer<Long, String> createConsumer(KafkaConfig kafkaConfig) {
         final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                broker);
-
-        props.put(ConsumerConfig.GROUP_ID_CONFIG,
-                "KafkaExampleConsumer" + System.currentTimeMillis());
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                LongDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class.getName());
-
-        // Create the consumer using props.
-        final Consumer<Long, String> consumer =
-                new KafkaConsumer<>(props);
-
-        // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(topic));
 
-        return consumer;
-    }
+        kafkaConfig.getSecurityConfig().appendConfig(props);
 
-    private static Consumer<Long, String> createConsumer(String broker, String topic, String username, String password) {
-        final Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                broker);
+                kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
 
+        // TODO make serializer configurable
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
                 "KafkaExampleConsumer" + System.currentTimeMillis());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 LongDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 StringDeserializer.class.getName());
-        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
-        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
 
         // Create the consumer using props.
         final Consumer<Long, String> consumer =
                 new KafkaConsumer<>(props);
 
         // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(topic));
+        consumer.subscribe(Collections.singletonList(kafkaConfig.getTopic()));
 
         return consumer;
     }
 
+
     @Override
     public void run(IAdapterPipeline adapterPipeline) {
         SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
@@ -207,13 +188,11 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         protocol.setKafkaPort(config.getKafkaPort());
         protocol.setBrokerHostname(config.getKafkaHost());
         protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
-        if (authenticationRequired()) {
-            this.kafkaConsumer = new SpKafkaConsumer(protocol, config.getTopic(), new EventProcessor(stk),
-                    config.getUsername(), config.getPassword());
-        }
-        else {
-            this.kafkaConsumer = new SpKafkaConsumer(protocol, config.getTopic(), new EventProcessor(stk));
-        }
+
+        this.kafkaConsumer = new SpKafkaConsumer(protocol,
+                config.getTopic(),
+                new EventProcessor(stk),
+                Arrays.asList(this.config.getSecurityConfig()));
 
         thread = new Thread(this.kafkaConsumer);
         thread.start();
@@ -239,28 +218,20 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
 
     @Override
     public List<Option> resolveOptions(String requestId, StaticPropertyExtractor extractor) {
-        String kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.getHostKey(), String.class);
-        Integer kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.getPortKey(), Integer.class);
-        String authenticationMode = extractor.selectedAlternativeInternalId(KafkaConnectUtils.getAccessModeKey());
+        KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
         boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
-        boolean useAuthentication = authenticationMode.equals(KafkaConnectUtils.getSaslAccessKey());
-
-        String kafkaAddress = kafkaHost + ":" + kafkaPort;
 
+        String kafkaAddress = kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort();
         Properties props = new Properties();
+
+        // add security properties to kafka configuration
+        kafkaConfig.getSecurityConfig().appendConfig(props);
+
         props.put("bootstrap.servers", kafkaAddress);
         props.put("group.id", "test-consumer-group");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
-        if (useAuthentication) {
-            String username = extractor.singleValueParameter(KafkaConnectUtils.getUsernameKey(), String.class);
-            String password = extractor.secretValue(KafkaConnectUtils.getPasswordKey());
-            props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
-            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
-        }
-
         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
         Set<String> topics = consumer.listTopics().keySet();
         consumer.close();
@@ -295,8 +266,4 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
     public String getId() {
         return ID;
     }
-
-    private boolean authenticationRequired() {
-        return config.getAuthentication().equals(KafkaConnectUtils.getSaslAccessKey());
-    }
 }
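
The inline SASL properties removed above are presumably what the new KafkaSecuritySaslPlainConfig encapsulates; the following is a reconstruction from the deleted lines for illustration, not the actual contents of the new class:

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.security.auth.SecurityProtocol;

    public class SaslPlainPropertiesSketch {

      // Mirrors the properties that the old code set inline for SASL/PLAIN.
      public static void append(Properties props, String username, String password) {
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + username + "\" password=\"" + password + "\";");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
      }
    }
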
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index d41f01c..abf3588 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -10,23 +10,28 @@ port.description=9092
 topic.title=Topic
 topic.description=Example: test.topic
 
+username.title=Username
+username.description=
+
+password.title=Password
+password.description=
+
 access-mode.title=Access Mode
-access-mode.description=
+access-mode.description=Unauthenticated or SASL/PLAIN
 
-anonymous-alternative.title=Unauthenticated
-anonymous-alternative.description=
+unauthenticated-plain.title=Unauthenticated Plain
+unauthenticated-plain.description=No authentication and plaintext
 
-username-alternative.title=Username/Password
-username-alternative.description=
+unauthenticated-ssl.title=Unauthenticated SSL
+unauthenticated-ssl.description=Using SSL with no authentication
 
-username-group.title=User Group
-username-group.description=
+sasl-plain.title=SASL/PLAIN
+sasl-plain.description=Username and password, no encryption
 
-username.title=Username
-username.description=
+sasl-ssl.title=SASL/SSL
+sasl-ssl.description=Username and password, with ssl encryption
 
-password.title=Password
-password.description=
+username-group.title=Username and password
 
 hide-internal-topics.title=Hide internal topics
 hide-internal-topics.description=Do not show topics that are only used internally
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/pom.xml b/streampipes-extensions/streampipes-pipeline-elements-shared/pom.xml
index 69dee79..db91cac 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/pom.xml
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/pom.xml
@@ -56,6 +56,11 @@
             <artifactId>streampipes-model</artifactId>
             <version>0.69.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-messaging-kafka</artifactId>
+            <version>0.69.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>
 
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
index e0a9a27..3b55309 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
@@ -18,41 +18,52 @@
 
 package org.apache.streampipes.pe.shared.config.kafka;
 
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
+
 public class KafkaConfig {
 
     private String kafkaHost;
     private Integer kafkaPort;
     private String topic;
-    private String authentication;
-    private String username;
-    private String password;
 
-    public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic,
-                       String authentication, String username, String password) {
+    KafkaSecurityConfig securityConfig;
+
+    public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, KafkaSecurityConfig securityConfig) {
         this.kafkaHost = kafkaHost;
         this.kafkaPort = kafkaPort;
         this.topic = topic;
-        this.authentication = authentication;
-        this.username = username;
-        this.password = password;
+        this.securityConfig = securityConfig;
     }
 
     public String getKafkaHost() {
         return kafkaHost;
     }
 
+    public void setKafkaHost(String kafkaHost) {
+        this.kafkaHost = kafkaHost;
+    }
+
     public Integer getKafkaPort() {
         return kafkaPort;
     }
 
+    public void setKafkaPort(Integer kafkaPort) {
+        this.kafkaPort = kafkaPort;
+    }
+
     public String getTopic() {
         return topic;
     }
 
-    public String getUsername() { return username; }
-
-    public String getPassword() { return password; }
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
 
-    public String getAuthentication() { return authentication; }
+    public KafkaSecurityConfig getSecurityConfig() {
+        return securityConfig;
+    }
 
+    public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
+        this.securityConfig = securityConfig;
+    }
 }
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
index 4238fae..51b41c9 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
@@ -18,38 +18,40 @@
 
 package org.apache.streampipes.pe.shared.config.kafka;
 
+import org.apache.streampipes.messaging.kafka.security.*;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Label;
 import org.apache.streampipes.sdk.helpers.Labels;
 
 public class KafkaConnectUtils {
 
-    private static final String TOPIC_KEY = "topic";
-    private static final String HOST_KEY = "host";
-    private static final String PORT_KEY = "port";
-    private static final String ACCESS_MODE = "access-mode";
-    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-    private static final String USERNAME_ACCESS = "username-alternative";
-    private static final String USERNAME_GROUP = "username-group";
-    private static final String USERNAME_KEY = "username";
-    private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
+    public static final String TOPIC_KEY = "topic";
+    public static final String HOST_KEY = "host";
+    public static final String PORT_KEY = "port";
 
-    public static String getUsernameKey() {
-        return USERNAME_KEY;
-    }
+//    private static final String ACCESS_MODE = "access-mode";
+//    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
+//    private static final String USERNAME_ACCESS = "username-alternative";
+//    private static final String USERNAME_GROUP = "username-group";
+//    private static final String USERNAME_KEY = "username";
+//    private static final String PASSWORD_KEY = "password";
 
-    public static String getPasswordKey() {
-        return PASSWORD_KEY;
-    }
+    public static final String ACCESS_MODE = "access-mode";
+    public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
+    public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
+    public static final String SASL_PLAIN = "sasl-plain";
+    public static final String SASL_SSL = "sasl-ssl";
 
-    private static final String PASSWORD_KEY = "password";
+    public static final String USERNAME_GROUP = "username-group";
+    public static final String USERNAME_KEY = "username";
+    public static final String PASSWORD_KEY = "password";
 
-    public static String getAccessModeKey() {
-        return ACCESS_MODE;
-    }
+
+    private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";
 
     public static Label getTopicLabel() {
         return Labels.withId(TOPIC_KEY);
@@ -67,14 +69,6 @@ public class KafkaConnectUtils {
         return Labels.withId(HOST_KEY);
     }
 
-    public static String getHostKey() {
-        return HOST_KEY;
-    }
-
-    public static String getPortKey() {
-        return PORT_KEY;
-    }
-
     public static Label getPortLabel() {
         return Labels.withId(PORT_KEY);
     }
@@ -83,37 +77,80 @@ public class KafkaConnectUtils {
         return Labels.withId(ACCESS_MODE);
     }
 
-    public static String getSaslAccessKey() {
-        return USERNAME_ACCESS;
+//    public static StaticPropertyAlternative getAlternativesOne() {
+//        return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
+//    }
+//
+//    public static StaticPropertyAlternative getAlternativesTwo() {
+//        return Alternatives.from(Labels.withId(USERNAME_ACCESS),
+//                StaticProperties.group(Labels.withId(USERNAME_GROUP),
+//                        StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
+//                        StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
+//    }
+
+    public static KafkaConfig getConfig(StaticPropertyExtractor extractor, boolean containsTopic) {
+        String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
+        String topic = "";
+        if (containsTopic) {
+            topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
+        }
+
+        Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class);
+
+        String authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
+        boolean isUseSSL = isUseSSL(authentication);
+
+        KafkaSecurityConfig securityConfig;
+
+        //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig();
+
+        // check if a user for the authentication is defined
+        if (authentication.equals(KafkaConnectUtils.SASL_SSL) || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
+            String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
+            String password = extractor.secretValue(PASSWORD_KEY);
+
+            securityConfig = isUseSSL ?
+                    new KafkaSecuritySaslSSLConfig(username, password) :
+                    new KafkaSecuritySaslPlainConfig(username, password);
+        } else {
+            // set security config for unauthenticated access
+            securityConfig = isUseSSL ?
+                    new KafkaSecurityUnauthenticatedSSLConfig() :
+                    new KafkaSecurityUnauthenticatedPlainConfig();
+        }
+
+        return new KafkaConfig(brokerUrl, port, topic, securityConfig);
+    }
+
+    private static boolean isUseSSL(String authentication) {
+        return !(authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN) ||
+                authentication.equals(KafkaConnectUtils.SASL_PLAIN));
     }
 
-    public static String getAnonymousAccessKey() {
-        return ANONYMOUS_ACCESS;
+
+    public static StaticPropertyAlternative getAlternativeUnauthenticatedPlain() {
+        return Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_PLAIN));
     }
 
-    public static StaticPropertyAlternative getAlternativesOne() {
-        return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
+    public static StaticPropertyAlternative getAlternativeUnauthenticatedSSL() {
+        return Alternatives.from(Labels.withId(KafkaConnectUtils.UNAUTHENTICATED_SSL));
     }
 
-    public static StaticPropertyAlternative getAlternativesTwo() {
-        return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-                StaticProperties.group(Labels.withId(USERNAME_GROUP),
-                        StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-                        StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
+    public static StaticPropertyAlternative getAlternativesSaslPlain() {
+        return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_PLAIN),
+                StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
+                        StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
+                        StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
     }
 
-    public static KafkaConfig getConfig(StaticPropertyExtractor extractor) {
-        String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
-        String topic = extractor.selectedSingleValue(TOPIC_KEY, String.class);
-        Integer port = extractor.singleValueParameter(PORT_KEY, Integer.class);
-        String authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
-        if (authentication.equals(USERNAME_ACCESS)) {
-            String password = extractor.secretValue(PASSWORD_KEY);
-            String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
-            return new KafkaConfig(brokerUrl, port, topic, authentication, username, password);
-        }
-        else {
-            return new KafkaConfig(brokerUrl, port, topic, authentication, null, null);
-        }
+    public static StaticPropertyAlternative getAlternativesSaslSSL() {
+        return Alternatives.from(Labels.withId(KafkaConnectUtils.SASL_SSL),
+                StaticProperties.group(Labels.withId(KafkaConnectUtils.USERNAME_GROUP),
+                        StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.USERNAME_KEY)),
+                        StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
     }
 }
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
index 13a2f48..1e1a94e 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
@@ -36,19 +37,15 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDec
 
 public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters> {
 
-//  private static final String KAFKA_BROKER_SETTINGS_KEY = "broker-settings";
-  private static final String TOPIC_KEY = "topic";
-  private static final String HOST_KEY = "host";
-  private static final String PORT_KEY = "port";
-  private static final String ACCESS_MODE = "access-mode";
-  private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-  private static final String USERNAME_ACCESS = "username-alternative";
-  private static final String USERNAME_GROUP = "username-group";
-  private static final String USERNAME_KEY = "username";
-  private static final String PASSWORD_KEY = "password";
-
-//  private static final String KAFKA_HOST_URI = "http://schema.org/kafkaHost";
-//  private static final String KAFKA_PORT_URI = "http://schema.org/kafkaPort";
+//  private static final String ACCESS_MODE = "access-mode";
+//  private static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
+//  private static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
+//  private static final String SASL_PLAIN = "sasl-plain";
+//  private static final String SASL_SSL = "sasl-ssl";
+//
+//  private static final String USERNAME_GROUP = "username-group";
+//  private static final String USERNAME_KEY = "username";
+//  private static final String PASSWORD_KEY = "password";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -60,49 +57,48 @@ public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters
                     .create()
                     .requiredProperty(EpRequirements.anyProperty())
                     .build())
-            .requiredTextParameter(Labels.withId(TOPIC_KEY), false, false)
-            .requiredTextParameter(Labels.withId(HOST_KEY), false, false)
-            .requiredIntegerParameter(Labels.withId(PORT_KEY), 9092)
-            .requiredAlternatives(Labels.withId(ACCESS_MODE), getAlternativesOne(), getAlternativesTwo())
+
+            .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), false, false)
+            .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), false, false)
+            .requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
+
+            .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
+                    KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
+                    KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
+                    KafkaConnectUtils.getAlternativesSaslPlain(),
+                    KafkaConnectUtils.getAlternativesSaslSSL())
             .build();
   }
 
   @Override
   public ConfiguredEventSink<KafkaParameters> onInvocation(DataSinkInvocation graph,
                                                            DataSinkParameterExtractor extractor) {
-    String topic = extractor.singleValueParameter(TOPIC_KEY, String.class);
+    String topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, String.class);
 
-    String kafkaHost = extractor.singleValueParameter(HOST_KEY, String.class);
-    Integer kafkaPort = extractor.singleValueParameter(PORT_KEY, Integer.class);
-    String authentication = extractor.selectedAlternativeInternalId(ACCESS_MODE);
+    String kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
+    Integer kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
+    String authentication = extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
 
     KafkaParameters params;
-    if (authentication.equals(ANONYMOUS_ACCESS)) {
-      params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, null, null);
-    }
-    else {
-      String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
-      String password = extractor.secretValue(PASSWORD_KEY);
-      params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, username, password);
+    if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN)) {
+      params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, null, null, false);
+    } else if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_SSL)) {
+      params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, null, null, true);
+    } else {
+      String username = extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
+      String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
+      if (authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
+        params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, username, password, false);
+      } else {
+        params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, username, password, true);
+      }
     }
 
     return new ConfiguredEventSink<>(params, KafkaPublisher::new);
   }
 
-  public static StaticPropertyAlternative getAlternativesOne() {
-    return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
-
-  }
-
-  public static StaticPropertyAlternative getAlternativesTwo() {
-    return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-            StaticProperties.group(Labels.withId(USERNAME_GROUP),
-                    StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-                    StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
-
-  }
 
   public static  String getSaslAccessKey() {
-    return USERNAME_ACCESS;
+    return KafkaConnectUtils.SASL_PLAIN;
   }
 }
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
index 79f8bdc..de2e1fb 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
@@ -30,9 +30,10 @@ public class KafkaParameters extends EventSinkBindingParams {
   private String authentication;
   private String username;
   private String password;
+  private boolean useSSL;
 
   public KafkaParameters(DataSinkInvocation graph, String kafkaHost, Integer kafkaPort, String topic,
-                         String authentication, String username, String password) {
+                         String authentication, String username, String password, boolean useSSL) {
     super(graph);
     this.kafkaHost = kafkaHost;
     this.kafkaPort = kafkaPort;
@@ -40,6 +41,7 @@ public class KafkaParameters extends EventSinkBindingParams {
     this.authentication = authentication;
     this.username = username;
     this.password = password;
+    this.useSSL = useSSL;
   }
 
   public String getKafkaHost() {
@@ -59,4 +61,8 @@ public class KafkaParameters extends EventSinkBindingParams {
   public String getPassword() { return password; }
 
   public String getAuthentication() { return authentication; }
+
+  public boolean isUseSSL() {
+    return useSSL;
+  }
 }
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
index bf2a44d..bd72731 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
@@ -21,10 +21,14 @@ package org.apache.streampipes.sinks.brokers.jvm.kafka;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.messaging.kafka.security.*;
+import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerByteArrayConfig;
+import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerConfig;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
+import java.util.Arrays;
 import java.util.Map;
 
 public class KafkaPublisher implements EventSink<KafkaParameters> {
@@ -36,19 +40,36 @@ public class KafkaPublisher implements EventSink<KafkaParameters> {
     this.dataFormatDefinition = new JsonDataFormatDefinition();
   }
 
+  // Serialization: keys are serialized with StringSerializer, values with ByteArraySerializer
+
   @Override
   public void onInvocation(KafkaParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
     boolean useAuthentication = parameters.getUsername() != null && parameters.getPassword() != null;
+
+    KafkaSecurityConfig securityConfig;
+    //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig();
+
+    // check if a user for the authentication is defined
     if (useAuthentication) {
-      this.producer = new SpKafkaProducer(parameters.getKafkaHost() + ":" + parameters.getKafkaPort(),
-              parameters.getTopic(),
-              parameters.getUsername(),
-              parameters.getPassword());
-    }
-    else {
-      this.producer = new SpKafkaProducer(parameters.getKafkaHost() + ":" + parameters.getKafkaPort(),
-              parameters.getTopic());
+      securityConfig = parameters.isUseSSL() ?
+              new KafkaSecuritySaslSSLConfig(parameters.getUsername(), parameters.getPassword()) :
+              new KafkaSecuritySaslPlainConfig(parameters.getUsername(), parameters.getPassword());
+    } else {
+      // set security config for unauthenticated access
+      securityConfig = parameters.isUseSSL() ?
+              new KafkaSecurityUnauthenticatedSSLConfig() :
+              new KafkaSecurityUnauthenticatedPlainConfig();
     }
+
+    this.producer = new SpKafkaProducer(
+            parameters.getKafkaHost() + ":" + parameters.getKafkaPort(),
+            parameters.getTopic(),
+            Arrays.asList(securityConfig));
+
   }
 
   @Override
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
index 900cc3d..af8bb95 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/resources/org.apache.streampipes.sinks.brokers.jvm.kafka/strings.en
@@ -22,10 +22,16 @@ password.description=The password to authenticate with the broker
 access-mode.title=Access Mode
 access-mode.description=Unauthenticated or SASL/PLAIN
 
-anonymous-alternative.title=Unauthenticated
-anonymous-alternative.description=No authentication and plaintext
+unauthenticated-plain.title=Unauthenticated Plain
+unauthenticated-plain.description=No authentication and plaintext
 
-username-alternative.title=SASL/PLAIN
-username-alternative.description=Username and password, no encryption
+unauthenticated-ssl.title=Unauthenticated SSL
+unauthenticated-ssl.description=Using SSL with no authentication
+
+sasl-plain.title=SASL/PLAIN
+sasl-plain.description=Username and password, no encryption
+
+sasl-ssl.title=SASL/SSL
+sasl-ssl.description=Username and password, with SSL encryption
 
 username-group.title=Username and password
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index cf457df..ed4f12a 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.messaging.kafka;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
@@ -33,9 +34,7 @@ import org.apache.streampipes.model.grounding.WildcardTopicDefinition;
 import java.io.Serializable;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Properties;
+import java.util.*;
 import java.util.regex.Pattern;
 
 public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, Runnable,
@@ -44,37 +43,42 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
   private String topic;
   private InternalEventProcessor<byte[]> eventProcessor;
   private KafkaTransportProtocol protocol;
-  private String username;
-  private String password;
   private volatile boolean isRunning;
   private Boolean patternTopic = false;
 
+  private List<KafkaConfigAppender> appenders = new ArrayList<>();
+
   private static final Logger LOG = LoggerFactory.getLogger(SpKafkaConsumer.class);
 
   public SpKafkaConsumer() {
 
   }
 
-  public SpKafkaConsumer(KafkaTransportProtocol protocol, String topic, InternalEventProcessor<byte[]> eventProcessor) {
-    this(protocol, topic, eventProcessor, null, null);
-  }
-
-  public SpKafkaConsumer(KafkaTransportProtocol protocol, String topic, InternalEventProcessor<byte[]> eventProcessor,
-                         String username, String password) {
+  public SpKafkaConsumer(KafkaTransportProtocol protocol,
+                         String topic,
+                         InternalEventProcessor<byte[]> eventProcessor) {
     this.protocol = protocol;
     this.topic = topic;
     this.eventProcessor = eventProcessor;
     this.isRunning = true;
-    this.username = username;
-    this.password = password;
+  }
+
+  public SpKafkaConsumer(KafkaTransportProtocol protocol,
+                         String topic,
+                         InternalEventProcessor<byte[]> eventProcessor,
+                         List<KafkaConfigAppender> appenders) {
+    this(protocol, topic, eventProcessor);
+    this.appenders = appenders;
   }
 
   // TODO backwards compatibility, remove later
-  public SpKafkaConsumer(String kafkaUrl, String topic, InternalEventProcessor<byte[]> callback) {
+  public SpKafkaConsumer(String kafkaUrl, String topic, InternalEventProcessor<byte[]> callback,
+                         KafkaConfigAppender... appenders) {
     KafkaTransportProtocol protocol = new KafkaTransportProtocol();
     protocol.setKafkaPort(Integer.parseInt(kafkaUrl.split(":")[1]));
     protocol.setBrokerHostname(kafkaUrl.split(":")[0]);
     protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
+    this.appenders = Arrays.asList(appenders);
 
     try {
       this.connect(protocol, callback);
@@ -85,12 +89,9 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
 
   @Override
   public void run() {
-    Properties props;
-    if (username != null && password != null) {
-      props = makePropertiesSaslPlain(protocol, username, password);
-    } else {
-      props = makeProperties(protocol);
-    }
+
+    Properties props = makeProperties(protocol, appenders);
+
     KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
     if (!patternTopic) {
       consumer.subscribe(Collections.singletonList(topic));
@@ -124,12 +125,9 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
     return topic.replaceAll("\\*", ".*");
   }
 
-  private Properties makeProperties(KafkaTransportProtocol protocol) {
-    return new ConsumerConfigFactory(protocol).makeProperties();
-  }
-
-  private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
-    return new ConsumerConfigFactory(protocol).makePropertiesSaslPlain(username, password);
+  private Properties makeProperties(KafkaTransportProtocol protocol,
+            List<KafkaConfigAppender> appenders) {
+    return new ConsumerConfigFactory(protocol).buildProperties(appenders);
   }
 
   @Override
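
A minimal usage sketch for the reworked consumer API, assuming the four-argument constructor above and that InternalEventProcessor<byte[]> exposes a single callback so a lambda can be passed; broker address, topic, credentials, and the example class name are placeholders:

    import java.util.Collections;

    import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
    import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
    import org.apache.streampipes.model.grounding.KafkaTransportProtocol;

    public class SaslConsumerExample {
      public static void main(String[] args) {
        // placeholder broker, topic, and credentials
        KafkaTransportProtocol protocol = new KafkaTransportProtocol("localhost", 9092, "demo-topic");

        SpKafkaConsumer consumer = new SpKafkaConsumer(
                protocol,
                "demo-topic",
                event -> System.out.println(new String(event)),
                Collections.singletonList(new KafkaSecuritySaslPlainConfig("demo-user", "demo-password")));

        // SpKafkaConsumer implements Runnable, so it can be driven by a plain thread
        new Thread(consumer).start();
      }
    }
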
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index f9dba54..1de82dc 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.messaging.EventProducer;
@@ -32,10 +33,7 @@ import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 
 public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, Serializable {
@@ -55,26 +53,28 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   public SpKafkaProducer() { }
 
   // TODO backwards compatibility, remove later
-  public SpKafkaProducer(String url, String topic) {
+  public SpKafkaProducer(String url,
+                         String topic,
+                         List<KafkaConfigAppender> appenders) {
     String[] urlParts = url.split(COLON);
     KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
             Integer.parseInt(urlParts[1]), topic);
     this.brokerUrl = url;
     this.topic = topic;
-    this.producer = new KafkaProducer<>(makeProperties(protocol));
+    this.producer = new KafkaProducer<>(makeProperties(protocol, appenders));
     this.connected = true;
   }
 
   // TODO backwards compatibility, remove later
-  public SpKafkaProducer(String url, String topic, String username, String password) {
-    String[] urlParts = url.split(COLON);
-    KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
-            Integer.parseInt(urlParts[1]), topic);
-    this.brokerUrl = url;
-    this.topic = topic;
-    this.producer = new KafkaProducer<>(makePropertiesSaslPlain(protocol, username, password));
-    this.connected = true;
-  }
+//  public SpKafkaProducer(String url, String topic, String username, String password, boolean ssl) {
+//    String[] urlParts = url.split(COLON);
+//    KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
+//            Integer.parseInt(urlParts[1]), topic);
+//    this.brokerUrl = url;
+//    this.topic = topic;
+//    this.producer = new KafkaProducer<>(makePropertiesSaslPlain(protocol, username, password));
+//    this.connected = true;
+//  }
 
   public void publish(String message) {
     publish(message.getBytes());
@@ -86,13 +86,14 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     }
   }
 
-  private Properties makeProperties(KafkaTransportProtocol protocol) {
-    return new ProducerConfigFactory(protocol).makeProperties();
+  private Properties makeProperties(KafkaTransportProtocol protocol,
+                                    List<KafkaConfigAppender> appenders) {
+    return new ProducerConfigFactory(protocol).buildProperties(appenders);
   }
 
-  private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
-    return new ProducerConfigFactory(protocol).makePropertiesSaslPlain(username, password);
-  }
+//  private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
+//    return new ProducerConfigFactory(protocol).makePropertiesSaslPlain(username, password);
+//  }
 
   @Override
   public void connect(KafkaTransportProtocol protocol) {
@@ -107,7 +108,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
       LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
     }
 
-    this.producer = new KafkaProducer<>(makeProperties(protocol));
+    this.producer = new KafkaProducer<>(makeProperties(protocol, Collections.emptyList()));
     this.connected = true;
 
     LOG.info("Successfully created Kafka producer for topic " + this.topic);
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
index 80d9a2e..bac2870 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
@@ -17,11 +17,9 @@
  */
 package org.apache.streampipes.messaging.kafka.config;
 
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
+import java.util.List;
 import java.util.Properties;
 import java.util.function.Supplier;
 
@@ -36,7 +34,7 @@ public abstract class AbstractConfigFactory {
     this.protocol = protocol;
   }
 
-  public abstract Properties makeProperties();
+  protected abstract Properties makeDefaultProperties();
 
   protected <T> T getConfigOrDefault(Supplier<T> function,
                                       T defaultValue) {
@@ -47,13 +45,24 @@ public abstract class AbstractConfigFactory {
     return protocol.getBrokerHostname() + COLON + protocol.getKafkaPort();
   }
 
-  public Properties makePropertiesSaslPlain(String username,
-                                            String password) {
-    Properties props = makeProperties();
-    props.put(SaslConfigs.SASL_MECHANISM,SASL_MECHANISM);
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
-    String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
-    props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
-    return props;
+  public Properties buildProperties(List<KafkaConfigAppender> appenders) {
+    Properties props = makeDefaultProperties();
+    appenders.forEach(appender -> appender.appendConfig(props));
+
+    // TODO check Kafka Security
+    return props;
   }
+
+//  public Properties makePropertiesSaslPlain(String username,
+//                                            String password) {
+//    Properties props = makeProperties();
+//    props.put(SaslConfigs.SASL_MECHANISM,SASL_MECHANISM);
+//    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
+//
+////    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
+//
+//    String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
+//    props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
+//    return props;
+//  }
 }
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index ad19534..21550d7 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -39,7 +39,7 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   }
 
   @Override
-  public Properties makeProperties() {
+  public Properties makeDefaultProperties() {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
 //    props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId,
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
new file mode 100644
index 0000000..b894960
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/KafkaConfigAppender.java
@@ -0,0 +1,8 @@
+package org.apache.streampipes.messaging.kafka.config;
+
+import java.util.Properties;
+
+public interface KafkaConfigAppender {
+
+    void appendConfig(Properties props);
+}
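
The KafkaConfigAppender interface introduced above is the extension point through which the security (and serializer) configs contribute entries to the Properties built by the config factories. A rough sketch of what a further appender could look like; the truststore settings and class name are illustrative assumptions, not part of this change:

    package org.apache.streampipes.messaging.kafka.config;

    import org.apache.kafka.common.config.SslConfigs;

    import java.util.Properties;

    // Hypothetical appender that points the Kafka client at a custom truststore
    public class TruststoreConfigAppender implements KafkaConfigAppender {

        private final String truststoreLocation;
        private final String truststorePassword;

        public TruststoreConfigAppender(String truststoreLocation, String truststorePassword) {
            this.truststoreLocation = truststoreLocation;
            this.truststorePassword = truststorePassword;
        }

        @Override
        public void appendConfig(Properties props) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
        }
    }
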
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
index 41937ac..bf76d57 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
@@ -17,7 +17,9 @@
  */
 package org.apache.streampipes.messaging.kafka.config;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 import java.util.Properties;
@@ -42,7 +44,7 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
   }
 
   @Override
-  public Properties makeProperties() {
+  public Properties makeDefaultProperties() {
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
     props.put(ProducerConfig.ACKS_CONFIG, getConfigOrDefault(protocol::getAcks,
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
new file mode 100644
index 0000000..7acc632
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityConfig.java
@@ -0,0 +1,7 @@
+package org.apache.streampipes.messaging.kafka.security;
+
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
+public abstract class KafkaSecurityConfig implements KafkaConfigAppender {
+
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
new file mode 100644
index 0000000..4ed49e0
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslPlainConfig.java
@@ -0,0 +1,28 @@
+package org.apache.streampipes.messaging.kafka.security;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Properties;
+
+public class KafkaSecuritySaslPlainConfig extends KafkaSecurityConfig {
+
+    private final String username;
+    private final String password;
+
+    public KafkaSecuritySaslPlainConfig(String username, String password) {
+        this.username = username;
+        this.password = password;
+    }
+
+    @Override
+    public void appendConfig(Properties props) {
+
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
+
+        String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+                + username + "\" password=\"" + password + "\";";
+        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+    }
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
new file mode 100644
index 0000000..626b0d3
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecuritySaslSSLConfig.java
@@ -0,0 +1,28 @@
+package org.apache.streampipes.messaging.kafka.security;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Properties;
+
+public class KafkaSecuritySaslSSLConfig extends KafkaSecurityConfig {
+
+    private final String username;
+    private final String password;
+
+    public KafkaSecuritySaslSSLConfig(String username, String password) {
+        this.username = username;
+        this.password = password;
+    }
+
+    @Override
+    public void appendConfig(Properties props) {
+
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
+
+        String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
+                + username + "\" password=\"" + password + "\";";
+        props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
+    }
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
new file mode 100644
index 0000000..23746be
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedPlainConfig.java
@@ -0,0 +1,12 @@
+package org.apache.streampipes.messaging.kafka.security;
+
+import java.util.Properties;
+
+public class KafkaSecurityUnauthenticatedPlainConfig extends KafkaSecurityConfig {
+
+
+    @Override
+    public void appendConfig(Properties props) {
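+        // intentionally empty: plaintext access without authentication needs no extra properties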
+
+    }
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
new file mode 100644
index 0000000..8fe33ea
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/security/KafkaSecurityUnauthenticatedSSLConfig.java
@@ -0,0 +1,14 @@
+package org.apache.streampipes.messaging.kafka.security;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+
+import java.util.Properties;
+
+public class KafkaSecurityUnauthenticatedSSLConfig extends KafkaSecurityConfig {
+
+    @Override
+    public void appendConfig(Properties props) {
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.toString());
+    }
+}
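
For the producer side, a minimal sketch (broker address, topic, credentials, and the example class name are placeholders) of how these security configs reach SpKafkaProducer through the appender list added earlier in this change:

    import java.util.Collections;

    import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
    import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;

    public class SaslSslProducerExample {
      public static void main(String[] args) {
        // placeholder broker address, topic, and credentials
        SpKafkaProducer producer = new SpKafkaProducer(
                "localhost:9093",
                "demo-topic",
                Collections.singletonList(new KafkaSecuritySaslSSLConfig("demo-user", "demo-password")));

        producer.publish("{\"timestamp\": 1645702800000}");
      }
    }
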
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
new file mode 100644
index 0000000..40d4bb2
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
@@ -0,0 +1,4 @@
+package org.apache.streampipes.messaging.kafka.serializer;
+
+public class KafkaSerializerByteArrayConfig extends KafkaSerializerConfig {
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
new file mode 100644
index 0000000..a317c75
--- /dev/null
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
@@ -0,0 +1,13 @@
+package org.apache.streampipes.messaging.kafka.serializer;
+
+import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
+
+import java.util.Properties;
+
+public class KafkaSerializerConfig implements KafkaConfigAppender {
+
+    @Override
+    public void appendConfig(Properties props) {
+
+    }
+}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/producer/DataSimulator.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/producer/DataSimulator.java
index 9fa46e9..ac0eb3e 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/producer/DataSimulator.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/producer/DataSimulator.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.performance.producer;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 import org.apache.streampipes.performance.simulation.DataReplayStatusNotifier;
 
+import java.util.Collections;
 import java.util.Random;
 import java.util.UUID;
 
@@ -38,7 +39,7 @@ public class DataSimulator implements Runnable {
 
   public DataSimulator(String kafkaUrl, Long totalNumberOfEvents, Long waitTimeBetweenEvents, String threadId,
                        DataReplayStatusNotifier statusNotifier) {
-    this.kafkaProducer = new SpKafkaProducer(kafkaUrl, topic);
+    this.kafkaProducer = new SpKafkaProducer(kafkaUrl, topic, Collections.emptyList());
     this.threadId = threadId;
 
     this.totalNumberOfEvents = totalNumberOfEvents;
diff --git a/ui/cypress/support/utils/ParameterUtils.ts b/ui/cypress/support/utils/ParameterUtils.ts
index ba4c8df..f48fd23 100644
--- a/ui/cypress/support/utils/ParameterUtils.ts
+++ b/ui/cypress/support/utils/ParameterUtils.ts
@@ -20,6 +20,7 @@
 export class ParameterUtils {
 
     public static get(localVariable: string, containerVariable: string): string {
         if (Cypress.env('DEVELOPMENT')) {
             return localVariable;
         } else {
diff --git a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
index feab875..6be40ea 100644
--- a/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
+++ b/ui/cypress/tests/thirdparty/Kafka.smoke.spec.ts
@@ -33,7 +33,7 @@ describe('Test Kafka Integration', () => {
     const port: string = ParameterUtils.get('9094', '9092');
 
     const sink: PipelineElementInput = PipelineElementBuilder.create('kafka_publisher')
-      .addInput('select', 'access-mode-unauthenticated', 'check')
+      .addInput('select', 'access-mode-unauthenticated_plain', 'check')
         .addInput('input', 'host', host)
       .addInput('input', 'port', '{backspace}{backspace}{backspace}{backspace}' + port)
       .addInput('input', 'topic', topicName)
@@ -43,7 +43,7 @@ describe('Test Kafka Integration', () => {
       .create('Apache_Kafka')
       .setName('Kafka4')
       .setTimestampProperty('timestamp')
-      .addProtocolInput('select', 'access-mode-unauthenticated', 'check')
+      .addProtocolInput('select', 'access-mode-unauthenticated_plain', 'check')
         .addProtocolInput('input', 'host', host)
         .addProtocolInput('input', 'port', port)
       .addProtocolInput('click', 'sp-reload', '')