You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 20:20:31 UTC

[incubator-streampipes] branch dev updated (7270b0b -> 9e70c1b)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from 7270b0b  [hotfix] Avoid NullPointerException in Kafka producer
     new 842d451  [hotfix] Improve exception handling of messaging system
     new f08166a  [hotfix] Modify internal topic name
     new 9e70c1b  [hotfix] Improve exception handling of messaging system

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../messaging/jms/ActiveMQPublisher.java           |  6 ++---
 .../messaging/kafka/SpKafkaProducer.java           | 28 +++++++++++-----------
 .../messaging/mqtt/AbstractMqttConnector.java      |  2 +-
 .../streampipes/messaging/mqtt/MqttPublisher.java  |  2 +-
 .../streampipes/messaging/EventProducer.java       |  2 +-
 .../streampipes/manager/util/TopicGenerator.java   |  6 ++++-
 6 files changed, 25 insertions(+), 21 deletions(-)

[incubator-streampipes] 03/03: [hotfix] Improve exception handling of messaging system

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 9e70c1b02f7ddee37a29b8cd477e8c9eec77ceab
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 21:19:45 2022 +0100

    [hotfix] Improve exception handling of messaging system
---
 .../messaging/jms/ActiveMQPublisher.java           |  6 ++---
 .../messaging/kafka/SpKafkaProducer.java           | 28 +++++++++++-----------
 .../messaging/mqtt/AbstractMqttConnector.java      |  2 +-
 .../streampipes/messaging/mqtt/MqttPublisher.java  |  2 +-
 4 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
index 02df17f..40a1164 100644
--- a/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
+++ b/streampipes-messaging-jms/src/main/java/org/apache/streampipes/messaging/jms/ActiveMQPublisher.java
@@ -43,7 +43,7 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
   private Session session;
   private MessageProducer producer;
 
-  private Boolean connected = false;
+  private boolean connected = false;
 
   public ActiveMQPublisher() {
 
@@ -137,8 +137,8 @@ public class ActiveMQPublisher implements EventProducer<JmsTransportProtocol> {
   }
 
   @Override
-  public Boolean isConnected() {
+  public boolean isConnected() {
     return connected;
   }
 
-}
\ No newline at end of file
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index 86a6415..a581612 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -48,7 +48,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   private String topic;
   private Producer<String, byte[]> producer;
 
-  private Boolean connected;
+  private boolean connected = false;
 
   private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);
 
@@ -97,11 +97,18 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
     this.brokerUrl = protocol.getBrokerHostname() + ":" + protocol.getKafkaPort();
     this.topic = protocol.getTopicDefinition().getActualTopicName();
+    String zookeeperHost = protocol.getZookeeperHost() + ":" + protocol.getZookeeperPort();
 
-    createKafaTopic(protocol);
+    try {
+      createKafaTopic(protocol);
+    } catch (ExecutionException | InterruptedException e) {
+      LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
+    }
 
     this.producer = new KafkaProducer<>(makeProperties(protocol));
     this.connected = true;
+
+    LOG.info("Successfully created Kafka producer for topic " + this.topic);
   }
 
   /**
@@ -109,7 +116,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
    *
    * @param settings The settings to connect to a Kafka broker
    */
-  private void createKafaTopic(KafkaTransportProtocol settings) {
+  private void createKafaTopic(KafkaTransportProtocol settings) throws ExecutionException, InterruptedException {
 
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
@@ -119,24 +126,17 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
     ListTopicsResult topics = adminClient.listTopics();
 
     if (!topicExists(topics)) {
-      String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
-
       Map<String, String> topicConfig = new HashMap<>();
       String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() ? Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT;
       topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime);
 
-
       final NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
       newTopic.configs(topicConfig);
 
       final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+      createTopicsResult.values().get(topic).get();
+      LOG.info("Successfully created Kafka topic " + topic);
 
-      try {
-        createTopicsResult.values().get(topic).get();
-        LOG.info("Successfully created Kafka topic " + topic);
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Could not create topic: " + topic + " on broker " + zookeeperHost);
-      }
     } else {
       LOG.info("Topic " + topic + "already exists in the broker, skipping topic creation");
     }
@@ -150,8 +150,8 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   }
 
   @Override
-  public Boolean isConnected() {
-    return connected != null && connected;
+  public boolean isConnected() {
+    return connected;
   }
 
   private boolean topicExists(ListTopicsResult topicsInKafka) {
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index c34716e..6ba637a 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -24,7 +24,7 @@ public class AbstractMqttConnector {
 
   protected MQTT mqtt;
   protected BlockingConnection connection;
-  protected Boolean connected = false;
+  protected boolean connected = false;
 
   protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
     this.mqtt = new MQTT();
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
index 04754d6..91dbd38 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/MqttPublisher.java
@@ -63,7 +63,7 @@ public class MqttPublisher extends AbstractMqttConnector implements EventProduce
   }
 
   @Override
-  public Boolean isConnected() {
+  public boolean isConnected() {
     return connected;
   }
 }

[incubator-streampipes] 01/03: [hotfix] Improve exception handling of messaging system

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 842d4511322ef2b1ea0a5e6f7213b9da1054ec9b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 21:17:45 2022 +0100

    [hotfix] Improve exception handling of messaging system
---
 .../src/main/java/org/apache/streampipes/messaging/EventProducer.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java
index bddf66e..9d3998b 100644
--- a/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java
+++ b/streampipes-messaging/src/main/java/org/apache/streampipes/messaging/EventProducer.java
@@ -31,5 +31,5 @@ public interface EventProducer<TP extends TransportProtocol> extends Serializabl
 
     void disconnect() throws SpRuntimeException;
 
-    Boolean isConnected();
+    boolean isConnected();
 }

[incubator-streampipes] 02/03: [hotfix] Modify internal topic name

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f08166aee09080d6229c0d4078d505d78b93cf01
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 21:19:24 2022 +0100

    [hotfix] Modify internal topic name
---
 .../java/org/apache/streampipes/manager/util/TopicGenerator.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TopicGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TopicGenerator.java
index a8a8af7..0b0c0db 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TopicGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/util/TopicGenerator.java
@@ -23,6 +23,10 @@ import org.apache.commons.lang3.RandomStringUtils;
 public class TopicGenerator {
 
   public static String generateRandomTopic() {
-    return "org.apache.streampipes." + RandomStringUtils.randomAlphabetic(20);
+    return generateInternalPipelineElementTopic(RandomStringUtils.randomAlphabetic(15));
+  }
+
+  public static String generateInternalPipelineElementTopic(String appendix) {
+    return "org-apache-streampipes-internal-" + appendix;
   }
 }