You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 19:41:42 UTC

[incubator-streampipes] branch dev updated: [hotfix] Avoid NullPointerException in Kafka producer

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7270b0b  [hotfix] Avoid NullPointerException in Kafka producer
7270b0b is described below

commit 7270b0bf3631d7a9255cb2435083ba7603ce9d73
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 20:41:28 2022 +0100

    [hotfix] Avoid NullPointerException in Kafka producer
---
 .../messaging/kafka/SpKafkaProducer.java           | 56 ++++++++++++++--------
 1 file changed, 36 insertions(+), 20 deletions(-)

diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index bfca8a0..86a6415 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -18,10 +18,7 @@
 
 package org.apache.streampipes.messaging.kafka;
 
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -82,7 +79,9 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   }
 
   public void publish(byte[] message) {
-    producer.send(new ProducerRecord<>(topic, message));
+    if (connected) {
+      producer.send(new ProducerRecord<>(topic, message));
+    }
   }
 
   private Properties makeProperties(KafkaTransportProtocol protocol) {
@@ -108,42 +107,59 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   /**
    * Create a new topic and define number partitions, replicas, and retention time
    *
-   * @param settings
+   * @param settings The settings to connect to a Kafka broker
    */
   private void createKafaTopic(KafkaTransportProtocol settings) {
-    String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
-
 
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
 
-    Map<String, String> topicConfig = new HashMap<>();
-    String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
-    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
-
     AdminClient adminClient = KafkaAdminClient.create(props);
 
-    final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
-    newTopic.configs(topicConfig);
+    ListTopicsResult topics = adminClient.listTopics();
 
-    final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+    if (!topicExists(topics)) {
+      String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
 
-    try {
-      createTopicsResult.values().get(topic).get();
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
+      Map<String, String> topicConfig = new HashMap<>();
+      String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
+      topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
+
+
+      final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+      newTopic.configs(topicConfig);
+
+      final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+
+      try {
+        createTopicsResult.values().get(topic).get();
+        LOG.info("Successfully created Kafka topic " + topic);
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
+      }
+    } else {
+      LOG.info("Topic " + topic + "already exists in the broker, skipping topic creation");
     }
   }
 
   @Override
   public void disconnect() {
     LOG.info("Kafka producer: Disconnecting from " + topic);
-    this.producer.close();
     this.connected = false;
+    this.producer.close();
   }
 
   @Override
   public Boolean isConnected() {
     return connected != null && connected;
   }
+
+  private boolean topicExists(ListTopicsResult topicsInKafka) {
+    try {
+      return topicsInKafka.names().get().stream().anyMatch(t -> t.equals(topic));
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Could not fetch existing topics", e);
+      return false;
+    }
+  }
 }