You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2021/03/15 23:48:48 UTC

[incubator-streampipes-examples] branch dev updated: update python data processor example

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-examples.git


The following commit(s) were added to refs/heads/dev by this push:
     new 96be28d  update python data processor example
96be28d is described below

commit 96be28d9fd80bda1f2accd1b5372725191dc2f70
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue Mar 16 00:48:40 2021 +0100

    update python data processor example
---
 .../jvm/python/GraphParameterExtractor.java        | 76 ----------------------
 .../pe/examples/jvm/python/GreeterParameters.java  | 33 ----------
 .../pe/examples/jvm/python/GreeterPython.java      | 60 -----------------
 ...Controller.java => PythonGreeterProcessor.java} | 46 +++++++------
 .../streampipes/pe/examples/jvm/python/Route.java  | 45 -------------
 5 files changed, 27 insertions(+), 233 deletions(-)

diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
deleted file mode 100644
index 5c29939..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public abstract class GraphParameterExtractor {
-
-    private static String getInputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getInputStreams()
-                .get(0)
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private static String getOutputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private static String getKafkaUrl(EventProcessorBindingParams parameters) {
-        String brokerHostname = parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0)
-                .getBrokerHostname();
-
-        Integer kafkaPort = ((KafkaTransportProtocol) parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0))
-                .getKafkaPort();
-
-        return brokerHostname + ":" + kafkaPort.toString();
-    }
-
-    public static JsonObject toJson(String processorID, String invocationID, EventProcessorBindingParams parameters) {
-        JsonObject json = new JsonObject();
-        json.addProperty("invocation_id", invocationID);
-        json.addProperty("processor_id", processorID);
-        json.addProperty("input_topics", GraphParameterExtractor.getInputTopic(parameters));
-        json.addProperty("output_topics", GraphParameterExtractor.getOutputTopic(parameters));
-        json.addProperty("bootstrap_servers", GraphParameterExtractor.getKafkaUrl(parameters));
-
-        return json;
-    }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
deleted file mode 100644
index ab1dfac..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class GreeterParameters extends EventProcessorBindingParams {
-    private final String greeting;
-
-    public GreeterParameters(DataProcessorInvocation graph, String greeting) {
-        super(graph);
-        this.greeting = greeting;
-    }
-
-    public String getGreeting() {
-        return greeting;
-    }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
deleted file mode 100644
index 53adfa5..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
-
-import java.util.UUID;
-
-import static org.apache.streampipes.pe.examples.jvm.python.Route.post;
-
-public class GreeterPython implements ExternalEventProcessor<GreeterParameters> {
-    private String invocationId;
-
-    @Override
-    public void onInvocation(GreeterParameters parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
-        // use invocationId to keep track of started instances of
-        this.invocationId = UUID.randomUUID().toString();
-
-        // construct JSON request from Java -> Python
-        JsonObject json = GraphParameterExtractor.toJson(
-                GreeterPythonController.PROCESSOR_ID,
-                this.invocationId,
-                parameters);
-
-        JsonObject staticProperties = new JsonObject();
-        staticProperties.addProperty("greeting", parameters.getGreeting());
-
-        json.add("static_properties", staticProperties);
-
-        // send invocation request to python
-        post(Route.INVOCATION, json.toString());
-    }
-
-    @Override
-    public void onDetach() throws SpRuntimeException {
-        JsonObject json = new JsonObject();
-        json.addProperty("invocation_id", this.invocationId);
-
-        // send detach request to python to stop processor with invocationId
-        post(Route.DETACH, this.invocationId);
-    }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
similarity index 58%
rename from streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
rename to streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
index b6138f7..23c3be5 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,18 +15,23 @@ package org.apache.streampipes.pe.examples.jvm.python;/*
  * limitations under the License.
  *
  */
+package org.apache.streampipes.pe.examples.jvm.python;
 
+import com.google.gson.JsonObject;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesExternalDataProcessor;
 
-public class GreeterPythonController extends StandaloneExternalEventProcessingDeclarer<GreeterParameters> {
+import java.util.HashMap;
+import java.util.Map;
+
+public class PythonGreeterProcessor extends StreamPipesExternalDataProcessor {
 
     private static final String GREETER_KEY = "greeter-key";
     public static final String PROCESSOR_ID = "org.apache.streampipes.examples.python.processor.greeter";
@@ -34,18 +39,12 @@ public class GreeterPythonController extends StandaloneExternalEventProcessingDe
     @Override
     public DataProcessorDescription declareModel() {
         return ProcessingElementBuilder.create(PROCESSOR_ID, "Python Greeter", "")
-                .requiredStream(StreamRequirementsBuilder.
-                        create()
-                        .requiredProperty(EpRequirements.anyProperty())
-                        .build())
-
+                .requiredStream(StreamRequirementsBuilder.any())
                 // create a simple text parameter
                 .requiredTextParameter(Labels.withId(GREETER_KEY), "greeting")
-
-                // Append greeting to event stream
+                // append greeting to event stream
                 .outputStrategy(OutputStrategies.append(
                         EpProperties.stringEp(Labels.empty(),"greeting", SO.Text)))
-
                 // NOTE: currently one Kafka transport protocol is supported
                 .supportedProtocols(SupportedProtocols.kafka())
                 .supportedFormats(SupportedFormats.jsonFormat())
@@ -53,12 +52,21 @@ public class GreeterPythonController extends StandaloneExternalEventProcessingDe
     }
 
     @Override
-    public ConfiguredExternalEventProcessor<GreeterParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+    public void onInvocation(ProcessorParams parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
 
-        // Extract the greeting value
-        String greeting = extractor.singleValueParameter(GREETER_KEY, String.class);
+        // extract static properties and add to map to build minimal invocation graph
+        Map<String, String> staticPropertyMap = new HashMap<>();
+        staticPropertyMap.put("greeting", parameters.extractor().singleValueParameter(GREETER_KEY, String.class));
 
-        // now the text parameter would be added to a parameter class (omitted for this example)
-        return new ConfiguredExternalEventProcessor<>(new GreeterParameters(graph, greeting), GreeterPython::new);
+        JsonObject minimalInvocationGraph = createMinimalInvocationGraph(staticPropertyMap);
+
+        // send invocation request to python
+        invoke(minimalInvocationGraph);
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        // send detach request to python to stop processor with invocationId
+        detach();
     }
 }
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
deleted file mode 100644
index 3613c62..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import java.io.IOException;
-
-public abstract class Route {
-
-    public static final String INVOCATION = "invoke";
-    public static final String DETACH = "detach";
-
-    // endpoint of Python processor runs here
-    public static final String PYTHON_ENDPOINT = "localhost:5000";
-
-    public static String post(String endpoint, String payload) {
-        String responseString = null;
-
-        try {
-            responseString = Request.Post(PYTHON_ENDPOINT + "/" + endpoint)
-                    .bodyString(payload, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute().returnContent().asString();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return responseString;
-    }
-}