You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/04/07 07:52:14 UTC

[streampipes] branch dev updated: Refactor method name in Kafka messaging layer (#1490)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3e5e57a62 Refactor method name in Kafka messaging layer (#1490)
3e5e57a62 is described below

commit 3e5e57a625dc0ed433f5bd038aa60b46c0c138e9
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Fri Apr 7 15:52:08 2023 +0800

    Refactor method name in Kafka messaging layer (#1490)
    
    * update method name
    
    * update method name
    
    * fix checkstyle
---
 .../sinks/brokers/jvm/kafka/KafkaPublishSink.java            | 12 ------------
 .../apache/streampipes/messaging/kafka/SpKafkaProducer.java  |  4 ++--
 2 files changed, 2 insertions(+), 14 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
index e085e3712..72ad8f295 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
@@ -40,8 +40,6 @@ import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.standalone.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.util.List;
 import java.util.Map;
 
@@ -56,16 +54,6 @@ public class KafkaPublishSink extends StreamPipesDataSink {
   public KafkaPublishSink() {
   }
 
-  @VisibleForTesting
-  public KafkaPublishSink(SpKafkaProducer producer) {
-    this.producer = producer;
-  }
-
-  @VisibleForTesting
-  public SpKafkaProducer getProducer() {
-    return producer;
-  }
-
   @Override
   public DataSinkDescription declareModel() {
     return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka")
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 407f2d028..9f0d262ee 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -97,7 +97,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     String zookeeperHost = protocol.getZookeeperHost() + ":" + protocol.getZookeeperPort();
 
     try {
-      createKafaTopic(protocol);
+      createKafkaTopic(protocol);
     } catch (ExecutionException | InterruptedException e) {
       LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
     }
@@ -113,7 +113,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
    *
    * @param settings The settings to connect to a Kafka broker
    */
-  private void createKafaTopic(KafkaTransportProtocol settings) throws ExecutionException, InterruptedException {
+  private void createKafkaTopic(KafkaTransportProtocol settings) throws ExecutionException, InterruptedException {
 
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);