You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/03/01 10:15:46 UTC

[incubator-streampipes] branch dev updated: STREAMPIPES-9 Remove dependency to kafka-rest

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 918fbb7  STREAMPIPES-9 Remove dependency to kafka-rest
     new 07e80d6  Merge branch 'dev' into STREAMPIPES-9
918fbb7 is described below

commit 918fbb7406e3fcfa5eeac2fd4fbac7352480cfd4
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sun Mar 1 11:14:27 2020 +0100

    STREAMPIPES-9 Remove dependency to kafka-rest
---
 streampipes-backend/development/env                |   1 +
 .../messaging/kafka/SpKafkaConsumer.java           |   8 +
 .../runtime/PipelineElementRuntimeInfoFetcher.java | 187 +++++----------------
 ui/package.json                                    |   4 +-
 .../editor/dialog/help/help-dialog.controller.ts   |   1 +
 .../services/editor-dialog-manager.service.ts      |   3 +-
 6 files changed, 56 insertions(+), 148 deletions(-)

diff --git a/streampipes-backend/development/env b/streampipes-backend/development/env
index 6cf517d..1467e19 100644
--- a/streampipes-backend/development/env
+++ b/streampipes-backend/development/env
@@ -20,3 +20,4 @@ SP_BACKEND_HOST=localhost
 SP_INFLUX_HOST=localhost
 SP_INFLUX_PORT=8086
 SP_JMS_HOST=localhost
+SP_DEBUG=true
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index 4e9a933..edef194 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -54,6 +54,14 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
 
   }
 
+  public SpKafkaConsumer(KafkaTransportProtocol protocol, String topic, InternalEventProcessor<byte[]> eventProcessor) {
+      this.protocol = protocol;
+      this.topic = topic;
+      this.eventProcessor = eventProcessor;
+      this.isRunning = true;
+  }
+
+
   // TODO backwards compatibility, remove later
   public SpKafkaConsumer(String kafkaUrl, String topic, InternalEventProcessor<byte[]> callback) {
     KafkaTransportProtocol protocol = new KafkaTransportProtocol();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index 6d0dd63..6c0e0e6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,19 +17,10 @@
  */
 package org.apache.streampipes.manager.runtime;
 
-import com.google.common.base.Charsets;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
 import org.apache.streampipes.model.SpDataStream;
@@ -37,30 +28,17 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportFormat;
 
-import java.io.IOException;
-import java.util.Base64;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 public enum PipelineElementRuntimeInfoFetcher {
 
   INSTANCE;
 
-  Logger logger = LoggerFactory.getLogger(JsonParser.class);
+  Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
 
-  private static Set<String> consumerInstances = new HashSet<>();
   private Map<String, SpDataFormatConverter> converterMap;
 
-  private static final String CONSUMER_GROUP_ID = "streampipes-backend-listener-group-";
-  private static final String KAFKA_REST_ACCEPT = "application/vnd.kafka.binary.v2+json";
-  private static final String KAFKA_REST_CONTENT_TYPE = "application/vnd.kafka.v2+json";
-
-  private static final String OFFSET_FIELD_NAME = "offset";
-  private static final String VALUE_FIELD_NAME = "value";
-
-
   PipelineElementRuntimeInfoFetcher() {
     this.converterMap = new HashMap<>();
   }
@@ -110,103 +88,6 @@ public enum PipelineElementRuntimeInfoFetcher {
     return spDataStream.getEventGrounding().getTransportFormats().get(0);
   }
 
-  private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
-    String kafkaRestUrl = getKafkaRestUrl();
-    String kafkaTopic = getOutputTopic(spDataStream);
-
-    return getLatestSubscription(kafkaRestUrl, kafkaTopic, spDataStream);
-  }
-
-  private String getLatestSubscription(String kafkaRestUrl, String kafkaTopic,
-                                       SpDataStream spDataStream) throws SpRuntimeException {
-    String kafkaRestRecordsUrl = getConsumerInstanceUrl(kafkaRestUrl,
-            getConsumerInstanceId(kafkaTopic), kafkaTopic) + "/records";
-
-    try {
-      if (!consumerInstances.contains(getConsumerInstanceId(kafkaTopic)) ||
-              !converterMap.containsKey(kafkaTopic)) {
-        createSubscription(kafkaRestUrl, kafkaTopic);
-        consumerInstances.add(getConsumerInstanceId(kafkaTopic));
-        converterMap.put(kafkaTopic,
-                new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
-      }
-      Response response = Request.Get(kafkaRestRecordsUrl)
-              .addHeader(HttpHeaders.ACCEPT, KAFKA_REST_ACCEPT)
-              .execute();
-
-      return extractPayload(response.returnContent().asString(), spDataStream);
-    } catch (IOException | SpRuntimeException e) {
-      if (!e.getMessage().equals("")) {
-        logger.error("Could not get any sample data from Kafka", e);
-      }
-      consumerInstances.remove(getConsumerInstanceId(kafkaTopic));
-
-      throw new SpRuntimeException(e.getMessage());
-    }
-  }
-
-  private void createSubscription(String kafkaRestUrl, String kafkaTopic) throws IOException, SpRuntimeException {
-    String consumerInstance = getConsumerInstanceId(kafkaTopic);
-
-    Integer statusCode = createConsumer(kafkaRestUrl, consumerInstance, kafkaTopic);
-    Integer subscriptionStatusCode = subscribeConsumer(kafkaRestUrl, consumerInstance, kafkaTopic);
-    if (subscriptionStatusCode != 204) {
-      throw new SpRuntimeException("Could not read message form Kafka-REST: " + kafkaRestUrl);
-    }
-
-  }
-
-  private Integer subscribeConsumer(String kafkaRestUrl, String consumerInstance, String kafkaTopic) throws IOException {
-    String subscribeConsumerUrl = getConsumerInstanceUrl(kafkaRestUrl, consumerInstance, kafkaTopic)
-            + "/subscription";
-
-
-    return Request.Post(subscribeConsumerUrl)
-            .addHeader(HttpHeaders.CONTENT_TYPE, KAFKA_REST_CONTENT_TYPE)
-            .body(new StringEntity(makeSubscribeConsumerBody(kafkaTopic), Charsets.UTF_8))
-            .execute()
-            .returnResponse()
-            .getStatusLine()
-            .getStatusCode();
-  }
-
-  private String getConsumerInstanceUrl(String kafkaRestUrl, String consumerInstance, String topic) {
-    return kafkaRestUrl + "/"
-            + "consumers/"
-            + getConsumerGroupId(topic)
-            + "/instances/"
-            + consumerInstance;
-  }
-
-  private String getConsumerGroupId(String topic) {
-    return CONSUMER_GROUP_ID + topic;
-  }
-
-  private String makeSubscribeConsumerBody(String kafkaTopic) {
-    return "{\"topics\":[\"" + kafkaTopic + "\"]}";
-  }
-
-  private Integer createConsumer(String kafkaRestUrl, String consumerInstance, String topic) throws IOException {
-    String createConsumerUrl = kafkaRestUrl + "/consumers/" + getConsumerGroupId(topic);
-    return Request.Post(createConsumerUrl)
-            .addHeader(HttpHeaders.CONTENT_TYPE, KAFKA_REST_CONTENT_TYPE)
-            .body(new StringEntity(makeCreateConsumerBody(consumerInstance), Charsets.UTF_8))
-            .execute()
-            .returnResponse()
-            .getStatusLine()
-            .getStatusCode();
-  }
-
-  private String makeCreateConsumerBody(String consumerInstance) {
-    return "{\"name\": \""
-            + consumerInstance
-            + "\", \"format\": \"binary\", \"auto.offset.reset\": \"latest\"}";
-  }
-
-  private String getConsumerInstanceId(String kafkaTopic) {
-    return CONSUMER_GROUP_ID + "-" + kafkaTopic;
-  }
-
   private String getOutputTopic(SpDataStream spDataStream) {
     return spDataStream
             .getEventGrounding()
@@ -215,33 +96,49 @@ public enum PipelineElementRuntimeInfoFetcher {
             .getActualTopicName();
   }
 
-  private String getKafkaRestUrl() {
-    return BackendConfig.INSTANCE.getKafkaRestUrl();
-  }
+  private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
+    final String[] result = {null};
+    String kafkaTopic = getOutputTopic(spDataStream);
+    KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
-  private String extractPayload(String rawResponse, SpDataStream spDataStream) throws SpRuntimeException {
-    Long lastOffset = 0L;
-    JsonElement jsonElement = new JsonParser().parse(rawResponse);
-    JsonObject lastItem;
-
-    if (jsonElement.isJsonArray()) {
-      JsonArray allElements = jsonElement.getAsJsonArray();
-      if (allElements.size() > 0) {
-        lastItem = allElements.get(0).getAsJsonObject();
-        lastOffset = lastItem.get(OFFSET_FIELD_NAME).getAsLong();
-        for (int i = 1; i < allElements.size(); i++) {
-          JsonObject obj = allElements.get(i).getAsJsonObject();
-          Long offset = obj.get(OFFSET_FIELD_NAME).getAsLong();
-          if (offset > lastOffset) {
-            lastItem = obj;
-          }
+    // In development mode (SP_DEBUG=true), override the Kafka broker address with localhost:9094
+    if ("true".equals(System.getenv("SP_DEBUG"))) {
+      protocol.setBrokerHostname("localhost");
+      protocol.setKafkaPort(9094);
+    }
+
+    if (!converterMap.containsKey(kafkaTopic)) {
+      this.converterMap.put(kafkaTopic,
+              new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
+    }
+
+    SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() {
+      @Override
+      public void onEvent(byte[] event) {
+        try {
+          result[0] = converterMap.get(kafkaTopic).convert(event);
+        } catch (SpRuntimeException e) {
+          e.printStackTrace();
         }
-        byte[] content = Base64
-                .getDecoder()
-                .decode(lastItem.get(VALUE_FIELD_NAME).getAsString());
-        return converterMap.get(getOutputTopic(spDataStream)).convert(content);
+      }
+    });
+
+    Thread t = new Thread(kafkaConsumer);
+    t.start();
+
+    long timeout = 0;
+    while (result[0] == null && timeout < 6000) {
+      try {
+        Thread.sleep(300);
+        timeout = timeout + 300;
+      } catch (InterruptedException e) {
+        e.printStackTrace();
       }
     }
-    return "{}";
+
+    kafkaConsumer.disconnect();
+
+    return result[0];
   }
+
 }
diff --git a/ui/package.json b/ui/package.json
index 5b49f01..5215d7c 100644
--- a/ui/package.json
+++ b/ui/package.json
@@ -36,7 +36,7 @@
     "@uirouter/angular-hybrid": "10.0.1",
     "@uirouter/core": "6.0.4",
     "@uirouter/angular": "^6.0.1",
-    "@uirouter/angularjs":  "^1.0.25",
+    "@uirouter/angularjs": "^1.0.25",
     "@uirouter/rx": "^0.6.5",
     "angular": "1.7.7",
     "angular-animate": "1.7.7",
@@ -146,4 +146,4 @@
     "webpack-bundle-analyzer": "^3.4.1",
     "webpack-merge": "^4.2.1"
   }
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/editor/dialog/help/help-dialog.controller.ts b/ui/src/app/editor/dialog/help/help-dialog.controller.ts
index b75e003..f5b244c 100644
--- a/ui/src/app/editor/dialog/help/help-dialog.controller.ts
+++ b/ui/src/app/editor/dialog/help/help-dialog.controller.ts
@@ -99,6 +99,7 @@ export class HelpDialogController {
     };
 
     cancel() {
+        this.pollingActive = false;
         this.$mdDialog.cancel();
     };
 
diff --git a/ui/src/app/editor/services/editor-dialog-manager.service.ts b/ui/src/app/editor/services/editor-dialog-manager.service.ts
index 9410261..9fca51f 100644
--- a/ui/src/app/editor/services/editor-dialog-manager.service.ts
+++ b/ui/src/app/editor/services/editor-dialog-manager.service.ts
@@ -80,7 +80,8 @@ export class EditorDialogManager {
             controllerAs: 'ctrl',
             template: require('../dialog/help/help-dialog.tmpl.html'),
             parent: angular.element(document.body),
-            clickOutsideToClose: true,
+            // Must be false; otherwise polling of live data is not stopped in help-dialog.controller.ts when the dialog is closed
+            clickOutsideToClose: false,
             locals: {
                 pipelineElement: pipelineElementPayload,
             },