You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by oe...@apache.org on 2023/04/26 12:39:15 UTC
[streampipes] branch 1260-refactor-required-streams-for-python-functions updated: Replace requiredStreamIds with consumed_streams
This is an automated email from the ASF dual-hosted git repository.
oehler pushed a commit to branch 1260-refactor-required-streams-for-python-functions
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1260-refactor-required-streams-for-python-functions by this push:
new 4c2f98e6f Replace requiredStreamIds with consumed_streams
4c2f98e6f is described below
commit 4c2f98e6faeb289ddf225f01892d3f55f497a446
Author: Sven Oehler <oe...@web.de>
AuthorDate: Wed Apr 26 14:39:10 2023 +0200
Replace requiredStreamIds with consumed_streams
---
...ive-data-from-the-streampipes-data-stream.ipynb | 43 ++++++++++++----------
.../streampipes/function_zoo/river_function.py | 19 +---------
.../streampipes/functions/streampipes_function.py | 3 +-
.../tests/functions/test_function_handler.py | 30 ++++++++-------
4 files changed, 43 insertions(+), 52 deletions(-)
diff --git a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
index 0a2187486..938565bfa 100644
--- a/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
+++ b/streampipes-client-python/docs/tutorials/3-getting-live-data-from-the-streampipes-data-stream.ipynb
@@ -34,14 +34,14 @@
{
"cell_type": "code",
"execution_count": null,
+ "metadata": {
+ "collapsed": false
+ },
"outputs": [],
"source": [
"# You can install all required libraries for this tutorial with the following command\n",
"%pip install matplotlib ipython streampipes"
- ],
- "metadata": {
- "collapsed": false
- }
+ ]
},
{
"cell_type": "code",
@@ -243,13 +243,14 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "Next we can create a StreamPipesFunction. For this we need to implement the 4 following methods:\n",
- "- In `requiredStreamIds` you need to insert the `element_id` of the required streams.\n",
+ "Next we can create a StreamPipesFunction. For this we need to implement the 3 following methods:\n",
"- `onServiceStarted` is called when the function gets started. There you can use the given meta information of the `FunctionContext` to initialize the function.\n",
"- `onEvent` is called when ever a new event arrives. The `event` contains the live data and you can use the `streamId` to identify a stream if the function is connected to multiple data streams.\n",
"- `onServiceStopped` is called when the function gets stopped.\n",
"\n",
- "For this tutorial we just create a function that saves every new event in a `pandas DataFrame` and plots the first column of the `DataFrame` when the function gets stopped."
+ "For this tutorial we just create a function that saves every new event in a `pandas DataFrame` and plots the first column of the `DataFrame` when the function gets stopped. \n",
+ " \n",
+ "(If you want to use the same structure as in Java, you can override the `getFunctionId` and `requiredStreamIds` methods instead of using the `FunctionDefinition`.)"
]
},
{
@@ -258,7 +259,7 @@
"metadata": {},
"outputs": [],
"source": [
- "from typing import List, Dict, Any\n",
+ "from typing import Dict, Any\n",
"import pandas as pd\n",
"from datetime import datetime\n",
"import matplotlib.pyplot as plt\n",
@@ -266,17 +267,14 @@
"from streampipes.functions.registration import Registration\n",
"from streampipes.functions.streampipes_function import StreamPipesFunction\n",
"from streampipes.functions.utils.function_context import FunctionContext\n",
- "\n",
+ "from streampipes.model.resource.function_definition import FunctionDefinition, FunctionId\n",
"\n",
"class ExampleFunction(StreamPipesFunction):\n",
- " def __init__(self) -> None:\n",
- " super().__init__()\n",
+ " def __init__(self, function_definition: FunctionDefinition) -> None:\n",
+ " super().__init__(function_definition)\n",
" # Create the Dataframe to save the live data\n",
" self.df = pd.DataFrame()\n",
"\n",
- " def requiredStreamIds(self) -> List[str]:\n",
- " return [\"urn:streampipes.apache.org:eventstream:uPDKLI\"]\n",
- "\n",
" def onServiceStarted(self, context: FunctionContext):\n",
" # Get the name of the timestamp field\n",
" for event_property in context.schema[context.streams[0]].event_schema.event_properties:\n",
@@ -305,7 +303,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
- "Now we can start the function. We have to register an instance of the `StreamPipesFunction` and them we can start the functions by initializing the `FunctionHandler`. (it's also possible to register multiple functions with `.register(...).register(...)`)"
+ "Now we can start the function. First we create an instance of the `ExampleFunction` and insert the `element_id` of the stream whose data we want to consume. Then we have to register this function and we can start all functions by initializing the `FunctionHandler`. (It's also possible to register multiple functions with `.register(...).register(...)`.)"
]
},
{
@@ -323,7 +321,12 @@
}
],
"source": [
- "example_function = ExampleFunction()\n",
+ "example_function = ExampleFunction(\n",
+ " FunctionDefinition(\n",
+ " function_id=FunctionId(id=\"example-function\"),\n",
+ " consumed_streams=[\"urn:streampipes.apache.org:eventstream:uPDKLI\"]\n",
+ " )\n",
+ ")\n",
"\n",
"registration = Registration()\n",
"registration.register(example_function)\n",
@@ -591,12 +594,12 @@
},
{
"cell_type": "markdown",
- "source": [
- "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
- ],
"metadata": {
"collapsed": false
- }
+ },
+ "source": [
+ "Want to see more exciting use cases you can achieve with StreamPipes functions in Python? Then don't hesitate and jump to our [next tutorial](../4-using-online-machine-learning-on-a-streampipes-data-stream) on applying online machine learning algorithms to StreamPipes data streams with [River](https://riverml.xyz)."
+ ]
},
{
"attachments": {},
diff --git a/streampipes-client-python/streampipes/function_zoo/river_function.py b/streampipes-client-python/streampipes/function_zoo/river_function.py
index b43b40d0c..dca6f3705 100644
--- a/streampipes-client-python/streampipes/function_zoo/river_function.py
+++ b/streampipes-client-python/streampipes/function_zoo/river_function.py
@@ -38,8 +38,6 @@ class RiverFunction(StreamPipesFunction):
----------
function_definition: FunctionDefinition
The function definition which contains the output stream.
- stream_ids: List[str]
- The ids of the data stream to train the model.
model: Any
The model to train. It meant to be a River model/pipeline,
but can be every model with a 'learn_one' and 'predict_one' method.
@@ -58,7 +56,6 @@ class RiverFunction(StreamPipesFunction):
def __init__(
self,
function_definition: FunctionDefinition,
- stream_ids: List[str],
model: Any,
supervised: bool,
target_label: Optional[str],
@@ -67,7 +64,6 @@ class RiverFunction(StreamPipesFunction):
on_stop: Callable[[Any], None],
) -> None:
super().__init__(function_definition)
- self.stream_ids = stream_ids
self.model = model
self.supervised = supervised
self.target_label = target_label
@@ -77,17 +73,6 @@ class RiverFunction(StreamPipesFunction):
self.learning = True
- def requiredStreamIds(self) -> List[str]:
- """Returns the stream ids required by this function.
-
- Returns
- -------
- stream_ids: List[str]
- List of stream ids required by the function
-
- """
- return self.stream_ids
-
def onServiceStarted(self, context: FunctionContext):
"""Executes the `on_start` method of the function.
@@ -188,9 +173,9 @@ class OnlineML:
if target_label is None:
raise ValueError("You must define a target attribute for a supervised model.")
output_stream = create_data_stream("prediction", attributes)
- function_definition = FunctionDefinition().add_output_data_stream(output_stream)
+ function_definition = FunctionDefinition(consumed_streams=stream_ids).add_output_data_stream(output_stream)
self.sp_function = RiverFunction(
- function_definition, stream_ids, model, supervised, target_label, on_start, on_event, on_stop
+ function_definition, model, supervised, target_label, on_start, on_event, on_stop
)
def start(self):
diff --git a/streampipes-client-python/streampipes/functions/streampipes_function.py b/streampipes-client-python/streampipes/functions/streampipes_function.py
index bfb267c4a..6e93f1d2f 100644
--- a/streampipes-client-python/streampipes/functions/streampipes_function.py
+++ b/streampipes-client-python/streampipes/functions/streampipes_function.py
@@ -83,7 +83,6 @@ class StreamPipesFunction(ABC):
collector.disconnect()
self.onServiceStopped()
- @abstractmethod
def requiredStreamIds(self) -> List[str]:
"""Get the ids of the streams needed by the function.
@@ -92,7 +91,7 @@ class StreamPipesFunction(ABC):
stream_ids: List[str]
List of the stream ids
"""
- raise NotImplementedError # pragma: no cover
+ return self.function_definition.consumed_streams
@abstractmethod
def onServiceStarted(self, context: FunctionContext) -> None:
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index 6b544f656..acbb791e2 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -34,9 +34,6 @@ from streampipes.model.resource.function_definition import FunctionDefinition
class TestFunction(StreamPipesFunction):
- def requiredStreamIds(self) -> List[str]:
- return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
-
def onServiceStarted(self, context: FunctionContext):
self.context = context
self.data: List[Dict[str, Any]] = []
@@ -49,9 +46,6 @@ class TestFunction(StreamPipesFunction):
class TestFunctionTwoStreams(StreamPipesFunction):
- def requiredStreamIds(self) -> List[str]:
- return ["urn:streampipes.apache.org:eventstream:uPDKLI", "urn:streampipes.apache.org:eventstream:HHoidJ"]
-
def onServiceStarted(self, context: FunctionContext):
self.context = context
self.data1: List[Dict[str, Any]] = []
@@ -68,9 +62,6 @@ class TestFunctionTwoStreams(StreamPipesFunction):
class TestFunctionOutput(StreamPipesFunction):
- def requiredStreamIds(self) -> List[str]:
- return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
-
def onServiceStarted(self, context: FunctionContext):
self.context = context
self.i = 0
@@ -248,7 +239,9 @@ class TestFunctionHandler(TestCase):
)
registration = Registration()
- test_function = TestFunction()
+ test_function = TestFunction(
+ FunctionDefinition(consumed_streams=["urn:streampipes.apache.org:eventstream:uPDKLI"])
+ )
registration.register(test_function)
function_handler = FunctionHandler(registration, client)
function_handler.initializeFunctions()
@@ -296,8 +289,17 @@ class TestFunctionHandler(TestCase):
)
registration = Registration()
- test_function1 = TestFunction()
- test_function2 = TestFunctionTwoStreams()
+ test_function1 = TestFunction(
+ FunctionDefinition(consumed_streams=["urn:streampipes.apache.org:eventstream:uPDKLI"])
+ )
+ test_function2 = TestFunctionTwoStreams(
+ FunctionDefinition(
+ consumed_streams=[
+ "urn:streampipes.apache.org:eventstream:uPDKLI",
+ "urn:streampipes.apache.org:eventstream:HHoidJ",
+ ]
+ )
+ )
registration.register(test_function1).register(test_function2)
function_handler = FunctionHandler(registration, client)
function_handler.initializeFunctions()
@@ -355,7 +357,9 @@ class TestFunctionHandler(TestCase):
output_stream = create_data_stream("test", attributes={"number": RuntimeType.INTEGER.value})
test_function = TestFunctionOutput(
- function_definition=FunctionDefinition().add_output_data_stream(output_stream)
+ function_definition=FunctionDefinition(
+ consumed_streams=["urn:streampipes.apache.org:eventstream:uPDKLI"]
+ ).add_output_data_stream(output_stream)
)
registration = Registration()
registration.register(test_function)