You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/02/24 21:46:47 UTC
[incubator-streampipes] 02/02: [STREAMPIPES-512] Clean up some code
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 1f87304d4bf92cc0b272964683ce2b1987cd23c6
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Feb 24 22:45:40 2022 +0100
[STREAMPIPES-512] Clean up some code
---
.../iiot/protocol/stream/KafkaProtocol.java | 37 +++++++---------------
.../strings.en | 6 ++++
.../pe/shared/config/kafka/KafkaConfig.java | 7 +++-
.../pe/shared/config/kafka/KafkaConnectUtils.java | 21 ------------
.../config/kafka/{ => kafka}/KafkaConfig.java | 8 +++--
.../kafka/{ => kafka}/KafkaConnectUtils.java | 25 ++++-----------
.../sinks/brokers/jvm/kafka/KafkaPublisher.java | 8 -----
.../messaging/kafka/SpKafkaConsumer.java | 20 ++----------
.../messaging/kafka/SpKafkaProducer.java | 15 ---------
.../kafka/config/AbstractConfigFactory.java | 15 ---------
.../kafka/config/ConsumerConfigFactory.java | 13 ++++----
.../kafka/config/ProducerConfigFactory.java | 15 +++++----
.../serializer/KafkaSerializerByteArrayConfig.java | 4 ---
.../kafka/serializer/KafkaSerializerConfig.java | 13 --------
...AbstractConfigurablePipelineElementBuilder.java | 20 ++++++++++++
.../distributed/runtime/DistributedRuntime.java | 4 +--
16 files changed, 76 insertions(+), 155 deletions(-)
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index e94e9a0..502aa7c 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -21,8 +21,7 @@ package org.apache.streampipes.connect.iiot.protocol.stream;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.streampipes.commons.constants.GlobalStreamPipesConstants;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.connect.SendToPipeline;
@@ -34,7 +33,6 @@ import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
-import org.apache.streampipes.messaging.kafka.security.*;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
@@ -47,10 +45,12 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.AdapterSourceType;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.utils.Assets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.*;
import java.util.stream.Collectors;
@@ -109,7 +109,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
@Override
protected List<byte[]> getNByteElements(int n) throws ParseException {
- final Consumer<Long, String> consumer;
+ final Consumer<byte[], byte[]> consumer;
consumer = createConsumer(this.config);
consumer.subscribe(Arrays.asList(this.topic), new ConsumerRebalanceListener() {
@@ -129,11 +129,11 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
while (true) {
- final ConsumerRecords<Long, String> consumerRecords =
+ final ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(1000);
consumerRecords.forEach(record -> {
- InputStream inputStream = IOUtils.toInputStream(record.value(), "UTF-8");
+ InputStream inputStream = new ByteArrayInputStream(record.value());
nEventsByte.addAll(parser.parseNEvents(inputStream, n));
});
@@ -154,7 +154,7 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
return resultEventsByte;
}
- private static Consumer<Long, String> createConsumer(KafkaConfig kafkaConfig) {
+ private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) {
final Properties props = new Properties();
kafkaConfig.getSecurityConfig().appendConfig(props);
@@ -162,21 +162,18 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());
- // TODO make serializer configurable
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer" + System.currentTimeMillis());
+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- LongDeserializer.class.getName());
+ ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
+ ByteArrayDeserializer.class.getName());
// Create the consumer using props.
- final Consumer<Long, String> consumer =
+ final Consumer<byte[], byte[]> consumer =
new KafkaConsumer<>(props);
- // Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(kafkaConfig.getTopic()));
-
return consumer;
}
@@ -221,18 +218,8 @@ public class KafkaProtocol extends BrokerProtocol implements ResolvesContainerPr
KafkaConfig kafkaConfig = KafkaConnectUtils.getConfig(extractor, false);
boolean hideInternalTopics = extractor.slideToggleValue(KafkaConnectUtils.getHideInternalTopicsKey());
- String kafkaAddress = kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort();
- Properties props = new Properties();
-
- // add security properties to kafka configuration
- kafkaConfig.getSecurityConfig().appendConfig(props);
-
- props.put("bootstrap.servers", kafkaAddress);
- props.put("group.id", "test-consumer-group");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ Consumer<byte[], byte[]> consumer = createConsumer(kafkaConfig);
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
Set<String> topics = consumer.listTopics().keySet();
consumer.close();
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
index abf3588..027f512 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/resources/org.apache.streampipes.connect.iiot.protocol.stream.kafka/strings.en
@@ -33,5 +33,11 @@ sasl-ssl.description=Username and password, with ssl encryption
username-group.title=Username and password
+key-deserialization.title=Key Deserializer
+key-deserialization.description=
+
+value-deserialization.title=Value Deserializer
+value-deserialization.description=
+
hide-internal-topics.title=Hide internal topics
hide-internal-topics.description=Do not show topics that are only used internally
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
index 3b55309..a104c4a 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
@@ -28,7 +28,11 @@ public class KafkaConfig {
KafkaSecurityConfig securityConfig;
- public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, KafkaSecurityConfig securityConfig) {
+
+ public KafkaConfig(String kafkaHost,
+ Integer kafkaPort,
+ String topic,
+ KafkaSecurityConfig securityConfig) {
this.kafkaHost = kafkaHost;
this.kafkaPort = kafkaPort;
this.topic = topic;
@@ -66,4 +70,5 @@ public class KafkaConfig {
public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
this.securityConfig = securityConfig;
}
+
}
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
index 51b41c9..a8f1d2f 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
@@ -25,7 +25,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Label;
import org.apache.streampipes.sdk.helpers.Labels;
-import org.checkerframework.checker.units.qual.K;
public class KafkaConnectUtils {
@@ -33,13 +32,6 @@ public class KafkaConnectUtils {
public static final String HOST_KEY = "host";
public static final String PORT_KEY = "port";
-// private static final String ACCESS_MODE = "access-mode";
-// private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-// private static final String USERNAME_ACCESS = "username-alternative";
-// private static final String USERNAME_GROUP = "username-group";
-// private static final String USERNAME_KEY = "username";
-// private static final String PASSWORD_KEY = "password";
-
public static final String ACCESS_MODE = "access-mode";
public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
public static final String UNAUTHENTICATED_SSL = "unauthenticated-ssl";
@@ -77,17 +69,6 @@ public class KafkaConnectUtils {
return Labels.withId(ACCESS_MODE);
}
-// public static StaticPropertyAlternative getAlternativesOne() {
-// return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
-// }
-//
-// public static StaticPropertyAlternative getAlternativesTwo() {
-// return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-// StaticProperties.group(Labels.withId(USERNAME_GROUP),
-// StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-// StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
-// }
-
public static KafkaConfig getConfig(StaticPropertyExtractor extractor, boolean containsTopic) {
String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
String topic = "";
@@ -102,8 +83,6 @@ public class KafkaConnectUtils {
KafkaSecurityConfig securityConfig;
- //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig();
-
// check if a user for the authentication is defined
if (authentication.equals(KafkaConnectUtils.SASL_SSL) || authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
String username = extractor.singleValueParameter(USERNAME_KEY, String.class);
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
similarity index 88%
copy from streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
copy to streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
index 3b55309..b2df331 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConfig.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.pe.shared.config.kafka;
+package org.apache.streampipes.pe.shared.config.kafka.kafka;
import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
@@ -28,7 +28,10 @@ public class KafkaConfig {
KafkaSecurityConfig securityConfig;
- public KafkaConfig(String kafkaHost, Integer kafkaPort, String topic, KafkaSecurityConfig securityConfig) {
+ public KafkaConfig(String kafkaHost,
+ Integer kafkaPort,
+ String topic,
+ KafkaSecurityConfig securityConfig) {
this.kafkaHost = kafkaHost;
this.kafkaPort = kafkaPort;
this.topic = topic;
@@ -66,4 +69,5 @@ public class KafkaConfig {
public void setSecurityConfig(KafkaSecurityConfig securityConfig) {
this.securityConfig = securityConfig;
}
+
}
diff --git a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
similarity index 84%
copy from streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
copy to streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
index 51b41c9..147b2dc 100644
--- a/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
+++ b/streampipes-extensions/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/kafka/KafkaConnectUtils.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.pe.shared.config.kafka;
+package org.apache.streampipes.pe.shared.config.kafka.kafka;
import org.apache.streampipes.messaging.kafka.security.*;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
@@ -25,7 +25,6 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Label;
import org.apache.streampipes.sdk.helpers.Labels;
-import org.checkerframework.checker.units.qual.K;
public class KafkaConnectUtils {
@@ -33,12 +32,11 @@ public class KafkaConnectUtils {
public static final String HOST_KEY = "host";
public static final String PORT_KEY = "port";
-// private static final String ACCESS_MODE = "access-mode";
-// private static final String ANONYMOUS_ACCESS = "anonymous-alternative";
-// private static final String USERNAME_ACCESS = "username-alternative";
-// private static final String USERNAME_GROUP = "username-group";
-// private static final String USERNAME_KEY = "username";
-// private static final String PASSWORD_KEY = "password";
+ public static final String KEY_SERIALIZATION = "key-serialization";
+ public static final String VALUE_SERIALIZATION = "value-serialization";
+
+ public static final String KEY_DESERIALIZATION = "key-deserialization";
+ public static final String VALUE_DESERIALIZATION = "value-deserialization";
public static final String ACCESS_MODE = "access-mode";
public static final String UNAUTHENTICATED_PLAIN = "unauthenticated-plain";
@@ -77,17 +75,6 @@ public class KafkaConnectUtils {
return Labels.withId(ACCESS_MODE);
}
-// public static StaticPropertyAlternative getAlternativesOne() {
-// return Alternatives.from(Labels.withId(ANONYMOUS_ACCESS));
-// }
-//
-// public static StaticPropertyAlternative getAlternativesTwo() {
-// return Alternatives.from(Labels.withId(USERNAME_ACCESS),
-// StaticProperties.group(Labels.withId(USERNAME_GROUP),
-// StaticProperties.stringFreeTextProperty(Labels.withId(USERNAME_KEY)),
-// StaticProperties.secretValue(Labels.withId(PASSWORD_KEY))));
-// }
-
public static KafkaConfig getConfig(StaticPropertyExtractor extractor, boolean containsTopic) {
String brokerUrl = extractor.singleValueParameter(HOST_KEY, String.class);
String topic = "";
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
index bd72731..52a0ed0 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
@@ -22,8 +22,6 @@ import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
import org.apache.streampipes.messaging.kafka.security.*;
-import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerByteArrayConfig;
-import org.apache.streampipes.messaging.kafka.serializer.KafkaSerializerConfig;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -40,12 +38,6 @@ public class KafkaPublisher implements EventSink<KafkaParameters> {
this.dataFormatDefinition = new JsonDataFormatDefinition();
}
- // Serialization
- // Key
- // - StringSerializer
- // Value
- // - ByteArraySerializer
-
@Override
public void onInvocation(KafkaParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
boolean useAuthentication = parameters.getAuthentication().equals(KafkaController.getSaslAccessKey());
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index ed4f12a..449bf1c 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -71,28 +71,12 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
this.appenders = appenders;
}
- // TODO backwards compatibility, remove later
- public SpKafkaConsumer(String kafkaUrl,String topic,InternalEventProcessor<byte[]> callback,
- KafkaConfigAppender... appenders) {
- KafkaTransportProtocol protocol = new KafkaTransportProtocol();
- protocol.setKafkaPort(Integer.parseInt(kafkaUrl.split(":")[1]));
- protocol.setBrokerHostname(kafkaUrl.split(":")[0]);
- protocol.setTopicDefinition(new SimpleTopicDefinition(topic));
- this.appenders = Arrays.asList(appenders);
-
- try {
- this.connect(protocol, callback);
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
- }
-
@Override
public void run() {
Properties props = makeProperties(protocol, appenders);
- KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
if (!patternTopic) {
consumer.subscribe(Collections.singletonList(topic));
} else {
@@ -111,7 +95,7 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
}
Duration duration = Duration.of(100, ChronoUnit.MILLIS);
while (isRunning) {
- ConsumerRecords<String, byte[]> records = consumer.poll(duration);
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(duration);
records.forEach(record -> {
eventProcessor.onEvent(record.value());
});
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 1de82dc..7c46bb9 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -65,17 +65,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
this.connected = true;
}
- // TODO backwards compatibility, remove later
-// public SpKafkaProducer(String url, String topic, String username, String password, boolean ssl) {
-// String[] urlParts = url.split(COLON);
-// KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0],
-// Integer.parseInt(urlParts[1]), topic);
-// this.brokerUrl = url;
-// this.topic = topic;
-// this.producer = new KafkaProducer<>(makePropertiesSaslPlain(protocol, username, password));
-// this.connected = true;
-// }
-
public void publish(String message) {
publish(message.getBytes());
}
@@ -91,10 +80,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
return new ProducerConfigFactory(protocol).buildProperties(appenders);
}
-// private Properties makePropertiesSaslPlain(KafkaTransportProtocol protocol, String username, String password) {
-// return new ProducerConfigFactory(protocol).makePropertiesSaslPlain(username, password);
-// }
-
@Override
public void connect(KafkaTransportProtocol protocol) {
LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
index bac2870..8225986 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/AbstractConfigFactory.java
@@ -26,7 +26,6 @@ import java.util.function.Supplier;
public abstract class AbstractConfigFactory {
private static final String COLON = ":";
- private static final String SASL_MECHANISM = "PLAIN";
protected KafkaTransportProtocol protocol;
@@ -50,19 +49,5 @@ public abstract class AbstractConfigFactory {
appenders.forEach(appender -> appender.appendConfig(props));
return props;
- // TODO check Kafka Security
}
-
-// public Properties makePropertiesSaslPlain(String username,
-// String password) {
-// Properties props = makeProperties();
-// props.put(SaslConfigs.SASL_MECHANISM,SASL_MECHANISM);
-// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.toString());
-//
-//// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString());
-//
-// String SASL_JAAS_CONFIG = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";";
-// props.put(SaslConfigs.SASL_JAAS_CONFIG, SASL_JAAS_CONFIG);
-// return props;
-// }
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 21550d7..30dfbdd 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.messaging.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import java.util.Properties;
@@ -29,10 +31,8 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "5000";
private static final String SESSION_TIMEOUT_MS_CONFIG_DEFAULT = "30000";
private static final Integer FETCH_MAX_BYTES_CONFIG_DEFAULT = 52428800;
- private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
- ".serialization.StringDeserializer";
- private static final String VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
- ".serialization.ByteArrayDeserializer";
+ private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = ByteArrayDeserializer.class.getName();
+ private static final String VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT = ByteArrayDeserializer.class.getName();
public ConsumerConfigFactory(KafkaTransportProtocol protocol) {
super(protocol);
@@ -42,8 +42,6 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
public Properties makeDefaultProperties() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
-// props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId,
-// UUID.randomUUID().toString()));
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
@@ -51,8 +49,11 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, SESSION_TIMEOUT_MS_CONFIG_DEFAULT);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
getConfigOrDefault(protocol::getMessageMaxBytes, FETCH_MAX_BYTES_CONFIG_DEFAULT));
+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG_DEFAULT);
+
+
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
return props;
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
index bf76d57..23426d1 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ProducerConfigFactory.java
@@ -17,9 +17,9 @@
*/
package org.apache.streampipes.messaging.kafka.config;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import java.util.Properties;
@@ -33,14 +33,15 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
private static final Integer BUFFER_MEMORY_CONFIG_DEFAULT = 33554432;
private static final Integer MAX_REQUEST_SIZE_CONFIG_DEFAULT = 5000012;
- private static final String KEY_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
- ".StringSerializer";
- private static final String VALUE_SERIALIZER_DEFAULT = "org.apache.kafka.common.serialization" +
- ".ByteArraySerializer";
+ private static final String KEY_SERIALIZER_DEFAULT = StringSerializer.class.getName();
+ private static final String VALUE_SERIALIZER_DEFAULT = ByteArraySerializer.class.getName();
+
public ProducerConfigFactory(KafkaTransportProtocol protocol) {
super(protocol);
+
+
}
@Override
@@ -57,8 +58,10 @@ public class ProducerConfigFactory extends AbstractConfigFactory {
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, getConfigOrDefault(protocol::getMaxRequestSize,
MAX_REQUEST_SIZE_CONFIG_DEFAULT));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG_DEFAULT);
+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_DEFAULT);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_DEFAULT);
+
return props;
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
deleted file mode 100644
index 40d4bb2..0000000
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerByteArrayConfig.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.streampipes.messaging.kafka.serializer;
-
-public class KafkaSerializerByteArrayConfig extends KafkaSerializerConfig {
-}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
deleted file mode 100644
index a317c75..0000000
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/serializer/KafkaSerializerConfig.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.streampipes.messaging.kafka.serializer;
-
-import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
-
-import java.util.Properties;
-
-public class KafkaSerializerConfig implements KafkaConfigAppender {
-
- @Override
- public void appendConfig(Properties props) {
-
- }
-}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
index af6df4e..b3b4cfb 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/builder/AbstractConfigurablePipelineElementBuilder.java
@@ -696,9 +696,29 @@ public abstract class AbstractConfigurablePipelineElementBuilder<BU extends
this.staticProperties.add(osp);
return me();
+ }
+
+ /**
+ * Defines a configuration parameter that lets preprocessing developers select from a list of pre-defined configuration
+ * options. The parameter will be rendered as a RadioGroup in the StreamPipes UI.
+ * @param label The {@link org.apache.streampipes.sdk.helpers.Label} that describes why this parameter is needed in a
+ * user-friendly manner.
+ * @param options A list of {@link org.apache.streampipes.model.staticproperty.Option} elements. Use
+ * {@link org.apache.streampipes.sdk.helpers.Options} to create option elements from string values.
+ * @param horizontalRendering when set to true, the options are rendered horizontally
+ * @return this
+ */
+ public BU requiredSingleValueSelection(Label label,
+ List<Option> options,
+ boolean horizontalRendering) {
+ OneOfStaticProperty osp = new OneOfStaticProperty(label.getInternalId(), label.getLabel(), label.getDescription(), horizontalRendering);
+ osp.setOptions(options);
+ this.staticProperties.add(osp);
+ return me();
}
+
/**
* @deprecated Use {@link #requiredMultiValueSelection(Label, Option...)} instead.
* @param internalId
diff --git a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
index 70f61f8..4afc4fa 100644
--- a/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
+++ b/streampipes-wrapper-distributed/src/main/java/org/apache/streampipes/wrapper/distributed/runtime/DistributedRuntime.java
@@ -64,11 +64,11 @@ public abstract class DistributedRuntime<RP extends RuntimeParams<B, I, RC>, B e
}
protected Properties getProperties(KafkaTransportProtocol protocol) {
- return new ConsumerConfigFactory(protocol).makeProperties();
+ return new ConsumerConfigFactory(protocol).makeDefaultProperties();
}
protected Properties getProducerProperties(KafkaTransportProtocol protocol) {
- return new ProducerConfigFactory(protocol).makeProperties();
+ return new ProducerConfigFactory(protocol).makeDefaultProperties();
}
protected SpDataFormatDefinition getDataFormatDefinition(TransportFormat transportFormat) {