You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 19:41:42 UTC
[incubator-streampipes] branch dev updated: [hotfix] Avoid NullPointerException in Kafka producer
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7270b0b [hotfix] Avoid NullPointerException in Kafka producer
7270b0b is described below
commit 7270b0bf3631d7a9255cb2435083ba7603ce9d73
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 20:41:28 2022 +0100
[hotfix] Avoid NullPointerException in Kafka producer
---
.../messaging/kafka/SpKafkaProducer.java | 56 ++++++++++++++--------
1 file changed, 36 insertions(+), 20 deletions(-)
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index bfca8a0..86a6415 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -18,10 +18,7 @@
package org.apache.streampipes.messaging.kafka;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -82,7 +79,9 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
}
public void publish(byte[] message) {
- producer.send(new ProducerRecord<>(topic, message));
+ if (connected) {
+ producer.send(new ProducerRecord<>(topic, message));
+ }
}
private Properties makeProperties(KafkaTransportProtocol protocol) {
@@ -108,42 +107,59 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
/**
* Create a new topic and define number partitions, replicas, and retention time
*
- * @param settings
+ * @param settings The settings to connect to a Kafka broker
*/
private void createKafaTopic(KafkaTransportProtocol settings) {
- String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
-
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
- Map<String, String> topicConfig = new HashMap<>();
- String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
- topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
-
AdminClient adminClient = KafkaAdminClient.create(props);
- final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
- newTopic.configs(topicConfig);
+ ListTopicsResult topics = adminClient.listTopics();
- final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+ if (!topicExists(topics)) {
+ String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
- try {
- createTopicsResult.values().get(topic).get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
+ Map<String, String> topicConfig = new HashMap<>();
+ String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
+ topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
+
+
+ final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+ newTopic.configs(topicConfig);
+
+ final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+
+ try {
+ createTopicsResult.values().get(topic).get();
+ LOG.info("Successfully created Kafka topic " + topic);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
+ }
+ } else {
+ LOG.info("Topic " + topic + "already exists in the broker, skipping topic creation");
}
}
@Override
public void disconnect() {
LOG.info("Kafka producer: Disconnecting from " + topic);
- this.producer.close();
this.connected = false;
+ this.producer.close();
}
@Override
public Boolean isConnected() {
return connected != null && connected;
}
+
+ private boolean topicExists(ListTopicsResult topicsInKafka) {
+ try {
+ return topicsInKafka.names().get().stream().anyMatch(t -> t.equals(topic));
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Could not fetch existing topics", e);
+ return false;
+ }
+ }
}