You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/17 09:17:22 UTC
[incubator-streampipes] 01/02: [STREAMPIPES-174] add new
StreamPipesExternalDataProcessor
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 0ea252b245983cc257a8906fa06e1de8c7f066db
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Mar 16 00:45:11 2021 +0100
[STREAMPIPES-174] add new StreamPipesExternalDataProcessor
---
.../StreamPipesExternalDataProcessor.java | 145 +++++++++++++++++++++
1 file changed, 145 insertions(+)
diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
new file mode 100644
index 0000000..b3c73c2
--- /dev/null
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/StreamPipesExternalDataProcessor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.standalone;
+
+import com.google.gson.JsonObject;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+public abstract class StreamPipesExternalDataProcessor extends StandaloneExternalEventProcessingDeclarer<ProcessorParams>
+ implements ExternalEventProcessor<ProcessorParams> {
+
+ // endpoint of Python processor runs here
+ private static final String PYTHON_ENDPOINT = "localhost:5000";
+
+ private String invocationId;
+ private String appId;
+ private String inputTopic;
+ private String outputTopic;
+ private String kafkaUrl;
+
+ @Override
+ public ConfiguredExternalEventProcessor<ProcessorParams> onInvocation(DataProcessorInvocation graph,
+ ProcessingElementParameterExtractor extractor) {
+ EventProcessorBindingParams params = new ProcessorParams(graph);
+ invocationId = UUID.randomUUID().toString();
+ appId = graph.getAppId();
+ inputTopic = getInputTopic(params);
+ outputTopic = getOutputTopic(params);
+ kafkaUrl = getKafkaUrl(params);
+
+ Supplier<ExternalEventProcessor<ProcessorParams>> supplier = () -> this;
+ return new ConfiguredExternalEventProcessor<>(new ProcessorParams(graph), supplier);
+ }
+
+ protected JsonObject createMinimalInvocationGraph(Map<String, String> staticPropertyMap) {
+ JsonObject json = new JsonObject();
+
+ json.addProperty("invocation_id", invocationId);
+ json.addProperty("processor_id", appId);
+ json.addProperty("input_topics", inputTopic);
+ json.addProperty("output_topics", outputTopic);
+ json.addProperty("bootstrap_servers", kafkaUrl);
+
+ JsonObject staticProperties = new JsonObject();
+ staticPropertyMap.forEach(staticProperties::addProperty);
+ json.add("static_properties", staticProperties);
+
+ return json;
+ }
+
+ protected void invoke(JsonObject json) {
+ post("invoke", json.toString());
+ }
+
+ protected void detach () {
+ JsonObject json = new JsonObject();
+ json.addProperty("invocation_id", invocationId);
+ post("detach", json.toString());
+ }
+
+ private static String post(String endpoint, String payload) {
+ String responseString = null;
+
+ try {
+ responseString = Request.Post(PYTHON_ENDPOINT + "/" + endpoint)
+ .bodyString(payload, ContentType.APPLICATION_JSON)
+ .connectTimeout(1000)
+ .socketTimeout(100000)
+ .execute().returnContent().asString();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return responseString;
+ }
+
+ private String getInputTopic(EventProcessorBindingParams parameters) {
+ return parameters
+ .getGraph()
+ .getInputStreams()
+ .get(0)
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+
+ private String getOutputTopic(EventProcessorBindingParams parameters) {
+ return parameters
+ .getGraph()
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocol()
+ .getTopicDefinition()
+ .getActualTopicName();
+ }
+
+ private String getKafkaUrl(EventProcessorBindingParams parameters) {
+ String brokerHostname = parameters
+ .getGraph()
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocols()
+ .get(0)
+ .getBrokerHostname();
+
+ Integer kafkaPort = ((KafkaTransportProtocol) parameters
+ .getGraph()
+ .getOutputStream()
+ .getEventGrounding()
+ .getTransportProtocols()
+ .get(0))
+ .getKafkaPort();
+
+ return brokerHostname + ":" + kafkaPort.toString();
+ }
+
+}
+
+