You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/26 12:39:15 UTC

[streampipes] branch 1260-refactor-required-streams-for-python-functions updated: Replace requiredStreamIds with consumed_streams

This is an automated email from the ASF dual-hosted git repository.

oehler pushed a commit to branch 1260-refactor-required-streams-for-python-functions
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1260-refactor-required-streams-for-python-functions by this push:
     new 4c2f98e6f Replace requiredStreamIds with consumed_streams
4c2f98e6f is described below

commit 4c2f98e6faeb289ddf225f01892d3f55f497a446
Author: Sven Oehler <oe...@web.de>
AuthorDate: Wed Apr 26 14:39:10 2023 +0200

    Replace requiredStreamIds with consumed_streams
---
 ...ive-data-from-the-streampipes-data-stream.ipynb | 43 ++++++++++++----------
 .../streampipes/function_zoo/river_function.py     | 19 +---------
 .../streampipes/functions/streampipes_function.py  |  3 +-
 .../tests/functions/test_function_handler.py       | 30 ++++++++-------
 4 files changed, 43 insertions(+), 52 deletions(-)

diff --git a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index 0a2187486..938565bfa 100644
--- a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -34,14 +34,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
+   "metadata": {
+    "collapsed": false
+   },
    "outputs": [],
    "source": [
     "# You can install all required libraries for this tutorial with the following command\n",
     "%pip install matplotlib ipython streampipes"
-   ],
-   "metadata": {
-    "collapsed": false
-   }
+   ]
   },
   {
    "cell_type": "code",
@@ -243,13 +243,14 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Next we can create a StreamPipesFunction. For this we need to implement the 4 following methods:\n",
-    "- In `requiredStreamIds` you need to insert the `element_id` of the required streams.\n",
+    "Next we can create a StreamPipesFunction. For this we need to implement the 3 following methods:\n",
     "- `onServiceStarted` is called when the function gets started. There you can use the given meta information of the `FunctionContext` to initialize the function.\n",
     "- `onEvent` is called when ever a new event arrives. The `event` contains the live data and you can use the `streamId` to identify a stream if the function is connected to multiple data streams.\n",
     "- `onServiceStopped` is called when the function gets stopped.\n",
     "\n",
-    "For this tutorial we just create a function that saves every new event in a `pandas DataFrame` and plots the first column of the `DataFrame` when the function gets stopped."
+    "For this tutorial we just create a function that saves every new event in a `pandas DataFrame` and plots the first column of the `DataFrame` when the function gets stopped.  \n",
+    "  \n",
+    "(If you want to use the same structure as in Java, you can override the `getFunctionId` and `requiredStreamIds` methods instead of using the `FunctionDefinition`.)"
    ]
   },
   {
@@ -258,7 +259,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from typing import List, Dict, Any\n",
+    "from typing import Dict, Any\n",
     "import pandas as pd\n",
     "from datetime import datetime\n",
     "import matplotlib.pyplot as plt\n",
@@ -266,17 +267,14 @@
     "from streampipes.functions.registration import Registration\n",
     "from streampipes.functions.streampipes_function import StreamPipesFunction\n",
     "from streampipes.functions.utils.function_context import FunctionContext\n",
-    "\n",
+    "from streampipes.model.resource.function_definition import FunctionDefinition, FunctionId\n",
     "\n",
     "class ExampleFunction(StreamPipesFunction):\n",
-    "    def __init__(self) -> None:\n",
-    "        super().__init__()\n",
+    "    def __init__(self, function_definition: FunctionDefinition) -> None:\n",
+    "        super().__init__(function_definition)\n",
     "        # Create the Dataframe to save the live data\n",
     "        self.df = pd.DataFrame()\n",
     "\n",
-    "    def requiredStreamIds(self) -> List[str]:\n",
-    "        return [\"urn:streampipes.apache.org:eventstream:uPDKLI\"]\n",
-    "\n",
     "    def onServiceStarted(self, context: FunctionContext):\n",
     "        # Get the name of the timestamp field\n",
     "        for event_property in context.schema[context.streams[0]].event_schema.event_properties:\n",
@@ -305,7 +303,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Now we can start the function. We have to register an instance of the `StreamPipesFunction` and them we can start the functions by initializing the `FunctionHandler`. (it's also possible to register multiple functions with `.register(...).register(...)`)"
+    "Now we can start the function. First we create an instance of the `ExampleFunction` and insert the `element_id` of the stream whose data we want to consume. Then we register this function and start all functions by initializing the `FunctionHandler`. (It's also possible to register multiple functions with `.register(...).register(...)`.)"
    ]
   },
   {
@@ -323,7 +321,12 @@
     }
    ],
    "source": [
-    "example_function = ExampleFunction()\n",
+    "example_function = ExampleFunction(\n",
+    "    FunctionDefinition(\n",
+    "        function_id=FunctionId(id=\"example-function\"),\n",
+    "        consumed_streams=[\"urn:streampipes.apache.org:eventstream:uPDKLI\"]\n",
+    "    )\n",
+    ")\n",
     "\n",
     "registration = Registration()\n",
     "registration.register(example_function)\n",
@@ -591,12 +594,12 @@
   },
   {
    "cell_type": "markdown",
-   "source": [
-    "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
-   ],
    "metadata": {
     "collapsed": false
-   }
+   },
+   "source": [
+    "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
+   ]
   },
   {
    "attachments": {},
diff --git a/streampipes-client-python/streampipes/function_zoo/river_function.py b/streampipes-client-python/streampipes/function_zoo/river_function.py
index b43b40d0c..dca6f3705 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -38,8 +38,6 @@ class RiverFunction(StreamPipesFunction):
     ----------
     function_definition: FunctionDefinition
         The function definition which contains the output stream.
-    stream_ids: List[str]
-        The ids of the data stream to train the model.
     model: Any
         The model to train. It meant to be a River model/pipeline,
         but can be every model with a 'learn_one' and 'predict_one' method.
@@ -58,7 +56,6 @@ class RiverFunction(StreamPipesFunction):
     def __init__(
         self,
         function_definition: FunctionDefinition,
-        stream_ids: List[str],
         model: Any,
         supervised: bool,
         target_label: Optional[str],
@@ -67,7 +64,6 @@ class RiverFunction(StreamPipesFunction):
         on_stop: Callable[[Any], None],
     ) -> None:
         super().__init__(function_definition)
-        self.stream_ids = stream_ids
         self.model = model
         self.supervised = supervised
         self.target_label = target_label
@@ -77,17 +73,6 @@ class RiverFunction(StreamPipesFunction):
 
         self.learning = True
 
-    def requiredStreamIds(self) -> List[str]:
-        """Returns the stream ids required by this function.
-
-        Returns
-        -------
-        stream_ids: List[str]
-            List of stream ids required by the function
-
-        """
-        return self.stream_ids
-
     def onServiceStarted(self, context: FunctionContext):
         """Executes the `on_start` method of the function.
 
@@ -188,9 +173,9 @@ class OnlineML:
             if target_label is None:
                 raise ValueError("You must define a target attribute for a supervised model.")
         output_stream = create_data_stream("prediction", attributes)
-        function_definition = FunctionDefinition().add_output_data_stream(output_stream)
+        function_definition = FunctionDefinition(consumed_streams=stream_ids).add_output_data_stream(output_stream)
         self.sp_function = RiverFunction(
-            function_definition, stream_ids, model, supervised, target_label, on_start, on_event, on_stop
+            function_definition, model, supervised, target_label, on_start, on_event, on_stop
         )
 
     def start(self):
diff --git a/streampipes-client-python/streampipes/functions/streampipes_function.py b/streampipes-client-python/streampipes/functions/streampipes_function.py
index bfb267c4a..6e93f1d2f 100644
--- a/streampipes-client-python/streampipes/functions/streampipes_function.py
+++ b/streampipes-client-python/streampipes/functions/streampipes_function.py
@@ -83,7 +83,6 @@ class StreamPipesFunction(ABC):
             collector.disconnect()
         self.onServiceStopped()
 
-    @abstractmethod
     def requiredStreamIds(self) -> List[str]:
         """Get the ids of the streams needed by the function.
 
@@ -92,7 +91,7 @@ class StreamPipesFunction(ABC):
         stream_ids: List[str]
             List of the stream ids
         """
-        raise NotImplementedError  # pragma: no cover
+        return self.function_definition.consumed_streams
 
     @abstractmethod
     def onServiceStarted(self, context: FunctionContext) -> None:
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index 6b544f656..acbb791e2 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -34,9 +34,6 @@ from streampipes.model.resource.function_definition import FunctionDefinition
 
 
 class TestFunction(StreamPipesFunction):
-    def requiredStreamIds(self) -> List[str]:
-        return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
-
     def onServiceStarted(self, context: FunctionContext):
         self.context = context
         self.data: List[Dict[str, Any]] = []
@@ -49,9 +46,6 @@ class TestFunction(StreamPipesFunction):
 
 
 class TestFunctionTwoStreams(StreamPipesFunction):
-    def requiredStreamIds(self) -> List[str]:
-        return ["urn:streampipes.apache.org:eventstream:uPDKLI", "urn:streampipes.apache.org:eventstream:HHoidJ"]
-
     def onServiceStarted(self, context: FunctionContext):
         self.context = context
         self.data1: List[Dict[str, Any]] = []
@@ -68,9 +62,6 @@ class TestFunctionTwoStreams(StreamPipesFunction):
 
 
 class TestFunctionOutput(StreamPipesFunction):
-    def requiredStreamIds(self) -> List[str]:
-        return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
-
     def onServiceStarted(self, context: FunctionContext):
         self.context = context
         self.i = 0
@@ -248,7 +239,9 @@ class TestFunctionHandler(TestCase):
         )
 
         registration = Registration()
-        test_function = TestFunction()
+        test_function = TestFunction(
+            FunctionDefinition(consumed_streams=["urn:streampipes.apache.org:eventstream:uPDKLI"])
+        )
         registration.register(test_function)
         function_handler = FunctionHandler(registration, client)
         function_handler.initializeFunctions()
@@ -296,8 +289,17 @@ class TestFunctionHandler(TestCase):
         )
 
         registration = Registration()
-        test_function1 = TestFunction()
-        test_function2 = TestFunctionTwoStreams()
+        test_function1 = TestFunction(
+            FunctionDefinition(consumed_streams=["urn:streampipes.apache.org:eventstream:uPDKLI"])
+        )
+        test_function2 = TestFunctionTwoStreams(
+            FunctionDefinition(
+                consumed_streams=[
+                    "urn:streampipes.apache.org:eventstream:uPDKLI",
+                    "urn:streampipes.apache.org:eventstream:HHoidJ",
+                ]
+            )
+        )
         registration.register(test_function1).register(test_function2)
         function_handler = FunctionHandler(registration, client)
         function_handler.initializeFunctions()
@@ -355,7 +357,9 @@ class TestFunctionHandler(TestCase):
 
         output_stream = create_data_stream("test", attributes={"number": RuntimeType.INTEGER.value})
         test_function = TestFunctionOutput(
-            function_definition=FunctionDefinition().add_output_data_stream(output_stream)
+            function_definition=FunctionDefinition(
+                consumed_streams=["urn:streampipes.apache.org:eventstream:uPDKLI"]
+            ).add_output_data_stream(output_stream)
         )
         registration = Registration()
         registration.register(test_function)