You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/15 23:48:48 UTC
[incubator-streampipes-examples] branch dev updated: update python data processor example
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-examples.git
The following commit(s) were added to refs/heads/dev by this push:
new 96be28d update python data processor example
96be28d is described below
commit 96be28d9fd80bda1f2accd1b5372725191dc2f70
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Mar 16 00:48:40 2021 +0100
update python data processor example
---
.../jvm/python/GraphParameterExtractor.java | 76 ----------------------
.../pe/examples/jvm/python/GreeterParameters.java | 33 ----------
.../pe/examples/jvm/python/GreeterPython.java | 60 -----------------
...Controller.java => PythonGreeterProcessor.java} | 46 +++++++------
.../streampipes/pe/examples/jvm/python/Route.java | 45 -------------
5 files changed, 27 insertions(+), 233 deletions(-)
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
deleted file mode 100644
index 5c29939..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public abstract class GraphParameterExtractor {
-
- private static String getInputTopic(EventProcessorBindingParams parameters) {
- return parameters
- .getGraph()
- .getInputStreams()
- .get(0)
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
- private static String getOutputTopic(EventProcessorBindingParams parameters) {
- return parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
- private static String getKafkaUrl(EventProcessorBindingParams parameters) {
- String brokerHostname = parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocols()
- .get(0)
- .getBrokerHostname();
-
- Integer kafkaPort = ((KafkaTransportProtocol) parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocols()
- .get(0))
- .getKafkaPort();
-
- return brokerHostname + ":" + kafkaPort.toString();
- }
-
- public static JsonObject toJson(String processorID, String invocationID, EventProcessorBindingParams parameters) {
- JsonObject json = new JsonObject();
- json.addProperty("invocation_id", invocationID);
- json.addProperty("processor_id", processorID);
- json.addProperty("input_topics", GraphParameterExtractor.getInputTopic(parameters));
- json.addProperty("output_topics", GraphParameterExtractor.getOutputTopic(parameters));
- json.addProperty("bootstrap_servers", GraphParameterExtractor.getKafkaUrl(parameters));
-
- return json;
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
deleted file mode 100644
index ab1dfac..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class GreeterParameters extends EventProcessorBindingParams {
- private final String greeting;
-
- public GreeterParameters(DataProcessorInvocation graph, String greeting) {
- super(graph);
- this.greeting = greeting;
- }
-
- public String getGreeting() {
- return greeting;
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
deleted file mode 100644
index 53adfa5..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
-
-import java.util.UUID;
-
-import static org.apache.streampipes.pe.examples.jvm.python.Route.post;
-
-public class GreeterPython implements ExternalEventProcessor<GreeterParameters> {
- private String invocationId;
-
- @Override
- public void onInvocation(GreeterParameters parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
- // use invocationId to keep track of started instances of
- this.invocationId = UUID.randomUUID().toString();
-
- // construct JSON request from Java -> Python
- JsonObject json = GraphParameterExtractor.toJson(
- GreeterPythonController.PROCESSOR_ID,
- this.invocationId,
- parameters);
-
- JsonObject staticProperties = new JsonObject();
- staticProperties.addProperty("greeting", parameters.getGreeting());
-
- json.add("static_properties", staticProperties);
-
- // send invocation request to python
- post(Route.INVOCATION, json.toString());
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- JsonObject json = new JsonObject();
- json.addProperty("invocation_id", this.invocationId);
-
- // send detach request to python to stop processor with invocationId
- post(Route.DETACH, this.invocationId);
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
similarity index 58%
rename from streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
rename to streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
index b6138f7..23c3be5 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,18 +15,23 @@ package org.apache.streampipes.pe.examples.jvm.python;/*
* limitations under the License.
*
*/
+package org.apache.streampipes.pe.examples.jvm.python;
+import com.google.gson.JsonObject;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesExternalDataProcessor;
-public class GreeterPythonController extends StandaloneExternalEventProcessingDeclarer<GreeterParameters> {
+import java.util.HashMap;
+import java.util.Map;
+
+public class PythonGreeterProcessor extends StreamPipesExternalDataProcessor {
private static final String GREETER_KEY = "greeter-key";
public static final String PROCESSOR_ID = "org.apache.streampipes.examples.python.processor.greeter";
@@ -34,18 +39,12 @@ public class GreeterPythonController extends StandaloneExternalEventProcessingDe
@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create(PROCESSOR_ID, "Python Greeter", "")
- .requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
-
+ .requiredStream(StreamRequirementsBuilder.any())
// create a simple text parameter
.requiredTextParameter(Labels.withId(GREETER_KEY), "greeting")
-
- // Append greeting to event stream
+ // append greeting to event stream
.outputStrategy(OutputStrategies.append(
EpProperties.stringEp(Labels.empty(),"greeting", SO.Text)))
-
// NOTE: currently one Kafka transport protocol is supported
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -53,12 +52,21 @@ public class GreeterPythonController extends StandaloneExternalEventProcessingDe
}
@Override
- public ConfiguredExternalEventProcessor<GreeterParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public void onInvocation(ProcessorParams parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
- // Extract the greeting value
- String greeting = extractor.singleValueParameter(GREETER_KEY, String.class);
+ // extract static properties and add to map to build minimal invocation graph
+ Map<String, String> staticPropertyMap = new HashMap<>();
+ staticPropertyMap.put("greeting", parameters.extractor().singleValueParameter(GREETER_KEY, String.class));
- // now the text parameter would be added to a parameter class (omitted for this example)
- return new ConfiguredExternalEventProcessor<>(new GreeterParameters(graph, greeting), GreeterPython::new);
+ JsonObject minimalInvocationGraph = createMinimalInvocationGraph(staticPropertyMap);
+
+ // send invocation request to python
+ invoke(minimalInvocationGraph);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ // send detach request to python to stop processor with invocationId
+ detach();
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
deleted file mode 100644
index 3613c62..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import java.io.IOException;
-
-public abstract class Route {
-
- public static final String INVOCATION = "invoke";
- public static final String DETACH = "detach";
-
- // endpoint of Python processor runs here
- public static final String PYTHON_ENDPOINT = "localhost:5000";
-
- public static String post(String endpoint, String payload) {
- String responseString = null;
-
- try {
- responseString = Request.Post(PYTHON_ENDPOINT + "/" + endpoint)
- .bodyString(payload, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute().returnContent().asString();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return responseString;
- }
-}