You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/23 18:48:55 UTC
[streampipes] 04/04: introduce FunctionDefinition resource
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch chore/introduce-function-definition
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit bcce069b891788a389a6ea2cdbef8fb002d30cf6
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Jan 22 15:53:35 2023 +0100
introduce FunctionDefinition resource
Signed-off-by: bossenti <bo...@posteo.de>
---
.../functions/broker/__init__.py | 2 +-
.../functions/function_handler.py | 4 +-
.../model/container/resource_container.py | 20 ------
.../streampipes_client/model/resource/__init__.py | 2 +
.../model/resource/function_definition.py | 71 ++++++++++++++++++++++
.../tests/client/test_endpoint.py | 3 +-
.../tests/functions/test_function_handler.py | 8 +--
7 files changed, 79 insertions(+), 31 deletions(-)
diff --git a/streampipes-client-python/streampipes_client/functions/broker/__init__.py b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
index e54049050..60d846168 100644
--- a/streampipes-client-python/streampipes_client/functions/broker/__init__.py
+++ b/streampipes-client-python/streampipes_client/functions/broker/__init__.py
@@ -21,4 +21,4 @@ __all__ = [
"Broker",
"NatsBroker",
"SupportedBroker",
-]
\ No newline at end of file
+]
diff --git a/streampipes-client-python/streampipes_client/functions/function_handler.py b/streampipes-client-python/streampipes_client/functions/function_handler.py
index 5097468cb..993fe3a56 100644
--- a/streampipes-client-python/streampipes_client/functions/function_handler.py
+++ b/streampipes-client-python/streampipes_client/functions/function_handler.py
@@ -125,7 +125,7 @@ class FunctionHandler:
messages[stream_id] = broker.get_message()
# Generate the function context
for streampipes_function in self.stream_contexts[stream_id].functions:
- function_id = streampipes_function.getFunctionId()[0]
+ function_id = streampipes_function.getFunctionId().id
if function_id in contexts.keys():
contexts[function_id].add_data_stream_schema(stream_id, data_stream)
else:
@@ -137,7 +137,7 @@ class FunctionHandler:
)
# Start the functions
for streampipes_function in self.registration.getFunctions():
- streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId()[0]])
+ streampipes_function.onServiceStarted(contexts[streampipes_function.getFunctionId().id])
# Get the messages continuously and send them to the functions
async for stream_id, msg in AsyncIterHandler.combine_async_messages(messages):
diff --git a/streampipes-client-python/streampipes_client/model/container/resource_container.py b/streampipes-client-python/streampipes_client/model/container/resource_container.py
index f33d5717d..7173be366 100644
--- a/streampipes-client-python/streampipes_client/model/container/resource_container.py
+++ b/streampipes-client-python/streampipes_client/model/container/resource_container.py
@@ -32,7 +32,6 @@ import pandas as pd
from pydantic import ValidationError
__all__ = [
- "PandasCompatibleResourceContainer",
"ResourceContainer",
]
@@ -217,25 +216,6 @@ class ResourceContainer(ABC):
return json.dumps(self.to_dicts(use_source_names=True))
-
-class PandasCompatibleResourceContainer(ResourceContainer, ABC):
- """Resource Container that can be converted to a pandas data frame.
-
- This type of resource containers provides a `to_pandas()` method that
- returns the resource container as a pandas data frame.
- """
-
- @classmethod
- @abstractmethod
- def _resource_cls(cls) -> Type[Resource]:
- """Returns the class of the resource that are bundled.
-
- Returns
- -------
- model.resource.Resource
- """
- raise NotImplementedError # pragma: no cover
-
def to_pandas(self) -> pd.DataFrame:
"""Returns the resource container in representation of a Pandas Dataframe.
diff --git a/streampipes-client-python/streampipes_client/model/resource/__init__.py b/streampipes-client-python/streampipes_client/model/resource/__init__.py
index 3aa99e02d..c6f0cddba 100644
--- a/streampipes-client-python/streampipes_client/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes_client/model/resource/__init__.py
@@ -18,9 +18,11 @@
from .data_lake_measure import DataLakeMeasure
from .data_lake_series import DataLakeSeries
from .data_stream import DataStream
+from .function_definition import FunctionDefinition
__all__ = [
"DataLakeMeasure",
"DataLakeSeries",
"DataStream",
+ "FunctionDefinition",
]
diff --git a/streampipes-client-python/streampipes_client/model/resource/function_definition.py b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
new file mode 100644
index 000000000..0e0e8d2af
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/function_definition.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = [
+ "FunctionDefinition",
+]
+
+from typing import Dict, List
+from uuid import uuid4
+
+from pydantic import Field, StrictInt, StrictStr
+from streampipes_client.model.common import BasicModel
+from streampipes_client.model.resource.resource import Resource
+
+
+class FunctionDefinition(Resource):
+ """Configuration for a StreamPipes Function.
+
+ This class maps to the `FunctionDefinition` class in the StreamPipes model.
+ It contains all metadata that are required to register a function at the StreamPipes backend.
+
+ Parameters
+ ----------
+ function_id: FunctionId
+ identifier object of a StreamPipes function
+ consumed_streams: List[str]
+ list of data streams the function is consuming from
+ """
+
+ def convert_to_pandas_representation(self) -> Dict:
+ """Returns the dictionary representation of a function definition
+ to be used when creating a pandas Dataframe.
+ """
+
+ return self.to_dict(use_source_names=False)
+
+ class FunctionId(BasicModel):
+ """Identification object for a StreamPipes function.
+
+ Maps to the `FunctionId` class defined in the StreamPipes model.
+
+ Parameters
+ ----------
+ id: str
+ unique identifier of the function instance
+ version: int
+ version of the corresponding function
+ """
+
+ id: StrictStr = Field(default_factory=lambda: str(uuid4()))
+ version: StrictInt = Field(default=1)
+
+ def __hash__(self):
+ return hash((self.id, self.version))
+
+ function_id: FunctionId = Field(default_factory=FunctionId)
+ consumed_streams: List[str] = Field(default_factory=list)
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index a74ba6c0e..cdb68fb8e 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -117,7 +117,8 @@ class TestStreamPipesEndpoints(TestCase):
"brokerHostname": "nats",
"elementId": "urn:streampipes.apache.org:spi:natstransportprotocol:VJkHmZ",
"topicDefinition": {
- "actualTopicName": "org.apache.streampipes.connect.fc22b8f6-698a-4127-aa71-e11854dc57c5",
+ "actualTopicName": "org.apache.streampipes.connect."
+ "fc22b8f6-698a-4127-aa71-e11854dc57c5tr",
"elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
},
"port": 4222,
diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py
index c93963d76..3853deb95 100644
--- a/streampipes-client-python/tests/functions/test_function_handler.py
+++ b/streampipes-client-python/tests/functions/test_function_handler.py
@@ -29,9 +29,6 @@ from streampipes_client.model.resource.data_stream import DataStream
class TestFunction(StreamPipesFunction):
- def getFunctionId(self) -> Tuple[str, int]:
- return ("org.test.TestFunction", 1)
-
def requiredStreamIds(self) -> List[str]:
return ["urn:streampipes.apache.org:eventstream:uPDKLI"]
@@ -47,9 +44,6 @@ class TestFunction(StreamPipesFunction):
class TestFunctionTwoStreams(StreamPipesFunction):
- def getFunctionId(self) -> Tuple[str, int]:
- return ("org.test.TestFunction2", 1)
-
def requiredStreamIds(self) -> List[str]:
return ["urn:streampipes.apache.org:eventstream:uPDKLI", "urn:streampipes.apache.org:eventstream:HHoidJ"]
@@ -242,7 +236,7 @@ class TestFunctionHandler(TestCase):
test_function.context.schema, {self.data_stream["elementId"]: DataStream(**self.data_stream)}
)
self.assertListEqual(test_function.context.streams, test_function.requiredStreamIds())
- self.assertEqual(test_function.context.function_id, test_function.getFunctionId()[0])
+ self.assertEqual(test_function.context.function_id, test_function.getFunctionId().id)
self.assertListEqual(test_function.data, self.test_stream_data1)
self.assertTrue(test_function.stopped)