You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/03/01 10:15:46 UTC
[incubator-streampipes] branch dev updated: STREAMPIPES-9 Remove
dependency to kafka-rest
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 918fbb7 STREAMPIPES-9 Remove dependency to kafka-rest
new 07e80d6 Merge branch 'dev' into STREAMPIPES-9
918fbb7 is described below
commit 918fbb7406e3fcfa5eeac2fd4fbac7352480cfd4
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sun Mar 1 11:14:27 2020 +0100
STREAMPIPES-9 Remove dependency to kafka-rest
---
streampipes-backend/development/env | 1 +
.../messaging/kafka/SpKafkaConsumer.java | 8 +
.../runtime/PipelineElementRuntimeInfoFetcher.java | 187 +++++----------------
ui/package.json | 4 +-
.../editor/dialog/help/help-dialog.controller.ts | 1 +
.../services/editor-dialog-manager.service.ts | 3 +-
6 files changed, 56 insertions(+), 148 deletions(-)
diff --git a/streampipes-backend/development/env b/streampipes-backend/development/env
index 6cf517d..1467e19 100644
--- a/streampipes-backend/development/env
+++ b/streampipes-backend/development/env
@@ -20,3 +20,4 @@ SP_BACKEND_HOST=localhost
SP_INFLUX_HOST=localhost
SP_INFLUX_PORT=8086
SP_JMS_HOST=localhost
+SP_DEBUG=true
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
index 4e9a933..edef194 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java
@@ -54,6 +54,14 @@ public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, R
}
+ public SpKafkaConsumer(KafkaTransportProtocol protocol, String topic, InternalEventProcessor<byte[]> eventProcessor) {
+ this.protocol = protocol;
+ this.topic = topic;
+ this.eventProcessor = eventProcessor;
+ this.isRunning = true;
+ }
+
+
// TODO backwards compatibility, remove later
public SpKafkaConsumer(String kafkaUrl, String topic, InternalEventProcessor<byte[]> callback) {
KafkaTransportProtocol protocol = new KafkaTransportProtocol();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index 6d0dd63..6c0e0e6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,19 +17,10 @@
*/
package org.apache.streampipes.manager.runtime;
-import com.google.common.base.Charsets;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
import org.apache.streampipes.model.SpDataStream;
@@ -37,30 +28,17 @@ import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportFormat;
-import java.io.IOException;
-import java.util.Base64;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
public enum PipelineElementRuntimeInfoFetcher {
INSTANCE;
- Logger logger = LoggerFactory.getLogger(JsonParser.class);
+ Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
- private static Set<String> consumerInstances = new HashSet<>();
private Map<String, SpDataFormatConverter> converterMap;
- private static final String CONSUMER_GROUP_ID = "streampipes-backend-listener-group-";
- private static final String KAFKA_REST_ACCEPT = "application/vnd.kafka.binary.v2+json";
- private static final String KAFKA_REST_CONTENT_TYPE = "application/vnd.kafka.v2+json";
-
- private static final String OFFSET_FIELD_NAME = "offset";
- private static final String VALUE_FIELD_NAME = "value";
-
-
PipelineElementRuntimeInfoFetcher() {
this.converterMap = new HashMap<>();
}
@@ -110,103 +88,6 @@ public enum PipelineElementRuntimeInfoFetcher {
return spDataStream.getEventGrounding().getTransportFormats().get(0);
}
- private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
- String kafkaRestUrl = getKafkaRestUrl();
- String kafkaTopic = getOutputTopic(spDataStream);
-
- return getLatestSubscription(kafkaRestUrl, kafkaTopic, spDataStream);
- }
-
- private String getLatestSubscription(String kafkaRestUrl, String kafkaTopic,
- SpDataStream spDataStream) throws SpRuntimeException {
- String kafkaRestRecordsUrl = getConsumerInstanceUrl(kafkaRestUrl,
- getConsumerInstanceId(kafkaTopic), kafkaTopic) + "/records";
-
- try {
- if (!consumerInstances.contains(getConsumerInstanceId(kafkaTopic)) ||
- !converterMap.containsKey(kafkaTopic)) {
- createSubscription(kafkaRestUrl, kafkaTopic);
- consumerInstances.add(getConsumerInstanceId(kafkaTopic));
- converterMap.put(kafkaTopic,
- new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
- }
- Response response = Request.Get(kafkaRestRecordsUrl)
- .addHeader(HttpHeaders.ACCEPT, KAFKA_REST_ACCEPT)
- .execute();
-
- return extractPayload(response.returnContent().asString(), spDataStream);
- } catch (IOException | SpRuntimeException e) {
- if (!e.getMessage().equals("")) {
- logger.error("Could not get any sample data from Kafka", e);
- }
- consumerInstances.remove(getConsumerInstanceId(kafkaTopic));
-
- throw new SpRuntimeException(e.getMessage());
- }
- }
-
- private void createSubscription(String kafkaRestUrl, String kafkaTopic) throws IOException, SpRuntimeException {
- String consumerInstance = getConsumerInstanceId(kafkaTopic);
-
- Integer statusCode = createConsumer(kafkaRestUrl, consumerInstance, kafkaTopic);
- Integer subscriptionStatusCode = subscribeConsumer(kafkaRestUrl, consumerInstance, kafkaTopic);
- if (subscriptionStatusCode != 204) {
- throw new SpRuntimeException("Could not read message form Kafka-REST: " + kafkaRestUrl);
- }
-
- }
-
- private Integer subscribeConsumer(String kafkaRestUrl, String consumerInstance, String kafkaTopic) throws IOException {
- String subscribeConsumerUrl = getConsumerInstanceUrl(kafkaRestUrl, consumerInstance, kafkaTopic)
- + "/subscription";
-
-
- return Request.Post(subscribeConsumerUrl)
- .addHeader(HttpHeaders.CONTENT_TYPE, KAFKA_REST_CONTENT_TYPE)
- .body(new StringEntity(makeSubscribeConsumerBody(kafkaTopic), Charsets.UTF_8))
- .execute()
- .returnResponse()
- .getStatusLine()
- .getStatusCode();
- }
-
- private String getConsumerInstanceUrl(String kafkaRestUrl, String consumerInstance, String topic) {
- return kafkaRestUrl + "/"
- + "consumers/"
- + getConsumerGroupId(topic)
- + "/instances/"
- + consumerInstance;
- }
-
- private String getConsumerGroupId(String topic) {
- return CONSUMER_GROUP_ID + topic;
- }
-
- private String makeSubscribeConsumerBody(String kafkaTopic) {
- return "{\"topics\":[\"" + kafkaTopic + "\"]}";
- }
-
- private Integer createConsumer(String kafkaRestUrl, String consumerInstance, String topic) throws IOException {
- String createConsumerUrl = kafkaRestUrl + "/consumers/" + getConsumerGroupId(topic);
- return Request.Post(createConsumerUrl)
- .addHeader(HttpHeaders.CONTENT_TYPE, KAFKA_REST_CONTENT_TYPE)
- .body(new StringEntity(makeCreateConsumerBody(consumerInstance), Charsets.UTF_8))
- .execute()
- .returnResponse()
- .getStatusLine()
- .getStatusCode();
- }
-
- private String makeCreateConsumerBody(String consumerInstance) {
- return "{\"name\": \""
- + consumerInstance
- + "\", \"format\": \"binary\", \"auto.offset.reset\": \"latest\"}";
- }
-
- private String getConsumerInstanceId(String kafkaTopic) {
- return CONSUMER_GROUP_ID + "-" + kafkaTopic;
- }
-
private String getOutputTopic(SpDataStream spDataStream) {
return spDataStream
.getEventGrounding()
@@ -215,33 +96,49 @@ public enum PipelineElementRuntimeInfoFetcher {
.getActualTopicName();
}
- private String getKafkaRestUrl() {
- return BackendConfig.INSTANCE.getKafkaRestUrl();
- }
+ private String getLatestEventFromKafka(SpDataStream spDataStream) throws SpRuntimeException {
+ final String[] result = {null};
+ String kafkaTopic = getOutputTopic(spDataStream);
+ KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
- private String extractPayload(String rawResponse, SpDataStream spDataStream) throws SpRuntimeException {
- Long lastOffset = 0L;
- JsonElement jsonElement = new JsonParser().parse(rawResponse);
- JsonObject lastItem;
-
- if (jsonElement.isJsonArray()) {
- JsonArray allElements = jsonElement.getAsJsonArray();
- if (allElements.size() > 0) {
- lastItem = allElements.get(0).getAsJsonObject();
- lastOffset = lastItem.get(OFFSET_FIELD_NAME).getAsLong();
- for (int i = 1; i < allElements.size(); i++) {
- JsonObject obj = allElements.get(i).getAsJsonObject();
- Long offset = obj.get(OFFSET_FIELD_NAME).getAsLong();
- if (offset > lastOffset) {
- lastItem = obj;
- }
+ // Change kafka config when running in development mode
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ protocol.setBrokerHostname("localhost");
+ protocol.setKafkaPort(9094);
+ }
+
+ if (!converterMap.containsKey(kafkaTopic)) {
+ this.converterMap.put(kafkaTopic,
+ new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
+ }
+
+ SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() {
+ @Override
+ public void onEvent(byte[] event) {
+ try {
+ result[0] = converterMap.get(kafkaTopic).convert(event);
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
}
- byte[] content = Base64
- .getDecoder()
- .decode(lastItem.get(VALUE_FIELD_NAME).getAsString());
- return converterMap.get(getOutputTopic(spDataStream)).convert(content);
+ }
+ });
+
+ Thread t = new Thread(kafkaConsumer);
+ t.start();
+
+ long timeout = 0;
+ while (result[0] == null && timeout < 6000) {
+ try {
+ Thread.sleep(300);
+ timeout = timeout + 300;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
}
- return "{}";
+
+ kafkaConsumer.disconnect();
+
+ return result[0];
}
+
}
diff --git a/ui/package.json b/ui/package.json
index 5b49f01..5215d7c 100644
--- a/ui/package.json
+++ b/ui/package.json
@@ -36,7 +36,7 @@
"@uirouter/angular-hybrid": "10.0.1",
"@uirouter/core": "6.0.4",
"@uirouter/angular": "^6.0.1",
- "@uirouter/angularjs": "^1.0.25",
+ "@uirouter/angularjs": "^1.0.25",
"@uirouter/rx": "^0.6.5",
"angular": "1.7.7",
"angular-animate": "1.7.7",
@@ -146,4 +146,4 @@
"webpack-bundle-analyzer": "^3.4.1",
"webpack-merge": "^4.2.1"
}
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/editor/dialog/help/help-dialog.controller.ts b/ui/src/app/editor/dialog/help/help-dialog.controller.ts
index b75e003..f5b244c 100644
--- a/ui/src/app/editor/dialog/help/help-dialog.controller.ts
+++ b/ui/src/app/editor/dialog/help/help-dialog.controller.ts
@@ -99,6 +99,7 @@ export class HelpDialogController {
};
cancel() {
+ this.pollingActive = false;
this.$mdDialog.cancel();
};
diff --git a/ui/src/app/editor/services/editor-dialog-manager.service.ts b/ui/src/app/editor/services/editor-dialog-manager.service.ts
index 9410261..9fca51f 100644
--- a/ui/src/app/editor/services/editor-dialog-manager.service.ts
+++ b/ui/src/app/editor/services/editor-dialog-manager.service.ts
@@ -80,7 +80,8 @@ export class EditorDialogManager {
controllerAs: 'ctrl',
template: require('../dialog/help/help-dialog.tmpl.html'),
parent: angular.element(document.body),
- clickOutsideToClose: true,
+ // must be false, otherwise polling of live data is not stopped in help-dialog.controller.ts when dialog is closed
+ clickOutsideToClose: false,
locals: {
pipelineElement: pipelineElementPayload,
},