You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/04/02 18:56:54 UTC

[streampipes] branch dev updated: Migrate KafkaPublisher (#1473)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new eb4b9cbc2 Migrate KafkaPublisher (#1473)
eb4b9cbc2 is described below

commit eb4b9cbc2fa2acea0a5e65a4636c370b04c3cefc
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Mon Apr 3 02:56:48 2023 +0800

    Migrate KafkaPublisher (#1473)
---
 .../sinks/brokers/jvm/BrokersJvmInit.java          |   4 +-
 .../sinks/brokers/jvm/kafka/KafkaController.java   |  91 ---------------
 .../sinks/brokers/jvm/kafka/KafkaParameters.java   |  56 ++++++---
 .../sinks/brokers/jvm/kafka/KafkaPublishSink.java  | 130 +++++++++++++++++++++
 .../sinks/brokers/jvm/kafka/KafkaPublisher.java    |  86 --------------
 5 files changed, 169 insertions(+), 198 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
index e242e18e4..1e07fc515 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokersJvmInit.java
@@ -30,7 +30,7 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
 import org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestController;
 import org.apache.streampipes.sinks.brokers.jvm.jms.JmsController;
-import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaController;
+import org.apache.streampipes.sinks.brokers.jvm.kafka.KafkaPublishSink;
 import org.apache.streampipes.sinks.brokers.jvm.mqtt.MqttPublisherSink;
 import org.apache.streampipes.sinks.brokers.jvm.nats.NatsController;
 import org.apache.streampipes.sinks.brokers.jvm.pulsar.PulsarPublisherSink;
@@ -53,7 +53,7 @@ public class BrokersJvmInit extends ExtensionsModelSubmitter {
             "",
             8096)
         .registerPipelineElements(
-            new KafkaController(),
+            new KafkaPublishSink(),
             new JmsController(),
             new RestController(),
             new BufferRestController(),
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
deleted file mode 100644
index d485ddb44..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaController.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.kafka;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class KafkaController extends StandaloneEventSinkDeclarer<KafkaParameters> {
-
-  @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka")
-        .category(DataSinkType.MESSAGING)
-        .withLocales(Locales.EN)
-        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredProperty(EpRequirements.anyProperty())
-            .build())
-
-        .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), false, false)
-        .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), false, false)
-        .requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
-
-        .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
-            KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
-            KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
-            KafkaConnectUtils.getAlternativesSaslPlain(),
-            KafkaConnectUtils.getAlternativesSaslSSL())
-        .build();
-  }
-
-  @Override
-  public ConfiguredEventSink<KafkaParameters> onInvocation(DataSinkInvocation graph,
-                                                           DataSinkParameterExtractor extractor) {
-    String topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, String.class);
-
-    String kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
-    Integer kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
-    String authentication = extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
-
-    KafkaParameters params;
-    if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_PLAIN)) {
-      params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, null, null, false);
-    } else if (authentication.equals(KafkaConnectUtils.UNAUTHENTICATED_SSL)) {
-      params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, null, null, true);
-    } else {
-      String username = extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
-      String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
-      if (authentication.equals(KafkaConnectUtils.SASL_PLAIN)) {
-        params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, username, password, false);
-      } else {
-        params = new KafkaParameters(graph, kafkaHost, kafkaPort, topic, authentication, username, password, true);
-      }
-    }
-
-    return new ConfiguredEventSink<>(params, KafkaPublisher::new);
-  }
-
-
-  public static String getSaslAccessKey() {
-    return KafkaConnectUtils.SASL_PLAIN;
-  }
-}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
index f85992bd9..6cb0459e3 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaParameters.java
@@ -18,29 +18,42 @@
 
 package org.apache.streampipes.sinks.brokers.jvm.kafka;
 
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
+import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
+import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
 
-public class KafkaParameters extends EventSinkBindingParams {
+public class KafkaParameters {
+
+  private final String kafkaHost;
+
+  private final Integer kafkaPort;
+
+  private final String topic;
+
+  private final String authentication;
 
-  private String kafkaHost;
-  private Integer kafkaPort;
-  private String topic;
-  private String authentication;
   private String username;
+
   private String password;
-  private boolean useSSL;
-
-  public KafkaParameters(DataSinkInvocation graph, String kafkaHost, Integer kafkaPort, String topic,
-                         String authentication, String username, String password, boolean useSSL) {
-    super(graph);
-    this.kafkaHost = kafkaHost;
-    this.kafkaPort = kafkaPort;
-    this.topic = topic;
-    this.authentication = authentication;
-    this.username = username;
-    this.password = password;
-    this.useSSL = useSSL;
+
+  private final boolean useSSL;
+
+  public KafkaParameters(SinkParams params) {
+    DataSinkParameterExtractor extractor = params.extractor();
+    this.topic = extractor.singleValueParameter(KafkaConnectUtils.TOPIC_KEY, String.class);
+    this.kafkaHost = extractor.singleValueParameter(KafkaConnectUtils.HOST_KEY, String.class);
+    this.kafkaPort = extractor.singleValueParameter(KafkaConnectUtils.PORT_KEY, Integer.class);
+    this.authentication = extractor.selectedAlternativeInternalId(KafkaConnectUtils.ACCESS_MODE);
+
+    if (!useAuthentication()) {
+      this.useSSL = KafkaConnectUtils.UNAUTHENTICATED_SSL.equals(this.authentication);
+    } else {
+      String username = extractor.singleValueParameter(KafkaConnectUtils.USERNAME_KEY, String.class);
+      String password = extractor.secretValue(KafkaConnectUtils.PASSWORD_KEY);
+      this.username = username;
+      this.password = password;
+      this.useSSL = KafkaConnectUtils.SASL_SSL.equals(this.authentication);
+    }
   }
 
   public String getKafkaHost() {
@@ -70,4 +83,9 @@ public class KafkaParameters extends EventSinkBindingParams {
   public boolean isUseSSL() {
     return useSSL;
   }
+
+  public boolean useAuthentication() {
+    return KafkaConnectUtils.SASL_PLAIN.equals(this.authentication)
+        || KafkaConnectUtils.SASL_SSL.equals(this.authentication);
+  }
 }
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
new file mode 100644
index 000000000..e085e3712
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.brokers.jvm.kafka;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
+import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
+import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.pe.shared.config.kafka.KafkaConnectUtils;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.List;
+import java.util.Map;
+
+public class KafkaPublishSink extends StreamPipesDataSink {
+
+  private SpKafkaProducer producer;
+
+  private JsonDataFormatDefinition dataFormatDefinition;
+
+  private KafkaParameters params;
+
+  public KafkaPublishSink() {
+  }
+
+  @VisibleForTesting
+  public KafkaPublishSink(SpKafkaProducer producer) {
+    this.producer = producer;
+  }
+
+  @VisibleForTesting
+  public SpKafkaProducer getProducer() {
+    return producer;
+  }
+
+  @Override
+  public DataSinkDescription declareModel() {
+    return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka")
+        .category(DataSinkType.MESSAGING)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder
+            .create()
+            .requiredProperty(EpRequirements.anyProperty())
+            .build())
+
+        .requiredTextParameter(Labels.withId(KafkaConnectUtils.TOPIC_KEY), false, false)
+        .requiredTextParameter(Labels.withId(KafkaConnectUtils.HOST_KEY), false, false)
+        .requiredIntegerParameter(Labels.withId(KafkaConnectUtils.PORT_KEY), 9092)
+
+        .requiredAlternatives(Labels.withId(KafkaConnectUtils.ACCESS_MODE),
+            KafkaConnectUtils.getAlternativeUnauthenticatedPlain(),
+            KafkaConnectUtils.getAlternativeUnauthenticatedSSL(),
+            KafkaConnectUtils.getAlternativesSaslPlain(),
+            KafkaConnectUtils.getAlternativesSaslSSL())
+        .build();
+  }
+
+  @Override
+  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+    this.params = new KafkaParameters(parameters);
+    this.dataFormatDefinition = new JsonDataFormatDefinition();
+
+    KafkaSecurityConfig securityConfig;
+    // SASL access modes carry a username and password; the unauthenticated modes do not
+    if (params.useAuthentication()) {
+      securityConfig = params.isUseSSL()
+          ? new KafkaSecuritySaslSSLConfig(params.getUsername(), params.getPassword()) :
+          new KafkaSecuritySaslPlainConfig(params.getUsername(), params.getPassword());
+    } else {
+      // set security config for unauthenticated access
+      securityConfig = params.isUseSSL()
+          ? new KafkaSecurityUnauthenticatedSSLConfig() :
+          new KafkaSecurityUnauthenticatedPlainConfig();
+    }
+
+    this.producer = new SpKafkaProducer(
+        params.getKafkaHost() + ":" + params.getKafkaPort(),
+        params.getTopic(),
+        List.of(securityConfig));
+  }
+
+  @Override
+  public void onEvent(Event event) throws SpRuntimeException {
+    try {
+      Map<String, Object> rawEvent = event.getRaw();
+      this.producer.publish(dataFormatDefinition.fromMap(rawEvent));
+    } catch (SpRuntimeException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.producer.disconnect();
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
deleted file mode 100644
index a1f90dad8..000000000
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublisher.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.brokers.jvm.kafka;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.messaging.kafka.SpKafkaProducer;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecuritySaslSSLConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedPlainConfig;
-import org.apache.streampipes.messaging.kafka.security.KafkaSecurityUnauthenticatedSSLConfig;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Arrays;
-import java.util.Map;
-
-public class KafkaPublisher implements EventSink<KafkaParameters> {
-
-  private SpKafkaProducer producer;
-  private JsonDataFormatDefinition dataFormatDefinition;
-
-  public KafkaPublisher() {
-    this.dataFormatDefinition = new JsonDataFormatDefinition();
-  }
-
-  @Override
-  public void onInvocation(KafkaParameters parameters, EventSinkRuntimeContext runtimeContext)
-      throws SpRuntimeException {
-    boolean useAuthentication = parameters.getAuthentication().equals(KafkaController.getSaslAccessKey());
-
-    KafkaSecurityConfig securityConfig;
-    //KafkaSerializerConfig serializerConfig = new KafkaSerializerByteArrayConfig();
-
-    // check if a user for the authentication is defined
-    if (useAuthentication) {
-      securityConfig = parameters.isUseSSL()
-          ? new KafkaSecuritySaslSSLConfig(parameters.getUsername(), parameters.getPassword()) :
-          new KafkaSecuritySaslPlainConfig(parameters.getUsername(), parameters.getPassword());
-    } else {
-      // set security config for none authenticated access
-      securityConfig = parameters.isUseSSL()
-          ? new KafkaSecurityUnauthenticatedSSLConfig() :
-          new KafkaSecurityUnauthenticatedPlainConfig();
-    }
-
-    this.producer = new SpKafkaProducer(
-        parameters.getKafkaHost() + ":" + parameters.getKafkaPort(),
-        parameters.getTopic(),
-        Arrays.asList(securityConfig));
-
-  }
-
-  @Override
-  public void onEvent(Event inputEvent) {
-    try {
-      Map<String, Object> event = inputEvent.getRaw();
-      producer.publish(dataFormatDefinition.fromMap(event));
-    } catch (SpRuntimeException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void onDetach() throws SpRuntimeException {
-    this.producer.disconnect();
-  }
-}