You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/04/02 18:56:54 UTC
[streampipes] branch dev updated: Migrate KafkaPublisher (#1473)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new eb4b9cbc2 Migrate KafkaPublisher (#1473)
eb4b9cbc2 is described below
commit eb4b9cbc2fa2acea0a5e65a4636c370b04c3cefc
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Mon Apr 3 02:56:48 2023 +0800
Migrate KafkaPublisher (#1473)
---
.../sinks/brokers/jvm/BrokersJvmInit.java | 4 +-
.../sinks/brokers/jvm/kafka/KafkaController.java | 91 ---------------
.../sinks/brokers/jvm/kafka/KafkaParameters.java | 56 ++++++---
.../sinks/brokers/jvm/kafka/KafkaPublishSink.java | 130 +++++++++++++++++++++
.../sinks/brokers/jvm/kafka/KafkaPublisher.java | 86 --------------
5 files changed, 169 insertions(+), 198 deletions(-)
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index e242e18e4..1e07fc515 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -30,7 +30,7 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
import org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestController;
import org.apache.streampipes.sinks.brokers.jvm.jms.JmsController;
-import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaController;
+import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaPublishSink;
import org.apache.streampipes.sinks.brokers.jvm.mqtt.MqttPublisherSink;
import org.apache.streampipes.sinks.brokers.jvm.nats.NatsController;
import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
@@ -53,7 +53,7 @@ public class BrokersJvmInit extends ExtensionsModelSubmitter {
"",
8096)
.registerPipelineElements(
- new KafkaController(),
+ new KafkaPublishSink(),
new JmsController(),
new RestController(),
new BufferRestController(),
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
deleted file mode 100644
index d485ddb44..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.kafka;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters> {
-
- @Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka")
- .category(DataSinkType.MESSAGING)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
-
- .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), false, false)
- .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), false, false)
- .requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
-
- .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
- KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
- KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
- KafkaConnectUtils.getAlternativesSaslPlain(),
- KafkaConnectUtils.getAlternativesSaslSSL())
- .build();
- }
-
- @Override
- public ConfiguredEventSink<KafkaParameters> onInvocation(DataSinkInvocation graph,
- DataSinkParameterExtractor extractor) {
- String topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, String.class);
-
- String kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
- Integer kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
- String authentication = extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
-
- KafkaParameters params;
- if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN)) {
- params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, null, null, false);
- } else if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_SSL)) {
- params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, null, null, true);
- } else {
- String username = extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
- String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
- if (authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
- params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, username, password, false);
- } else {
- params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, username, password, true);
- }
- }
-
- return new ConfiguredEventSink<>(params, KafkaPublisher::new);
- }
-
-
- public static String getSaslAccessKey() {
- return KafkaConnectUtils.SASL_PLAIN;
- }
-}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
index f85992bd9..6cb0459e3 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
@@ -18,29 +18,42 @@
package org.apache.streampipes.sinks.brokers.jvm.kafka;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
+import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
-public class KafkaParameters extends EventSinkBindingParams {
+public class KafkaParameters {
+
+ private final String kafkaHost;
+
+ private final Integer kafkaPort;
+
+ private final String topic;
+
+ private final String authentication;
- private String kafkaHost;
- private Integer kafkaPort;
- private String topic;
- private String authentication;
private String username;
+
private String password;
- private boolean useSSL;
-
- public KafkaParameters(DataSinkInvocation graph, String kafkaHost, Integer kafkaPort, String topic,
- String authentication, String username, String password, boolean useSSL) {
- super(graph);
- this.kafkaHost = kafkaHost;
- this.kafkaPort = kafkaPort;
- this.topic = topic;
- this.authentication = authentication;
- this.username = username;
- this.password = password;
- this.useSSL = useSSL;
+
+ private final boolean useSSL;
+
+ public KafkaParameters(SinkParams params) {
+ DataSinkParameterExtractor extractor = params.extractor();
+ this.topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, String.class);
+ this.kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
+ this.kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
+ this.authentication = extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
+
+ if (!useAuthentication()) {
+ this.useSSL = KafkaConnectUtils.UNAUTHENTICATED_SSL.equals(this.authentication);
+ } else {
+ String username = extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
+ String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
+ this.username = username;
+ this.password = password;
+ this.useSSL = KafkaConnectUtils.SASL_SSL.equals(this.authentication);
+ }
}
public String getKafkaHost() {
@@ -70,4 +83,9 @@ public class KafkaParameters extends EventSinkBindingParams {
public boolean isUseSSL() {
return useSSL;
}
+
+ public boolean useAuthentication() {
+ return KafkaConnectUtils.SASL_PLAIN.equals(this.authentication)
+ || KafkaConnectUtils.SASL_SSL.equals(this.authentication);
+ }
}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
new file mode 100644
index 000000000..e085e3712
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.kafka;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.List;
+import java.util.Map;
+
+public class KafkaPublishSink extends StreamPipesDataSink {
+
+ private SpKafkaProducer producer;
+
+ private JsonDataFormatDefinition dataFormatDefinition;
+
+ private KafkaParameters params;
+
+ public KafkaPublishSink() {
+ }
+
+ @VisibleForTesting
+ public KafkaPublishSink(SpKafkaProducer producer) {
+ this.producer = producer;
+ }
+
+ @VisibleForTesting
+ public SpKafkaProducer getProducer() {
+ return producer;
+ }
+
+ @Override
+ public DataSinkDescription declareModel() {
+ return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka")
+ .category(DataSinkType.MESSAGING)
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+
+ .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), false, false)
+ .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), false, false)
+ .requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
+
+ .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
+ KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
+ KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
+ KafkaConnectUtils.getAlternativesSaslPlain(),
+ KafkaConnectUtils.getAlternativesSaslSSL())
+ .build();
+ }
+
+ @Override
+ public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ this.params = new KafkaParameters(parameters);
+ this.dataFormatDefinition = new JsonDataFormatDefinition();
+
+ KafkaSecurityConfig securityConfig;
+ // use SASL credentials when a SASL access mode (plain or SSL) is selected
+ if (params.useAuthentication()) {
+ securityConfig = params.isUseSSL()
+ ? new KafkaSecuritySaslSSLConfig(params.getUsername(), params.getPassword()) :
+ new KafkaSecuritySaslPlainConfig(params.getUsername(), params.getPassword());
+ } else {
+ // set security config for unauthenticated access
+ securityConfig = params.isUseSSL()
+ ? new KafkaSecurityUnauthenticatedSSLConfig() :
+ new KafkaSecurityUnauthenticatedPlainConfig();
+ }
+
+ this.producer = new SpKafkaProducer(
+ params.getKafkaHost() + ":" + params.getKafkaPort(),
+ params.getTopic(),
+ List.of(securityConfig));
+ }
+
+ @Override
+ public void onEvent(Event event) throws SpRuntimeException {
+ try {
+ Map<String, Object> rawEvent = event.getRaw();
+ this.producer.publish(dataFormatDefinition.fromMap(rawEvent));
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ this.producer.disconnect();
+ }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
deleted file mode 100644
index a1f90dad8..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.kafka;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Arrays;
-import java.util.Map;
-
-public class KafkaPublisher implements EventSink<KafkaParameters> {
-
- private SpKafkaProducer producer;
- private JsonDataFormatDefinition dataFormatDefinition;
-
- public KafkaPublisher() {
- this.dataFormatDefinition = new JsonDataFormatDefinition();
- }
-
- @Override
- public void onInvocation(KafkaParameters parameters, EventSinkRuntimeContext runtimeContext)
- throws SpRuntimeException {
- boolean useAuthentication = parameters.getAuthentication().equals(KafkaController.getSaslAccessKey());
-
- KafkaSecurityConfig securityConfig;
- //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig();
-
- // check if a user for the authentication is defined
- if (useAuthentication) {
- securityConfig = parameters.isUseSSL()
- ? new KafkaSecuritySaslSSLConfig(parameters.getUsername(), parameters.getPassword()) :
- new KafkaSecuritySaslPlainConfig(parameters.getUsername(), parameters.getPassword());
- } else {
- // set security config for none authenticated access
- securityConfig = parameters.isUseSSL()
- ? new KafkaSecurityUnauthenticatedSSLConfig() :
- new KafkaSecurityUnauthenticatedPlainConfig();
- }
-
- this.producer = new SpKafkaProducer(
- parameters.getKafkaHost() + ":" + parameters.getKafkaPort(),
- parameters.getTopic(),
- Arrays.asList(securityConfig));
-
- }
-
- @Override
- public void onEvent(Event inputEvent) {
- try {
- Map<String, Object> event = inputEvent.getRaw();
- producer.publish(dataFormatDefinition.fromMap(event));
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- this.producer.disconnect();
- }
-}