You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/23 18:48:55 UTC

[streampipes] 04/04: introduce FunctionDefinition resource

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/introduce-function-definition
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit bcce069b891788a389a6ea2cdbef8fb002d30cf6
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Jan 22 15:53:35 2023 +0100

    introduce FunctionDefinition resource
    
    Signed-off-by: bossenti <bo...@posteo.de>
---
 .../functions/broker/__init__.py                   |  2 +-
 .../functions/function_handler.py                  |  4 +-
 .../model/container/resource_container.py          | 20 ------
 .../streampipes_client/model/resource/__init__.py  |  2 +
 .../model/resource/function_definition.py          | 71 ++++++++++++++++++++++
 .../tests/client/test_endpoint.py                  |  3 +-
 .../tests/functions/test_function_handler.py       |  8 +--
 7 files changed, 79 insertions(+), 31 deletions(-)

diff --git a/streampipes-client-python/streampipes_client/functions/broker/__init__.py b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
index e54049050..60d846168 100644
--- a/streampipes-client-python/streampipes_client/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
@@ -21,4 +21,4 @@ __all__ = [
     "Broker",
     "NatsBroker",
     "SupportedBroker",
-]
\ No newline at end of file
+]
diff --git a/streampipes-client-python/streampipes_client/functions/function_handler.py b/streampipes-client-python/streampipes_client/functions/function_handler.py
index 5097468cb..993fe3a56 100644
--- a/streampipes-client-python/streampipes_client/functions/function_handler.py
+++ b/streampipes-client-python/streampipes_client/functions/function_handler.py
@@ -125,7 +125,7 @@ class FunctionHandler:
             messages[stream_id] = broker.get_message()
             # Generate the function context
             for streampipes_function in self.stream_contexts[stream_id].functions:
-                function_id = streampipes_function.getFunctionId()[0]
+                function_id = streampipes_function.getFunctionId().id
                 if function_id in contexts.keys():
                     contexts[function_id].add_data_stream_schema(stream_id, data_stream)
                 else:
@@ -137,7 +137,7 @@ class FunctionHandler:
                     )
         # Start the functions
         for streampipes_function in self.registration.getFunctions():
-            streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId()[0]])
+            streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId().id])
 
         # Get the messages continuously and send them to the functions
         async for stream_id, msg in AsyncIterHandler.combine_async_messages(messages):
diff --git a/streampipes-client-python/streampipes_client/model/container/resource_container.py b/streampipes-client-python/streampipes_client/model/container/resource_container.py
index f33d5717d..7173be366 100644
--- a/streampipes-client-python/streampipes_client/model/container/resource_container.py
+++ b/streampipes-client-python/streampipes_client/model/container/resource_container.py
@@ -32,7 +32,6 @@ import pandas as pd
 from pydantic import ValidationError
 
 __all__ = [
-    "PandasCompatibleResourceContainer",
     "ResourceContainer",
 ]
 
@@ -217,25 +216,6 @@ class ResourceContainer(ABC):
 
         return json.dumps(self.to_dicts(use_source_names=True))
 
-
-class PandasCompatibleResourceContainer(ResourceContainer, ABC):
-    """Resource Container that can be converted to a pandas data frame.
-
-    This type of resource containers provides a `to_pandas()` method that
-    returns the resource container as a pandas data frame.
-    """
-
-    @classmethod
-    @abstractmethod
-    def _resource_cls(cls) -> Type[Resource]:
-        """Returns the class of the resource that are bundled.
-
-        Returns
-        -------
-        model.resource.Resource
-        """
-        raise NotImplementedError  # pragma: no cover
-
     def to_pandas(self) -> pd.DataFrame:
         """Returns the resource container in representation of a Pandas Dataframe.
 
diff --git a/streampipes-client-python/streampipes_client/model/resource/__init__.py b/streampipes-client-python/streampipes_client/model/resource/__init__.py
index 3aa99e02d..c6f0cddba 100644
--- a/streampipes-client-python/streampipes_client/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes_client/model/resource/__init__.py
@@ -18,9 +18,11 @@
 from .data_lake_measure import DataLakeMeasure
 from .data_lake_series import DataLakeSeries
 from .data_stream import DataStream
+from .function_definition import FunctionDefinition
 
 __all__ = [
     "DataLakeMeasure",
     "DataLakeSeries",
     "DataStream",
+    "FunctionDefinition",
 ]
diff --git a/streampipes-client-python/streampipes_client/model/resource/function_definition.py b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
new file mode 100644
index 000000000..0e0e8d2af
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = [
+    "FunctionDefinition",
+]
+
+from typing import Dict, List
+from uuid import uuid4
+
+from pydantic import Field, StrictInt, StrictStr
+from streampipes_client.model.common import BasicModel
+from streampipes_client.model.resource.resource import Resource
+
+
+class FunctionDefinition(Resource):
+    """Configuration for a StreamPipes Function.
+
+    This class maps to the `FunctionDefinition` class in the StreamPipes model.
+    It contains all metadata required to register a function with the StreamPipes backend.
+
+    Parameters
+    ----------
+    function_id: FunctionId
+        identifier object of a StreamPipes function
+    consumed_streams: List[str]
+        list of data streams the function is consuming from
+    """
+
+    def convert_to_pandas_representation(self) -> Dict:
+        """Returns the dictionary representation of a function definition
+        to be used when creating a pandas DataFrame.
+        """
+
+        return self.to_dict(use_source_names=False)
+
+    class FunctionId(BasicModel):
+        """Identification object for a StreamPipes function.
+
+        Maps to the `FunctionId` class defined in the StreamPipes model.
+
+        Parameters
+        ----------
+        id: str
+            unique identifier of the function instance
+        version: int
+            version of the corresponding function
+        """
+
+        id: StrictStr = Field(default_factory=lambda: str(uuid4()))
+        version: StrictInt = Field(default=1)
+
+        def __hash__(self):
+            return hash((self.id, self.version))
+
+    function_id: FunctionId = Field(default_factory=FunctionId)
+    consumed_streams: List[str] = Field(default_factory=list)
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index a74ba6c0e..cdb68fb8e 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -117,7 +117,8 @@ class TestStreamPipesEndpoints(TestCase):
                             "brokerHostname": "nats",
                             "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
                             "topicDefinition": {
-                                "actualTopicName": "org.apache.streampipes.connect.fc22b8f6-698a-4127-aa71-e11854dc57c5",
+                                "actualTopicName": "org.apache.streampipes.connect."
+                                "fc22b8f6-698a-4127-aa71-e11854dc57c5tr",
                                 "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
                             },
                             "port": 4222,
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index c93963d76..3853deb95 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -29,9 +29,6 @@ from streampipes_client.model.resource.data_stream import DataStream
 
 
 class TestFunction(StreamPipesFunction):
-    def getFunctionId(self) -> Tuple[str, int]:
-        return ("org.test.TestFunction", 1)
-
     def requiredStreamIds(self) -> List[str]:
         return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
 
@@ -47,9 +44,6 @@ class TestFunction(StreamPipesFunction):
 
 
 class TestFunctionTwoStreams(StreamPipesFunction):
-    def getFunctionId(self) -> Tuple[str, int]:
-        return ("org.test.TestFunction2", 1)
-
     def requiredStreamIds(self) -> List[str]:
         return ["urn:streampipes.apache.org:eventstream:uPDKLI", "urn:streampipes.apache.org:eventstream:HHoidJ"]
 
@@ -242,7 +236,7 @@ class TestFunctionHandler(TestCase):
             test_function.context.schema, {self.data_stream["elementId"]: DataStream(**self.data_stream)}
         )
         self.assertListEqual(test_function.context.streams, test_function.requiredStreamIds())
-        self.assertEqual(test_function.context.function_id, test_function.getFunctionId()[0])
+        self.assertEqual(test_function.context.function_id, test_function.getFunctionId().id)
 
         self.assertListEqual(test_function.data, self.test_stream_data1)
         self.assertTrue(test_function.stopped)