You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/04/07 07:52:14 UTC
[streampipes] branch dev updated: Refactor method name in Kafka messaging layer (#1490)
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 3e5e57a62 Refactor method name in Kafka messaging layer (#1490)
3e5e57a62 is described below
commit 3e5e57a625dc0ed433f5bd038aa60b46c0c138e9
Author: Liu Xiao <42...@users.noreply.github.com>
AuthorDate: Fri Apr 7 15:52:08 2023 +0800
Refactor method name in Kafka messaging layer (#1490)
* update method name
* update method name
* fix checkstyle
---
.../sinks/brokers/jvm/kafka/KafkaPublishSink.java | 12 ------------
.../apache/streampipes/messaging/kafka/SpKafkaProducer.java | 4 ++--
2 files changed, 2 insertions(+), 14 deletions(-)
diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
index e085e3712..72ad8f295 100644
--- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
+++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/kafka/KafkaPublishSink.java
@@ -40,8 +40,6 @@ import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.standalone.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
-import com.google.common.annotations.VisibleForTesting;
-
import java.util.List;
import java.util.Map;
@@ -56,16 +54,6 @@ public class KafkaPublishSink extends StreamPipesDataSink {
public KafkaPublishSink() {
}
- @VisibleForTesting
- public KafkaPublishSink(SpKafkaProducer producer) {
- this.producer = producer;
- }
-
- @VisibleForTesting
- public SpKafkaProducer getProducer() {
- return producer;
- }
-
@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.kafka")
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 407f2d028..9f0d262ee 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -97,7 +97,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
String zookeeperHost = protocol.getZookeeperHost() + ":" + protocol.getZookeeperPort();
try {
- createKafaTopic(protocol);
+ createKafkaTopic(protocol);
} catch (ExecutionException | InterruptedException e) {
LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
}
@@ -113,7 +113,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
*
* @param settings The settings to connect to a Kafka broker
*/
- private void createKafaTopic(KafkaTransportProtocol settings) throws ExecutionException, InterruptedException {
+ private void createKafkaTopic(KafkaTransportProtocol settings) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);