You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/25 18:24:35 UTC

[streampipes] branch dev updated: refactor: further clean up python data model & introduce function definition (#1147)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 569421cd8 refactor: further clean up python data model & introduce function definition (#1147)
569421cd8 is described below

commit 569421cd89e96342c8ab4486910e2cd5e57114ac
Author: Tim <50...@users.noreply.github.com>
AuthorDate: Wed Jan 25 19:24:29 2023 +0100

    refactor: further clean up python data model & introduce function definition (#1147)
    
    * simplify import of broker classes
    
    Signed-off-by: bossenti <bo...@posteo.de>
    
    * change inheritance of Resource class
    
    Signed-off-by: bossenti <bo...@posteo.de>
    
    * extend resource definition by dict conversion method
    
    Signed-off-by: bossenti <bo...@posteo.de>
    
    * introduce FunctionDefinition resource
    
    Signed-off-by: bossenti <bo...@posteo.de>
    
    Signed-off-by: bossenti <bo...@posteo.de>
    Co-authored-by: Philipp Zehnder <te...@users.noreply.github.com>
---
 .../functions/broker/__init__.py                   |  8 +++
 .../functions/function_handler.py                  |  7 +--
 .../functions/streampipes_function.py              | 25 +++++---
 .../streampipes_client/model/common.py             | 33 ++++++----
 .../model/container/resource_container.py          | 12 ++--
 .../streampipes_client/model/resource/__init__.py  |  2 +
 .../model/resource/data_lake_measure.py            |  1 +
 .../model/resource/data_stream.py                  |  1 +
 .../model/resource/function_definition.py          | 71 ++++++++++++++++++++++
 .../streampipes_client/model/resource/resource.py  | 22 ++++++-
 .../tests/client/test_endpoint.py                  |  8 ++-
 .../tests/functions/test_function_handler.py       |  8 +--
 12 files changed, 159 insertions(+), 39 deletions(-)

diff --git a/streampipes-client-python/streampipes_client/functions/broker/__init__.py b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
index cce3acad3..60d846168 100644
--- a/streampipes-client-python/streampipes_client/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
@@ -14,3 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from .broker import Broker, SupportedBroker
+from .nats_broker import NatsBroker
+
+__all__ = [
+    "Broker",
+    "NatsBroker",
+    "SupportedBroker",
+]
diff --git a/streampipes-client-python/streampipes_client/functions/function_handler.py b/streampipes-client-python/streampipes_client/functions/function_handler.py
index a0e6bfe6e..993fe3a56 100644
--- a/streampipes-client-python/streampipes_client/functions/function_handler.py
+++ b/streampipes-client-python/streampipes_client/functions/function_handler.py
@@ -20,8 +20,7 @@ import logging
 from typing import AsyncIterator, Dict, List
 
 from streampipes_client.client.client import StreamPipesClient
-from streampipes_client.functions.broker.broker import Broker, SupportedBroker
-from streampipes_client.functions.broker.nats_broker import NatsBroker
+from streampipes_client.functions.broker import Broker, NatsBroker, SupportedBroker
 from streampipes_client.functions.registration import Registration
 from streampipes_client.functions.utils.async_iter_handler import AsyncIterHandler
 from streampipes_client.functions.utils.data_stream_context import DataStreamContext
@@ -126,7 +125,7 @@ class FunctionHandler:
             messages[stream_id] = broker.get_message()
             # Generate the function context
             for streampipes_function in self.stream_contexts[stream_id].functions:
-                function_id = streampipes_function.getFunctionId()[0]
+                function_id = streampipes_function.getFunctionId().id
                 if function_id in contexts.keys():
                     contexts[function_id].add_data_stream_schema(stream_id, data_stream)
                 else:
@@ -138,7 +137,7 @@ class FunctionHandler:
                     )
         # Start the functions
         for streampipes_function in self.registration.getFunctions():
-            streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId()[0]])
+            streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId().id])
 
         # Get the messages continuously and send them to the functions
         async for stream_id, msg in AsyncIterHandler.combine_async_messages(messages):
diff --git a/streampipes-client-python/streampipes_client/functions/streampipes_function.py b/streampipes-client-python/streampipes_client/functions/streampipes_function.py
index e19e28d42..248cf8220 100644
--- a/streampipes-client-python/streampipes_client/functions/streampipes_function.py
+++ b/streampipes-client-python/streampipes_client/functions/streampipes_function.py
@@ -15,27 +15,36 @@
 # limitations under the License.
 #
 from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, List, Optional
 
 from streampipes_client.functions.utils.function_context import FunctionContext
+from streampipes_client.model.resource import FunctionDefinition
 
 
 class StreamPipesFunction(ABC):
     """Abstract implementation of a StreamPipesFunction.
     A StreamPipesFunction allows users to get the data of a StreamPipes data streams easily.
-    It makes it possible to work with the live data in python and enabels to use the powerful
-    data analytics libaries there.
+    It makes it possible to work with the live data in python and enables to use the powerful
+    data analytics libraries there.
+
+    Parameters
+    ----------
+    function_definition: FunctionDefinition
+        the definition of the function that contains metadata about the connected function
     """
 
-    @abstractmethod
-    def getFunctionId(self) -> Tuple[str, int]:
-        """Get the id of the function.
+    def __init__(self, function_definition: Optional[FunctionDefinition] = None):
+        self.function_definition = function_definition or FunctionDefinition()
+
+    def getFunctionId(self) -> FunctionDefinition.FunctionId:
+        """Returns the id of the function.
 
         Returns
         -------
-        Tuple of the function id und version number
+        FunctionId: FunctionDefinition.FunctionId
+            Identification object of the StreamPipes function
         """
-        raise NotImplementedError
+        return self.function_definition.function_id
 
     @abstractmethod
     def requiredStreamIds(self) -> List[str]:
diff --git a/streampipes-client-python/streampipes_client/model/common.py b/streampipes-client-python/streampipes_client/model/common.py
index 83f1b00dd..d9f7b243f 100644
--- a/streampipes-client-python/streampipes_client/model/common.py
+++ b/streampipes-client-python/streampipes_client/model/common.py
@@ -55,30 +55,33 @@ class BaseElement(BasicModel):
     element_id: Optional[StrictStr]
 
 
-class EventPropertyQualityRequirement(BaseElement):
+class EventPropertyQualityRequirement(BasicModel):
     """
     Data model of an `EventPropertyQualityRequirement` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     minimum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition")
     maximum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition")
 
 
-class ValueSpecification(BaseElement):
+class ValueSpecification(BasicModel):
     """
     Data model of an `ValueSpecification` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     min_value: Optional[int]
     max_value: Optional[int]
     step: Optional[float]
 
 
-class EventProperty(BaseElement):
+class EventProperty(BasicModel):
     """
     Data model of an `EventProperty` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     label: Optional[StrictStr]
     description: Optional[StrictStr]
     runtime_name: StrictStr
@@ -94,19 +97,21 @@ class EventProperty(BaseElement):
     value_specification: Optional[ValueSpecification]
 
 
-class EventSchema(BaseElement):
+class EventSchema(BasicModel):
     """
     Data model of an `EventSchema` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     event_properties: List[EventProperty]
 
 
-class ApplicationLink(BaseElement):
+class ApplicationLink(BasicModel):
     """
     Data model of an `ApplicationLink` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     application_name: Optional[StrictStr]
     application_description: Optional[StrictStr]
     application_url: Optional[StrictStr]
@@ -114,52 +119,58 @@ class ApplicationLink(BaseElement):
     application_link_type: Optional[StrictStr]
 
 
-class TopicDefinition(BaseElement):
+class TopicDefinition(BasicModel):
     """
     Data model of a `TopicDefinition` in compliance to the StreamPipes Backend.
     """
 
     actual_topic_name: StrictStr
+    element_id: Optional[StrictStr]
 
 
-class TransportProtocol(BaseElement):
+class TransportProtocol(BasicModel):
     """
     Data model of a `TransportProtocol` in compliance to the StreamPipes Backend.
     """
 
     broker_hostname: StrictStr
+    element_id: Optional[StrictStr]
     topic_definition: TopicDefinition
     port: StrictInt
 
 
-class TransportFormat(BaseElement):
+class TransportFormat(BasicModel):
     """
     Data model of a `TransportFormat` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     rdf_type: Optional[List[Optional[StrictStr]]]
 
 
-class EventGrounding(BaseElement):
+class EventGrounding(BasicModel):
     """
     Data model of an `EventGrounding` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     transport_protocols: List[TransportProtocol]
     transport_formats: Optional[List[Optional[TransportFormat]]]
 
 
-class MeasurementCapability(BaseElement):
+class MeasurementCapability(BasicModel):
     """
     Data model of a `MeasurementCapability` in compliance to the StreamPipes Backend.
     """
 
     capability: Optional[StrictStr]
+    element_id: Optional[StrictStr]
 
 
-class MeasurementObject(BaseElement):
+class MeasurementObject(BasicModel):
     """
     Data model of a `MeasurementObject` in compliance to the StreamPipes Backend.
     """
 
+    element_id: Optional[StrictStr]
     measures_object: Optional[StrictStr]
diff --git a/streampipes-client-python/streampipes_client/model/container/resource_container.py b/streampipes-client-python/streampipes_client/model/container/resource_container.py
index 62e990ba9..7173be366 100644
--- a/streampipes-client-python/streampipes_client/model/container/resource_container.py
+++ b/streampipes-client-python/streampipes_client/model/container/resource_container.py
@@ -198,16 +198,20 @@ class ResourceContainer(ABC):
 
         Returns
         -------
-        List[Dict]]
+        dictionary_list: List[Dict[str, Any]]
+            List of resources in dictionary representation.
+            If `use_source_names` equals `True` the keys are named as in the StreamPipes backend.
         """
-        return [resource.dict(by_alias=use_source_names) for resource in self._resources]
+        return [resource.to_dict(use_source_names=use_source_names) for resource in self._resources]
 
     def to_json(self) -> str:
         """Returns the resource container in the StreamPipes JSON representation.
 
         Returns
         -------
-        JSON string
+        JSON string: str
+            JSON representation of the resource container where key names are equal to
+            keys used in the StreamPipes backend
         """
 
         return json.dumps(self.to_dicts(use_source_names=True))
@@ -217,7 +221,7 @@ class ResourceContainer(ABC):
 
         Returns
         -------
-        pd.DataFrame
+        resource_container_df: pd.DataFrame
         """
         return pd.DataFrame.from_records(
             # ResourceContainer is iterable itself via __get_item__
diff --git a/streampipes-client-python/streampipes_client/model/resource/__init__.py b/streampipes-client-python/streampipes_client/model/resource/__init__.py
index 3aa99e02d..c6f0cddba 100644
--- a/streampipes-client-python/streampipes_client/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes_client/model/resource/__init__.py
@@ -18,9 +18,11 @@
 from .data_lake_measure import DataLakeMeasure
 from .data_lake_series import DataLakeSeries
 from .data_stream import DataStream
+from .function_definition import FunctionDefinition
 
 __all__ = [
     "DataLakeMeasure",
     "DataLakeSeries",
     "DataStream",
+    "FunctionDefinition",
 ]
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
index 9d2b73b27..3526a22ea 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
@@ -45,6 +45,7 @@ class DataLakeMeasure(Resource):
             "num_event_properties": len(self.event_schema.event_properties),
         }
 
+    element_id: Optional[StrictStr]
     measure_name: StrictStr
     timestamp_field: StrictStr
     event_schema: Optional[EventSchema]
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_stream.py b/streampipes-client-python/streampipes_client/model/resource/data_stream.py
index dc6f565ff..2f7a3051d 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_stream.py
@@ -71,6 +71,7 @@ class DataStream(Resource):
             "num_included_locales": len(self.included_locales) if self.included_locales is not None else 0,
         }
 
+    element_id: Optional[StrictStr]
     name: Optional[StrictStr]
     description: Optional[StrictStr]
     icon_url: Optional[StrictStr]
diff --git a/streampipes-client-python/streampipes_client/model/resource/function_definition.py b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
new file mode 100644
index 000000000..0e0e8d2af
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = [
+    "FunctionDefinition",
+]
+
+from typing import Dict, List
+from uuid import uuid4
+
+from pydantic import Field, StrictInt, StrictStr
+from streampipes_client.model.common import BasicModel
+from streampipes_client.model.resource.resource import Resource
+
+
+class FunctionDefinition(Resource):
+    """Configuration for a StreamPipes Function.
+
+    This class maps to the `FunctionDefinition` class in the StreamPipes model.
+    It contains all metadata that are required to register a function at the StreamPipes backend.
+
+    Parameters
+    ----------
+    function_id: FunctionId
+        identifier object of a StreamPipes function
+    consumed_streams: List[str]
+        list of data streams the function is consuming from
+    """
+
+    def convert_to_pandas_representation(self) -> Dict:
+        """Returns the dictionary representation of a function definition
+        to be used when creating a pandas Dataframe.
+        """
+
+        return self.to_dict(use_source_names=False)
+
+    class FunctionId(BasicModel):
+        """Identification object for a StreamPipes function.
+
+        Maps to the `FunctionId` class defined in the StreamPipes model.
+
+        Parameters
+        ----------
+        id: str
+            unique identifier of the function instance
+        version: int
+            version of the corresponding function
+        """
+
+        id: StrictStr = Field(default_factory=lambda: str(uuid4()))
+        version: StrictInt = Field(default=1)
+
+        def __hash__(self):
+            return hash((self.id, self.version))
+
+    function_id: FunctionId = Field(default_factory=FunctionId)
+    consumed_streams: List[str] = Field(default_factory=list)
diff --git a/streampipes-client-python/streampipes_client/model/resource/resource.py b/streampipes-client-python/streampipes_client/model/resource/resource.py
index e52a01609..1fcf7d2b6 100644
--- a/streampipes-client-python/streampipes_client/model/resource/resource.py
+++ b/streampipes-client-python/streampipes_client/model/resource/resource.py
@@ -22,15 +22,16 @@ A resource defines the data model that is used by a resource container (`model.c
 from abc import ABC, abstractmethod
 from typing import Dict
 
-from streampipes_client.model.common import BaseElement
+from streampipes_client.model.common import BasicModel
 
 __all__ = [
     "Resource",
 ]
 
 
-class Resource(ABC, BaseElement):
+class Resource(ABC, BasicModel):
     """General and abstract implementation for a resource.
+
     A resource defines the data model used by a resource container (`model.container.resourceContainer`).
     It inherits from Pydantic's BaseModel to get all its superpowers,
     which are used to parse, validate the API response and to easily switch between
@@ -41,3 +42,20 @@ class Resource(ABC, BaseElement):
     def convert_to_pandas_representation(self) -> Dict:
         """Returns a dictionary representation to be used when creating a pandas Dataframe."""
         raise NotImplementedError  # pragma: no cover
+
+    def to_dict(self, use_source_names=True):
+        """Returns the resource in dictionary representation.
+
+        Parameters
+        ----------
+        use_source_names: bool
+            Indicates if the dictionary keys are in python representation or
+            equally named to the StreamPipes backend
+
+        Returns
+        ------
+        resource: Dict[str, Any]
+            The resource as dictionary representation
+
+        """
+        return self.dict(by_alias=use_source_names)
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index 3050687c1..cdb68fb8e 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -114,12 +114,12 @@ class TestStreamPipesEndpoints(TestCase):
                     "elementId": "urn:streampipes.apache.org:spi:eventgrounding:TwGIQA",
                     "transportProtocols": [
                         {
-                            "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
                             "brokerHostname": "nats",
+                            "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
                             "topicDefinition": {
-                                "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
                                 "actualTopicName": "org.apache.streampipes.connect."
-                                "fc22b8f6-698a-4127-aa71-e11854dc57c5",
+                                "fc22b8f6-698a-4127-aa71-e11854dc57c5tr",
+                                "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
                             },
                             "port": 4222,
                         }
@@ -236,6 +236,8 @@ class TestStreamPipesEndpoints(TestCase):
         result = client.dataStreamApi.all()
         result_pd = result.to_pandas()
 
+        self.maxDiff = None
+
         self.assertEqual(
             1,
             len(result),
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index c93963d76..3853deb95 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -29,9 +29,6 @@ from streampipes_client.model.resource.data_stream import DataStream
 
 
 class TestFunction(StreamPipesFunction):
-    def getFunctionId(self) -> Tuple[str, int]:
-        return ("org.test.TestFunction", 1)
-
     def requiredStreamIds(self) -> List[str]:
         return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
 
@@ -47,9 +44,6 @@ class TestFunction(StreamPipesFunction):
 
 
 class TestFunctionTwoStreams(StreamPipesFunction):
-    def getFunctionId(self) -> Tuple[str, int]:
-        return ("org.test.TestFunction2", 1)
-
     def requiredStreamIds(self) -> List[str]:
         return ["urn:streampipes.apache.org:eventstream:uPDKLI", "urn:streampipes.apache.org:eventstream:HHoidJ"]
 
@@ -242,7 +236,7 @@ class TestFunctionHandler(TestCase):
             test_function.context.schema, {self.data_stream["elementId"]: DataStream(**self.data_stream)}
         )
         self.assertListEqual(test_function.context.streams, test_function.requiredStreamIds())
-        self.assertEqual(test_function.context.function_id, test_function.getFunctionId()[0])
+        self.assertEqual(test_function.context.function_id, test_function.getFunctionId().id)
 
         self.assertListEqual(test_function.data, self.test_stream_data1)
         self.assertTrue(test_function.stopped)