You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/23 18:48:53 UTC
[streampipes] 02/04: change inheritance of Resource class
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch chore/introduce-function-definition
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 319102e8855bf658116ee6ed4b2e1d47621d8564
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Jan 22 14:42:33 2023 +0100
change inheritance of Resource class
Signed-off-by: bossenti <bo...@posteo.de>
---
.../functions/broker/__init__.py | 2 +-
.../functions/streampipes_function.py | 25 ++++++++++------
.../streampipes_client/model/common.py | 33 ++++++++++++++--------
.../model/container/resource_container.py | 32 ++++++++++++++++++---
.../model/resource/data_lake_measure.py | 1 +
.../model/resource/data_stream.py | 1 +
.../streampipes_client/model/resource/resource.py | 4 +--
.../tests/client/test_endpoint.py | 7 +++--
8 files changed, 76 insertions(+), 29 deletions(-)
diff --git a/streampipes-client-python/streampipes_client/functions/broker/__init__.py b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
index 509c6b0d2..e54049050 100644
--- a/streampipes-client-python/streampipes_client/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
@@ -20,5 +20,5 @@ from .nats_broker import NatsBroker
__all__ = [
"Broker",
"NatsBroker",
- "SupportedBroker"
+ "SupportedBroker",
]
\ No newline at end of file
diff --git a/streampipes-client-python/streampipes_client/functions/streampipes_function.py b/streampipes-client-python/streampipes_client/functions/streampipes_function.py
index e19e28d42..248cf8220 100644
--- a/streampipes-client-python/streampipes_client/functions/streampipes_function.py
+++ b/streampipes-client-python/streampipes_client/functions/streampipes_function.py
@@ -15,27 +15,36 @@
# limitations under the License.
#
from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, List, Optional
from streampipes_client.functions.utils.function_context import FunctionContext
+from streampipes_client.model.resource import FunctionDefinition
class StreamPipesFunction(ABC):
"""Abstract implementation of a StreamPipesFunction.
A StreamPipesFunction allows users to get the data of a StreamPipes data streams easily.
- It makes it possible to work with the live data in python and enabels to use the powerful
- data analytics libaries there.
+ It makes it possible to work with the live data in Python and enables the use of the powerful
+ data analytics libraries available there.
+
+ Parameters
+ ----------
+ function_definition: FunctionDefinition
+ the definition of the function that contains metadata about the connected function
"""
- @abstractmethod
- def getFunctionId(self) -> Tuple[str, int]:
- """Get the id of the function.
+ def __init__(self, function_definition: Optional[FunctionDefinition] = None):
+ self.function_definition = function_definition or FunctionDefinition()
+
+ def getFunctionId(self) -> FunctionDefinition.FunctionId:
+ """Returns the id of the function.
Returns
-------
- Tuple of the function id und version number
+ FunctionId: FunctionDefinition.FunctionId
+ Identification object of the StreamPipes function
"""
- raise NotImplementedError
+ return self.function_definition.function_id
@abstractmethod
def requiredStreamIds(self) -> List[str]:
diff --git a/streampipes-client-python/streampipes_client/model/common.py b/streampipes-client-python/streampipes_client/model/common.py
index 83f1b00dd..d9f7b243f 100644
--- a/streampipes-client-python/streampipes_client/model/common.py
+++ b/streampipes-client-python/streampipes_client/model/common.py
@@ -55,30 +55,33 @@ class BaseElement(BasicModel):
element_id: Optional[StrictStr]
-class EventPropertyQualityRequirement(BaseElement):
+class EventPropertyQualityRequirement(BasicModel):
"""
Data model of an `EventPropertyQualityRequirement` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
minimum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition")
maximum_property_quality: Optional[BaseElement] = Field(alias="eventPropertyQualityDefinition")
-class ValueSpecification(BaseElement):
+class ValueSpecification(BasicModel):
"""
Data model of an `ValueSpecification` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
min_value: Optional[int]
max_value: Optional[int]
step: Optional[float]
-class EventProperty(BaseElement):
+class EventProperty(BasicModel):
"""
Data model of an `EventProperty` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
label: Optional[StrictStr]
description: Optional[StrictStr]
runtime_name: StrictStr
@@ -94,19 +97,21 @@ class EventProperty(BaseElement):
value_specification: Optional[ValueSpecification]
-class EventSchema(BaseElement):
+class EventSchema(BasicModel):
"""
Data model of an `EventSchema` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
event_properties: List[EventProperty]
-class ApplicationLink(BaseElement):
+class ApplicationLink(BasicModel):
"""
Data model of an `ApplicationLink` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
application_name: Optional[StrictStr]
application_description: Optional[StrictStr]
application_url: Optional[StrictStr]
@@ -114,52 +119,58 @@ class ApplicationLink(BaseElement):
application_link_type: Optional[StrictStr]
-class TopicDefinition(BaseElement):
+class TopicDefinition(BasicModel):
"""
Data model of a `TopicDefinition` in compliance to the StreamPipes Backend.
"""
actual_topic_name: StrictStr
+ element_id: Optional[StrictStr]
-class TransportProtocol(BaseElement):
+class TransportProtocol(BasicModel):
"""
Data model of a `TransportProtocol` in compliance to the StreamPipes Backend.
"""
broker_hostname: StrictStr
+ element_id: Optional[StrictStr]
topic_definition: TopicDefinition
port: StrictInt
-class TransportFormat(BaseElement):
+class TransportFormat(BasicModel):
"""
Data model of a `TransportFormat` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
rdf_type: Optional[List[Optional[StrictStr]]]
-class EventGrounding(BaseElement):
+class EventGrounding(BasicModel):
"""
Data model of an `EventGrounding` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
transport_protocols: List[TransportProtocol]
transport_formats: Optional[List[Optional[TransportFormat]]]
-class MeasurementCapability(BaseElement):
+class MeasurementCapability(BasicModel):
"""
Data model of a `MeasurementCapability` in compliance to the StreamPipes Backend.
"""
capability: Optional[StrictStr]
+ element_id: Optional[StrictStr]
-class MeasurementObject(BaseElement):
+class MeasurementObject(BasicModel):
"""
Data model of a `MeasurementObject` in compliance to the StreamPipes Backend.
"""
+ element_id: Optional[StrictStr]
measures_object: Optional[StrictStr]
diff --git a/streampipes-client-python/streampipes_client/model/container/resource_container.py b/streampipes-client-python/streampipes_client/model/container/resource_container.py
index 62e990ba9..f33d5717d 100644
--- a/streampipes-client-python/streampipes_client/model/container/resource_container.py
+++ b/streampipes-client-python/streampipes_client/model/container/resource_container.py
@@ -32,6 +32,7 @@ import pandas as pd
from pydantic import ValidationError
__all__ = [
+ "PandasCompatibleResourceContainer",
"ResourceContainer",
]
@@ -198,26 +199,49 @@ class ResourceContainer(ABC):
Returns
-------
- List[Dict]]
+ dictionary_list: List[Dict[str, Any]]
+ List of resources in dictionary representation.
+ If `use_source_names` equals `True` the keys are named as in the StreamPipes backend.
"""
- return [resource.dict(by_alias=use_source_names) for resource in self._resources]
+ return [resource.to_dict(use_source_names=use_source_names) for resource in self._resources]
def to_json(self) -> str:
"""Returns the resource container in the StreamPipes JSON representation.
Returns
-------
- JSON string
+ JSON string: str
+ JSON representation of the resource container where key names are equal to
+ keys used in the StreamPipes backend
"""
return json.dumps(self.to_dicts(use_source_names=True))
+
+class PandasCompatibleResourceContainer(ResourceContainer, ABC):
+ """Resource Container that can be converted to a pandas data frame.
+
+ This type of resource container provides a `to_pandas()` method that
+ returns the resource container as a pandas data frame.
+ """
+
+ @classmethod
+ @abstractmethod
+ def _resource_cls(cls) -> Type[Resource]:
+ """Returns the class of the resources that are bundled.
+
+ Returns
+ -------
+ model.resource.Resource
+ """
+ raise NotImplementedError # pragma: no cover
+
def to_pandas(self) -> pd.DataFrame:
"""Returns the resource container in representation of a Pandas Dataframe.
Returns
-------
- pd.DataFrame
+ resource_container_df: pd.DataFrame
"""
return pd.DataFrame.from_records(
# ResourceContainer is iterable itself via __get_item__
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
index 9d2b73b27..3526a22ea 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
@@ -45,6 +45,7 @@ class DataLakeMeasure(Resource):
"num_event_properties": len(self.event_schema.event_properties),
}
+ element_id: Optional[StrictStr]
measure_name: StrictStr
timestamp_field: StrictStr
event_schema: Optional[EventSchema]
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_stream.py b/streampipes-client-python/streampipes_client/model/resource/data_stream.py
index dc6f565ff..2f7a3051d 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_stream.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_stream.py
@@ -71,6 +71,7 @@ class DataStream(Resource):
"num_included_locales": len(self.included_locales) if self.included_locales is not None else 0,
}
+ element_id: Optional[StrictStr]
name: Optional[StrictStr]
description: Optional[StrictStr]
icon_url: Optional[StrictStr]
diff --git a/streampipes-client-python/streampipes_client/model/resource/resource.py b/streampipes-client-python/streampipes_client/model/resource/resource.py
index e52a01609..5cb2d354d 100644
--- a/streampipes-client-python/streampipes_client/model/resource/resource.py
+++ b/streampipes-client-python/streampipes_client/model/resource/resource.py
@@ -22,14 +22,14 @@ A resource defines the data model that is used by a resource container (`model.c
from abc import ABC, abstractmethod
from typing import Dict
-from streampipes_client.model.common import BaseElement
+from streampipes_client.model.common import BasicModel
__all__ = [
"Resource",
]
-class Resource(ABC, BaseElement):
+class Resource(ABC, BasicModel):
"""General and abstract implementation for a resource.
A resource defines the data model used by a resource container (`model.container.resourceContainer`).
It inherits from Pydantic's BaseModel to get all its superpowers,
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index 3050687c1..a74ba6c0e 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -114,12 +114,11 @@ class TestStreamPipesEndpoints(TestCase):
"elementId": "urn:streampipes.apache.org:spi:eventgrounding:TwGIQA",
"transportProtocols": [
{
- "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
"brokerHostname": "nats",
+ "elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
"topicDefinition": {
+ "actualTopicName": "org.apache.streampipes.connect.fc22b8f6-698a-4127-aa71-e11854dc57c5",
"elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
- "actualTopicName": "org.apache.streampipes.connect."
- "fc22b8f6-698a-4127-aa71-e11854dc57c5",
},
"port": 4222,
}
@@ -236,6 +235,8 @@ class TestStreamPipesEndpoints(TestCase):
result = client.dataStreamApi.all()
result_pd = result.to_pandas()
+ self.maxDiff = None
+
self.assertEqual(
1,
len(result),