You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/02/24 21:46:47 UTC

[incubator-streampipes] 02/02: [STREAMPIPES-512] Clean up some code

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 1f87304d4bf92cc0b272964683ce2b1987cd23c6
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Feb 24 22:45:40 2022 +0100

    [STREAMPIPES-512] Clean up some code
---
 .../iiot/protocol/stream/KafkaProtocol.java        | 37 +++++++---------------
 .../strings.en                                     |  6 ++++
 .../pe/shared/config/kafka/KafkaConfig.java        |  7 +++-
 .../pe/shared/config/kafka/KafkaConnectUtils.java  | 21 ------------
 .../config/kafka/{ => kafka}/KafkaConfig.java      |  8 +++--
 .../kafka/{ => kafka}/KafkaConnectUtils.java       | 25 ++++-----------
 .../sinks/brokers/jvm/kafka/KafkaPublisher.java    |  8 -----
 .../messaging/kafka/SpKafkaConsumer.java           | 20 ++----------
 .../messaging/kafka/SpKafkaProducer.java           | 15 ---------
 .../kafka/config/AbstractConfigFactory.java        | 15 ---------
 .../kafka/config/ConsumerConfigFactory.java        | 13 ++++----
 .../kafka/config/ProducerConfigFactory.java        | 15 +++++----
 .../serializer/KafkaSerializerByteArrayConfig.java |  4 ---
 .../kafka/serializer/KafkaSerializerConfig.java    | 13 --------
 ...AbstractConfigurablePipelineElementBuilder.java | 20 ++++++++++++
 .../distributed/runtime/DistributedRuntime.java    |  4 +--
 16 files changed, 76 insertions(+), 155 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index e94e9a0..502aa7c 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -21,8 +21,7 @@ package org.apache.streampipes.connect.iiot.protocol.stream;
 import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.connect.SendToPipeline;
@@ -34,7 +33,6 @@ import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.messaging.kafka.security.*;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
@@ -47,10 +45,12 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.AdapterSourceType;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -109,7 +109,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
 
     @Override
     protected List<byte[]> getNByteElements(int n) throws ParseException {
-        final Consumer<Long, String> consumer;
+        final Consumer<byte[], byte[]> consumer;
 
         consumer = createConsumer(this.config);
         consumer.subscribe(Arrays.asList(this.topic), new ConsumerRebalanceListener() {
@@ -129,11 +129,11 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
 
 
         while (true) {
-            final ConsumerRecords<Long, String> consumerRecords =
+            final ConsumerRecords<byte[], byte[]> consumerRecords =
                     consumer.poll(1000);
 
             consumerRecords.forEach(record -> {
-                InputStream inputStream = IOUtils.toInputStream(record.value(), "UTF-8");
+                InputStream inputStream = new ByteArrayInputStream(record.value());
 
                 nEventsByte.addAll(parser.parseNEvents(inputStream, n));
             });
@@ -154,7 +154,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         return resultEventsByte;
     }
 
-    private static Consumer<Long, String> createConsumer(KafkaConfig kafkaConfig) {
+    private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) {
         final Properties props = new Properties();
 
         kafkaConfig.getSecurityConfig().appendConfig(props);
@@ -162,21 +162,18 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                 kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
 
-        // TODO make serializer configurable
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
                 "KafkaExampleConsumer" + System.currentTimeMillis());
+
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                LongDeserializer.class.getName());
+                ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                StringDeserializer.class.getName());
+                ByteArrayDeserializer.class.getName());
 
         // Create the consumer using props.
-        final Consumer<Long, String> consumer =
+        final Consumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props);
 
-        // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(kafkaConfig.getTopic()));
-
         return consumer;
     }
 
@@ -221,18 +218,8 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
         KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
         boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
 
-        String kafkaAddress = kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort();
-        Properties props = new Properties();
-
-        // add security properties to kafka configuration
-        kafkaConfig.getSecurityConfig().appendConfig(props);
-
-        props.put("bootstrap.servers", kafkaAddress);
-        props.put("group.id", "test-consumer-group");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
 
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
         Set<String> topics = consumer.listTopics().keySet();
         consumer.close();
 
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index abf3588..027f512 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -33,5 +33,11 @@ sasl-ssl.description=Username and password, with ssl encryption
 
 username-group.title=Username and password
 
+key-deserialization.title=Key Deserializer
+key-deserialization.description=
+
+value-deserialization.title=Value Deserializer
+value-deserialization.description=
+
 hide-internal-topics.title=Hide internal topics
 hide-internal-topics.description=Do not show topics that are only used internally
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
index 3b55309..a104c4a 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
@@ -28,7 +28,11 @@ public class KafkaConfig {
 
     KafkaSecurityConfig securityConfig;
 
-    public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, KafkaSecurityConfig securityConfig) {
+
+    public KafkaConfig(String kafkaHost,
+                       Integer kafkaPort,
+                       String topic,
+                       KafkaSecurityConfig securityConfig) {
         this.kafkaHost = kafkaHost;
         this.kafkaPort = kafkaPort;
         this.topic = topic;
@@ -66,4 +70,5 @@ public class KafkaConfig {
     public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
         this.securityConfig = securityConfig;
     }
+
 }
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
index 51b41c9..a8f1d2f 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Label;
 import org.apache.streampipes.sdk.helpers.Labels;
-import org.checkerframework.checker.units.qual.K;
 
 public class KafkaConnectUtils {
 
@@ -33,13 +32,6 @@ public class KafkaConnectUtils {
     public static final String HOST_KEY = "host";
     public static final String PORT_KEY = "port";
 
-//    private static final String ACCESS_MODE = "access-mode";
-//    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-//    private static final String USERNAME_ACCESS = "username-alternative";
-//    private static final String USERNAME_GROUP = "username-group";
-//    private static final String USERNAME_KEY = "username";
-//    private static final String PASSWORD_KEY = "password";
-
     public static final String ACCESS_MODE = "access-mode";
     public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
     public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
@@ -77,17 +69,6 @@ public class KafkaConnectUtils {
         return Labels.withId(ACCESS_MODE);
     }
 
-//    public static StaticPropertyAlternative getAlternativesOne() {
-//        return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
-//    }
-//
-//    public static StaticPropertyAlternative getAlternativesTwo() {
-//        return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-//                StaticProperties.group(Labels.withId(USERNAME_GROUP),
-//                        StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-//                        StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
-//    }
-
     public static KafkaConfig getConfig(StaticPropertyExtractor extractor, boolean containsTopic) {
         String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
         String topic = "";
@@ -102,8 +83,6 @@ public class KafkaConnectUtils {
 
         KafkaSecurityConfig securityConfig;
 
-        //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig();
-
         // check if a user for the authentication is defined
         if (authentication.equals(KafkaConnectUtils.SASL_SSL) || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
             String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
similarity index 88%
copy from streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
copy to streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
index 3b55309..b2df331 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.pe.shared.config.kafka;
+package org.apache.streampipes.pe.shared.config.kafka.kafka;
 
 import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
 
@@ -28,7 +28,10 @@ public class KafkaConfig {
 
     KafkaSecurityConfig securityConfig;
 
-    public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, KafkaSecurityConfig securityConfig) {
+    public KafkaConfig(String kafkaHost,
+                       Integer kafkaPort,
+                       String topic,
+                       KafkaSecurityConfig securityConfig) {
         this.kafkaHost = kafkaHost;
         this.kafkaPort = kafkaPort;
         this.topic = topic;
@@ -66,4 +69,5 @@ public class KafkaConfig {
     public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
         this.securityConfig = securityConfig;
     }
+
 }
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
similarity index 84%
copy from streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
copy to streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
index 51b41c9..147b2dc 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.pe.shared.config.kafka;
+package org.apache.streampipes.pe.shared.config.kafka.kafka;
 
 import org.apache.streampipes.messaging.kafka.security.*;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
@@ -25,7 +25,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Label;
 import org.apache.streampipes.sdk.helpers.Labels;
-import org.checkerframework.checker.units.qual.K;
 
 public class KafkaConnectUtils {
 
@@ -33,12 +32,11 @@ public class KafkaConnectUtils {
     public static final String HOST_KEY = "host";
     public static final String PORT_KEY = "port";
 
-//    private static final String ACCESS_MODE = "access-mode";
-//    private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-//    private static final String USERNAME_ACCESS = "username-alternative";
-//    private static final String USERNAME_GROUP = "username-group";
-//    private static final String USERNAME_KEY = "username";
-//    private static final String PASSWORD_KEY = "password";
+    public static final String KEY_SERIALIZATION = "key-serialization";
+    public static final String VALUE_SERIALIZATION = "value-serialization";
+
+    public static final String KEY_DESERIALIZATION = "key-deserialization";
+    public static final String VALUE_DESERIALIZATION = "value-deserialization";
 
     public static final String ACCESS_MODE = "access-mode";
     public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
@@ -77,17 +75,6 @@ public class KafkaConnectUtils {
         return Labels.withId(ACCESS_MODE);
     }
 
-//    public static StaticPropertyAlternative getAlternativesOne() {
-//        return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
-//    }
-//
-//    public static StaticPropertyAlternative getAlternativesTwo() {
-//        return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-//                StaticProperties.group(Labels.withId(USERNAME_GROUP),
-//                        StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-//                        StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
-//    }
-
     public static KafkaConfig getConfig(StaticPropertyExtractor extractor, boolean containsTopic) {
         String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
         String topic = "";
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
index bd72731..52a0ed0 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
@@ -22,8 +22,6 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
 import org.apache.streampipes.messaging.kafka.security.*;
-import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerByteArrayConfig;
-import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerConfig;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -40,12 +38,6 @@ public class KafkaPublisher implements EventSink<KafkaParameters> {
     this.dataFormatDefinition = new JsonDataFormatDefinition();
   }
 
-  // Serialization
-  // Key
-  // - StringSerializer
-  // Value
-  // - ByteArraySerializer
-
   @Override
   public void onInvocation(KafkaParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
     boolean useAuthentication = parameters.getAuthentication().equals(KafkaController.getSaslAccessKey());
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index ed4f12a..449bf1c 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -71,28 +71,12 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
     this.appenders = appenders;
   }
 
-  // TODO backwards compatibility, remove later
-  public SpKafkaConsumer(String kafkaUrl,String topic,InternalEventProcessor<byte[]> callback,
-                         KafkaConfigAppender... appenders) {
-    KafkaTransportProtocol protocol = new KafkaTransportProtocol();
-    protocol.setKafkaPort(Integer.parseInt(kafkaUrl.split(":")[1]));
-    protocol.setBrokerHostname(kafkaUrl.split(":")[0]);
-    protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
-    this.appenders = Arrays.asList(appenders);
-
-    try {
-      this.connect(protocol, callback);
-    } catch (SpRuntimeException e) {
-      e.printStackTrace();
-    }
-  }
-
   @Override
   public void run() {
 
     Properties props = makeProperties(protocol, appenders);
 
-    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
+    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
     if (!patternTopic) {
       consumer.subscribe(Collections.singletonList(topic));
     } else {
@@ -111,7 +95,7 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
     }
     Duration duration = Duration.of(100, ChronoUnit.MILLIS);
     while (isRunning) {
-      ConsumerRecords<String, byte[]> records = consumer.poll(duration);
+      ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
       records.forEach(record -> {
         eventProcessor.onEvent(record.value());
       });
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 1de82dc..7c46bb9 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -65,17 +65,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     this.connected = true;
   }
 
-  // TODO backwards compatibility, remove later
-//  public SpKafkaProducer(String url, String topic, String username, String password, boolean ssl) {
-//    String[] urlParts = url.split(COLON);
-//    KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
-//            Integer.parseInt(urlParts[1]), topic);
-//    this.brokerUrl = url;
-//    this.topic = topic;
-//    this.producer = new KafkaProducer<>(makePropertiesSaslPlain(protocol, username, password));
-//    this.connected = true;
-//  }
-
   public void publish(String message) {
     publish(message.getBytes());
   }
@@ -91,10 +80,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     return new ProducerConfigFactory(protocol).buildProperties(appenders);
   }
 
-//  private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
-//    return new ProducerConfigFactory(protocol).makePropertiesSaslPlain(username, password);
-//  }
-
   @Override
   public void connect(KafkaTransportProtocol protocol) {
     LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
index bac2870..8225986 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
@@ -26,7 +26,6 @@ import java.util.function.Supplier;
 public abstract class AbstractConfigFactory {
 
   private static final String COLON = ":";
-  private static final String SASL_MECHANISM = "PLAIN";
 
   protected KafkaTransportProtocol protocol;
 
@@ -50,19 +49,5 @@ public abstract class AbstractConfigFactory {
     appenders.forEach(appender -> appender.appendConfig(props));
 
     return  props;
-    // TODO check Kafka Security
   }
-
-//  public Properties makePropertiesSaslPlain(String username,
-//                                            String password) {
-//    Properties props = makeProperties();
-//    props.put(SaslConfigs.SASL_MECHANISM,SASL_MECHANISM);
-//    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
-//
-////    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
-//
-//    String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
-//    props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
-//    return props;
-//  }
 }
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 21550d7..30dfbdd 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -18,6 +18,8 @@
 package org.apache.streampipes.messaging.kafka.config;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 import java.util.Properties;
@@ -29,10 +31,8 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "5000";
   private static final String SESSION_TIMEOUT_MS_CONFIG_DEFAULT = "30000";
   private static final Integer FETCH_MAX_BYTES_CONFIG_DEFAULT = 52428800;
-  private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
-          ".serialization.StringDeserializer";
-  private static final String VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
-          ".serialization.ByteArrayDeserializer";
+  private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = ByteArrayDeserializer.class.getName();
+  private static final String VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT = ByteArrayDeserializer.class.getName();
 
   public ConsumerConfigFactory(KafkaTransportProtocol protocol) {
     super(protocol);
@@ -42,8 +42,6 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   public Properties makeDefaultProperties() {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
-//    props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId,
-//            UUID.randomUUID().toString()));
     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
@@ -51,8 +49,11 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG_DEFAULT);
     props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
             getConfigOrDefault(protocol::getMessageMaxBytes, FETCH_MAX_BYTES_CONFIG_DEFAULT));
+
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT);
+
+
     props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
 
     return props;
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
index bf76d57..23426d1 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
@@ -17,9 +17,9 @@
  */
 package org.apache.streampipes.messaging.kafka.config;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
 import java.util.Properties;
@@ -33,14 +33,15 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
   private static final Integer BUFFER_MEMORY_CONFIG_DEFAULT = 33554432;
   private static final Integer MAX_REQUEST_SIZE_CONFIG_DEFAULT = 5000012;
 
-  private static final String KEY_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
-          ".StringSerializer";
-  private static final String VALUE_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
-          ".ByteArraySerializer";
+  private static final String KEY_SERIALIZER_DEFAULT = StringSerializer.class.getName();
+  private static final String VALUE_SERIALIZER_DEFAULT = ByteArraySerializer.class.getName();
+
 
 
   public ProducerConfigFactory(KafkaTransportProtocol protocol) {
     super(protocol);
+
+
   }
 
   @Override
@@ -57,8 +58,10 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
     props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getConfigOrDefault(protocol::getMaxRequestSize,
             MAX_REQUEST_SIZE_CONFIG_DEFAULT));
     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG_DEFAULT);
+
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_DEFAULT);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_DEFAULT);
+
     return props;
   }
 
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
deleted file mode 100644
index 40d4bb2..0000000
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.streampipes.messaging.kafka.serializer;
-
-public class KafkaSerializerByteArrayConfig extends KafkaSerializerConfig {
-}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
deleted file mode 100644
index a317c75..0000000
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.streampipes.messaging.kafka.serializer;
-
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-
-import java.util.Properties;
-
-public class KafkaSerializerConfig implements KafkaConfigAppender {
-
-    @Override
-    public void appendConfig(Properties props) {
-
-    }
-}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
index af6df4e..b3b4cfb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
@@ -696,9 +696,29 @@ public abstract class AbstractConfigurablePipelineElementBuilder<BU extends
 
     this.staticProperties.add(osp);
     return me();
+  }
+
+  /**
+   * Defines a configuration parameter that lets preprocessing developers select from a list of pre-defined configuration
+   * options. The parameter will be rendered as a RadioGroup in the StreamPipes UI.
+   * @param label The {@link org.apache.streampipes.sdk.helpers.Label} that describes why this parameter is needed in a
+   *              user-friendly manner.
+   * @param options A list of {@link org.apache.streampipes.model.staticproperty.Option} elements. Use
+   * {@link org.apache.streampipes.sdk.helpers.Options} to create option elements from string values.
+   * @param horizontalRendering when set to true, horizontal rendering is requested for the created selection property
+   * @return this
+   */
+  public BU requiredSingleValueSelection(Label label,
+                                         List<Option> options,
+                                         boolean horizontalRendering) {
+    OneOfStaticProperty osp = new OneOfStaticProperty(label.getInternalId(), label.getLabel(), label.getDescription(), horizontalRendering);
+    osp.setOptions(options);
 
+    this.staticProperties.add(osp);
+    return me();
   }
 
+
   /**
    * @deprecated Use {@link #requiredMultiValueSelection(Label, Option...)} instead.
    * @param internalId
diff --git a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
index 70f61f8..4afc4fa 100644
--- a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
+++ b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
@@ -64,11 +64,11 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
   }
 
   protected Properties getProperties(KafkaTransportProtocol protocol) {
-    return new ConsumerConfigFactory(protocol).makeProperties();
+    return new ConsumerConfigFactory(protocol).makeDefaultProperties();
   }
 
   protected Properties getProducerProperties(KafkaTransportProtocol protocol) {
-    return new ProducerConfigFactory(protocol).makeProperties();
+    return new ProducerConfigFactory(protocol).makeDefaultProperties();
   }
 
   protected SpDataFormatDefinition getDataFormatDefinition(TransportFormat transportFormat) {