You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 20:20:34 UTC

[incubator-streampipes] 03/03: [hotfix] Improve exception handling of messaging system

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 9e70c1b02f7ddee37a29b8cd477e8c9eec77ceab
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 21:19:45 2022 +0100

    [hotfix] Improve exception handling of messaging system
---
 .../messaging/jms/ActiveMQPublisher.java           |  6 ++---
 .../messaging/kafka/SpKafkaProducer.java           | 28 +++++++++++-----------
 .../messaging/mqtt/AbstractMqttConnector.java      |  2 +-
 .../streampipes/messaging/mqtt/MqttPublisher.java  |  2 +-
 4 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
index 02df17f..40a1164 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
@@ -43,7 +43,7 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
   private Session session;
   private MessageProducer producer;
 
-  private Boolean connected = false;
+  private boolean connected = false;
 
   public ActiveMQPublisher() {
 
@@ -137,8 +137,8 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
   }
 
   @Override
-  public Boolean isConnected() {
+  public boolean isConnected() {
     return connected;
   }
 
-}
\ No newline at end of file
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 86a6415..a581612 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -48,7 +48,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   private String topic;
   private Producer<String, byte[]> producer;
 
-  private Boolean connected;
+  private boolean connected = false;
 
   private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);
 
@@ -97,11 +97,18 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
     this.brokerUrl = protocol.getBrokerHostname() + ":" + protocol.getKafkaPort();
     this.topic = protocol.getTopicDefinition().getActualTopicName();
+    String zookeeperHost = protocol.getZookeeperHost() + ":" + protocol.getZookeeperPort();
 
-    createKafaTopic(protocol);
+    try {
+      createKafaTopic(protocol);
+    } catch (ExecutionException | InterruptedException e) {
+      LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
+    }
 
     this.producer = new KafkaProducer<>(makeProperties(protocol));
     this.connected = true;
+
+    LOG.info("Successfully created Kafka producer for topic " + this.topic);
   }
 
   /**
@@ -109,7 +116,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
    *
    * @param settings The settings to connect to a Kafka broker
    */
-  private void createKafaTopic(KafkaTransportProtocol settings) {
+  private void createKafaTopic(KafkaTransportProtocol settings) throws ExecutionException, InterruptedException {
 
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
@@ -119,24 +126,17 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     ListTopicsResult topics = adminClient.listTopics();
 
     if (!topicExists(topics)) {
-      String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
-
       Map<String, String> topicConfig = new HashMap<>();
       String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
       topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
 
-
       final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
       newTopic.configs(topicConfig);
 
       final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+      createTopicsResult.values().get(topic).get();
+      LOG.info("Successfully created Kafka topic " + topic);
 
-      try {
-        createTopicsResult.values().get(topic).get();
-        LOG.info("Successfully created Kafka topic " + topic);
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
-      }
     } else {
       LOG.info("Topic " + topic + "already exists in the broker, skipping topic creation");
     }
@@ -150,8 +150,8 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   }
 
   @Override
-  public Boolean isConnected() {
-    return connected != null && connected;
+  public boolean isConnected() {
+    return connected;
   }
 
   private boolean topicExists(ListTopicsResult topicsInKafka) {
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index c34716e..6ba637a 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -24,7 +24,7 @@ public class AbstractMqttConnector {
 
   protected MQTT mqtt;
   protected BlockingConnection connection;
-  protected Boolean connected = false;
+  protected boolean connected = false;
 
   protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
     this.mqtt = new MQTT();
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
index 04754d6..91dbd38 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
@@ -63,7 +63,7 @@ public class MqttPublisher extends AbstractMqttConnector implements EventProduce
   }
 
   @Override
-  public Boolean isConnected() {
+  public boolean isConnected() {
     return connected;
   }
 }