You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/25 18:24:35 UTC
[streampipes] branch dev updated: refactor: further clean up python data model & introduce function definition (#1147)
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 569421cd8 refactor: further clean up python data model & introduce function definition (#1147)
569421cd8 is described below
commit 569421cd89e96342c8ab4486910e2cd5e57114ac
Author: Tim <50...@users.noreply.github.com>
AuthorDate: Wed Jan 25 19:24:29 2023 +0100
refactor: further clean up python data model & introduce function definition (#1147)
* simplify import of broker classes
Signed-off-by: bossenti <bo...@posteo.de>
* change inheritance of Resource class
Signed-off-by: bossenti <bo...@posteo.de>
* extend resource definition by dict conversion method
Signed-off-by: bossenti <bo...@posteo.de>
* introduce FunctionDefinition resource
Signed-off-by: bossenti <bo...@posteo.de>
Signed-off-by: bossenti <bo...@posteo.de>
Co-authored-by: Philipp Zehnder <te...@users.noreply.github.com>
---
.../functions/broker/__init__.py | 8 +++
.../functions/function_handler.py | 7 +--
.../functions/streampipes_function.py | 25 +++++---
.../streampipes_client/model/common.py | 33 ++++++----
.../model/container/resource_container.py | 12 ++--
.../streampipes_client/model/resource/__init__.py | 2 +
.../model/resource/data_lake_measure.py | 1 +
.../model/resource/data_stream.py | 1 +
.../model/resource/function_definition.py | 71 ++++++++++++++++++++++
.../streampipes_client/model/resource/resource.py | 22 ++++++-
.../tests/client/test_endpoint.py | 8 ++-
.../tests/functions/test_function_handler.py | 8 +--
12 files changed, 159 insertions(+), 39 deletions(-)
diff --git a/streampipes-client-python/streampipes_client/functions/broker/__init__.py b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
index cce3acad3..60d846168 100644
--- a/streampipes-client-python/streampipes_client/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
@@ -14,3 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+from .broker import Broker, SupportedBroker
+from .nats_broker import NatsBroker
+
+__all__ = [
+ "Broker",
+ "NatsBroker",
+ "SupportedBroker",
+]
diff --git a/streampipes-client-python/streampipes_client/functions/function_handler.py b/streampipes-client-python/streampipes_client/functions/function_handler.py
index a0e6bfe6e..993fe3a56 100644
--- a/streampipes-client-python/streampipes_client/functions/function_handler.py
+++ b/streampipes-client-python/streampipes_client/functions/function_handler.py
@@ -20,8 +20,7 @@ import logging
from typing import AsyncIterator, Dict, List
from streampipes_client.client.client import StreamPipesClient
-from streampipes_client.functions.broker.broker import Broker, SupportedBroker
-from streampipes_client.functions.broker.nats_broker import NatsBroker
+from streampipes_client.functions.broker import Broker, NatsBroker, SupportedBroker
from streampipes_client.functions.registration import Registration
from streampipes_client.functions.utils.async_iter_handler import AsyncIterHandler
from streampipes_client.functions.utils.data_stream_context import DataStreamContext
@@ -126,7 +125,7 @@ class FunctionHandler:
messages[stream_id] = broker.get_message()
# Generate the function context
for streampipes_function in self.stream_contexts[stream_id].functions:
- function_id = streampipes_function.getFunctionId()[0]
+ function_id = streampipes_function.getFunctionId().id
if function_id in contexts.keys():
contexts[function_id].add_data_stream_schema(stream_id, data_stream)
else:
@@ -138,7 +137,7 @@ class FunctionHandler:
)
# Start the functions
for streampipes_function in self.registration.getFunctions():
- streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId()[0]])
+ streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId().id])
# Get the messages continuously and send them to the functions
async for stream_id, msg in AsyncIterHandler.combine_async_messages(messages):
diff --git a/streampipes-client-python/streampipes_client/functions/streampipes_function.py b/streampipes-client-python/streampipes_client/functions/streampipes_function.py
index e19e28d42..248cf8220 100644
--- a/streampipes-client-python/streampipes_client/functions/streampipes_function.py
+++ b/streampipes-client-python/streampipes_client/functions/streampipes_function.py
@@ -15,27 +15,36 @@
# limitations under the License.
#
from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, List, Optional
from streampipes_client.functions.utils.function_context import FunctionContext
+from streampipes_client.model.resource import FunctionDefinition
class StreamPipesFunction(ABC):
"""Abstract implementation of a StreamPipesFunction.
A StreamPipesFunction allows users to get the data of a StreamPipes data streams easily.
- It makes it possible to work with the live data in python and enabels to use the powerful
- data analytics libaries there.
+ It makes it possible to work with the live data in python and enables the use of the powerful
+ data analytics libraries there.
+
+ Parameters
+ ----------
+ function_definition: FunctionDefinition
+ the definition of the function that contains metadata about the connected function
"""
- @abstractmethod
- def getFunctionId(self) -> Tuple[str, int]:
- """Get the id of the function.
+ def __init__(self, function_definition: Optional[FunctionDefinition] = None):
+ self.function_definition = function_definition or FunctionDefinition()
+
+ def getFunctionId(self) -> FunctionDefinition.FunctionId:
+ """Returns the id of the function.
Returns
-------
- Tuple of the function id und version number
+ FunctionId: FunctionDefinition.FunctionId
+ Identification object of the StreamPipes function
"""
- raise NotImplementedError
+ return self.function_definition.function_id
@abstractmethod
def requiredStreamIds(self) -> List[str]:
diff --git a/streampipes-client-python/streampipes_client/model/common.py b/streampipes-client-python/streampipes_client/model/common.py
index 83f1b00dd..d9f7b243f 100644
--- a/streampipes-client-python/streampipes_client/model/common.py
+++ b/streampipes-client-python/streampipes_client/model/common.py
@@ -55,30 +55,33 @@ class BaseElement(BasicModel):
element_id: Optional[StrictStr]
-class EventPropertyQualityRequirement(BaseElement):
+class EventPropertyQualityRequirement(BasicModel):
"""
Data model of an `EventPropertyQualityRequirement` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
minimum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition")
maximum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition")
-class ValueSpecification(BaseElement):
+class ValueSpecification(BasicModel):
"""
Data model of an `ValueSpecification` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
min_value: Optional[int]
max_value: Optional[int]
step: Optional[float]
-class EventProperty(BaseElement):
+class EventProperty(BasicModel):
"""
Data model of an `EventProperty` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
label: Optional[StrictStr]
description: Optional[StrictStr]
runtime_name: StrictStr
@@ -94,19 +97,21 @@ class EventProperty(BaseElement):
value_specification: Optional[ValueSpecification]
-class EventSchema(BaseElement):
+class EventSchema(BasicModel):
"""
Data model of an `EventSchema` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
event_properties: List[EventProperty]
-class ApplicationLink(BaseElement):
+class ApplicationLink(BasicModel):
"""
Data model of an `ApplicationLink` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
application_name: Optional[StrictStr]
application_description: Optional[StrictStr]
application_url: Optional[StrictStr]
@@ -114,52 +119,58 @@ class ApplicationLink(BaseElement):
application_link_type: Optional[StrictStr]
-class TopicDefinition(BaseElement):
+class TopicDefinition(BasicModel):
"""
Data model of a `TopicDefinition` in compliance to the StreamPipes Backend.
"""
actual_topic_name: StrictStr
+ element_id: Optional[StrictStr]
-class TransportProtocol(BaseElement):
+class TransportProtocol(BasicModel):
"""
Data model of a `TransportProtocol` in compliance to the StreamPipes Backend.
"""
broker_hostname: StrictStr
+ element_id: Optional[StrictStr]
topic_definition: TopicDefinition
port: StrictInt
-class TransportFormat(BaseElement):
+class TransportFormat(BasicModel):
"""
Data model of a `TransportFormat` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
rdf_type: Optional[List[Optional[StrictStr]]]
-class EventGrounding(BaseElement):
+class EventGrounding(BasicModel):
"""
Data model of an `EventGrounding` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
transport_protocols: List[TransportProtocol]
transport_formats: Optional[List[Optional[TransportFormat]]]
-class MeasurementCapability(BaseElement):
+class MeasurementCapability(BasicModel):
"""
Data model of a `MeasurementCapability` in compliance to the StreamPipes Backend.
"""
capability: Optional[StrictStr]
+ element_id: Optional[StrictStr]
-class MeasurementObject(BaseElement):
+class MeasurementObject(BasicModel):
"""
Data model of a `MeasurementObject` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
measures_object: Optional[StrictStr]
diff --git a/streampipes-client-python/streampipes_client/model/container/resource_container.py b/streampipes-client-python/streampipes_client/model/container/resource_container.py
index 62e990ba9..7173be366 100644
--- a/streampipes-client-python/streampipes_client/model/container/resource_container.py
+++ b/streampipes-client-python/streampipes_client/model/container/resource_container.py
@@ -198,16 +198,20 @@ class ResourceContainer(ABC):
Returns
-------
- List[Dict]]
+ dictionary_list: List[Dict[str, Any]]
+ List of resources in dictionary representation.
+ If `use_source_names` equals `True` the keys are named as in the StreamPipes backend.
"""
- return [resource.dict(by_alias=use_source_names) for resource in self._resources]
+ return [resource.to_dict(use_source_names=use_source_names) for resource in self._resources]
def to_json(self) -> str:
"""Returns the resource container in the StreamPipes JSON representation.
Returns
-------
- JSON string
+ json_string: str
+ JSON representation of the resource container where key names are equal to
+ keys used in the StreamPipes backend
"""
return json.dumps(self.to_dicts(use_source_names=True))
@@ -217,7 +221,7 @@ class ResourceContainer(ABC):
Returns
-------
- pd.DataFrame
+ resource_container_df: pd.DataFrame
"""
return pd.DataFrame.from_records(
# ResourceContainer is iterable itself via __get_item__
diff --git a/streampipes-client-python/streampipes_client/model/resource/__init__.py b/streampipes-client-python/streampipes_client/model/resource/__init__.py
index 3aa99e02d..c6f0cddba 100644
--- a/streampipes-client-python/streampipes_client/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes_client/model/resource/__init__.py
@@ -18,9 +18,11 @@
from .data_lake_measure import DataLakeMeasure
from .data_lake_series import DataLakeSeries
from .data_stream import DataStream
+from .function_definition import FunctionDefinition
__all__ = [
"DataLakeMeasure",
"DataLakeSeries",
"DataStream",
+ "FunctionDefinition",
]
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
index 9d2b73b27..3526a22ea 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
@@ -45,6 +45,7 @@ class DataLakeMeasure(Resource):
"num_event_properties": len(self.event_schema.event_properties),
}
+ element_id: Optional[StrictStr]
measure_name: StrictStr
timestamp_field: StrictStr
event_schema: Optional[EventSchema]
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_stream.py b/streampipes-client-python/streampipes_client/model/resource/data_stream.py
index dc6f565ff..2f7a3051d 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_stream.py
@@ -71,6 +71,7 @@ class DataStream(Resource):
"num_included_locales": len(self.included_locales) if self.included_locales is not None else 0,
}
+ element_id: Optional[StrictStr]
name: Optional[StrictStr]
description: Optional[StrictStr]
icon_url: Optional[StrictStr]
diff --git a/streampipes-client-python/streampipes_client/model/resource/function_definition.py b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
new file mode 100644
index 000000000..0e0e8d2af
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = [
+ "FunctionDefinition",
+]
+
+from typing import Dict, List
+from uuid import uuid4
+
+from pydantic import Field, StrictInt, StrictStr
+from streampipes_client.model.common import BasicModel
+from streampipes_client.model.resource.resource import Resource
+
+
+class FunctionDefinition(Resource):
+ """Configuration for a StreamPipes Function.
+
+ This class maps to the `FunctionDefinition` class in the StreamPipes model.
+ It contains all metadata that are required to register a function at the StreamPipes backend.
+
+ Parameters
+ ----------
+ function_id: FunctionId
+ identifier object of a StreamPipes function
+ consumed_streams: List[str]
+ list of data streams the function is consuming from
+ """
+
+ def convert_to_pandas_representation(self) -> Dict:
+ """Returns the dictionary representation of a function definition
+ to be used when creating a pandas DataFrame.
+ """
+
+ return self.to_dict(use_source_names=False)
+
+ class FunctionId(BasicModel):
+ """Identification object for a StreamPipes function.
+
+ Maps to the `FunctionId` class defined in the StreamPipes model.
+
+ Parameters
+ ----------
+ id: str
+ unique identifier of the function instance
+ version: int
+ version of the corresponding function
+ """
+
+ id: StrictStr = Field(default_factory=lambda: str(uuid4()))
+ version: StrictInt = Field(default=1)
+
+ def __hash__(self):
+ return hash((self.id, self.version))
+
+ function_id: FunctionId = Field(default_factory=FunctionId)
+ consumed_streams: List[str] = Field(default_factory=list)
diff --git a/streampipes-client-python/streampipes_client/model/resource/resource.py b/streampipes-client-python/streampipes_client/model/resource/resource.py
index e52a01609..1fcf7d2b6 100644
--- a/streampipes-client-python/streampipes_client/model/resource/resource.py
+++ b/streampipes-client-python/streampipes_client/model/resource/resource.py
@@ -22,15 +22,16 @@ A resource defines the data model that is used by a resource container (`model.c
from abc import ABC, abstractmethod
from typing import Dict
-from streampipes_client.model.common import BaseElement
+from streampipes_client.model.common import BasicModel
__all__ = [
"Resource",
]
-class Resource(ABC, BaseElement):
+class Resource(ABC, BasicModel):
"""General and abstract implementation for a resource.
+
A resource defines the data model used by a resource container (`model.container.resourceContainer`).
It inherits from Pydantic's BaseModel to get all its superpowers,
which are used to parse, validate the API response and to easily switch between
@@ -41,3 +42,20 @@ class Resource(ABC, BaseElement):
def convert_to_pandas_representation(self) -> Dict:
"""Returns a dictionary representation to be used when creating a pandas Dataframe."""
raise NotImplementedError # pragma: no cover
+
+ def to_dict(self, use_source_names=True):
+ """Returns the resource in dictionary representation.
+
+ Parameters
+ ----------
+ use_source_names: bool
+ Indicates whether the dictionary keys are named as in the StreamPipes backend (`True`)
+ or use their Python attribute names (`False`)
+
+ Returns
+ -------
+ resource: Dict[str, Any]
+ The resource as dictionary representation
+
+ """
+ return self.dict(by_alias=use_source_names)
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index 3050687c1..cdb68fb8e 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -114,12 +114,12 @@ class TestStreamPipesEndpoints(TestCase):
"elementId": "urn:streampipes.apache.org:spi:eventgrounding:TwGIQA",
"transportProtocols": [
{
- "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
"brokerHostname": "nats",
+ "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
"topicDefinition": {
- "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
"actualTopicName": "org.apache.streampipes.connect."
- "fc22b8f6-698a-4127-aa71-e11854dc57c5",
+ "fc22b8f6-698a-4127-aa71-e11854dc57c5tr",
+ "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
},
"port": 4222,
}
@@ -236,6 +236,8 @@ class TestStreamPipesEndpoints(TestCase):
result = client.dataStreamApi.all()
result_pd = result.to_pandas()
+ self.maxDiff = None
+
self.assertEqual(
1,
len(result),
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index c93963d76..3853deb95 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -29,9 +29,6 @@ from streampipes_client.model.resource.data_stream import DataStream
class TestFunction(StreamPipesFunction):
- def getFunctionId(self) -> Tuple[str, int]:
- return ("org.test.TestFunction", 1)
-
def requiredStreamIds(self) -> List[str]:
return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
@@ -47,9 +44,6 @@ class TestFunction(StreamPipesFunction):
class TestFunctionTwoStreams(StreamPipesFunction):
- def getFunctionId(self) -> Tuple[str, int]:
- return ("org.test.TestFunction2", 1)
-
def requiredStreamIds(self) -> List[str]:
return ["urn:streampipes.apache.org:eventstream:uPDKLI", "urn:streampipes.apache.org:eventstream:HHoidJ"]
@@ -242,7 +236,7 @@ class TestFunctionHandler(TestCase):
test_function.context.schema, {self.data_stream["elementId"]: DataStream(**self.data_stream)}
)
self.assertListEqual(test_function.context.streams, test_function.requiredStreamIds())
- self.assertEqual(test_function.context.function_id, test_function.getFunctionId()[0])
+ self.assertEqual(test_function.context.function_id, test_function.getFunctionId().id)
self.assertListEqual(test_function.data, self.test_stream_data1)
self.assertTrue(test_function.stopped)