You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/19 20:50:38 UTC
[streampipes] 02/03: refactor: introduce messaging endpoint
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch chore/introduce-messaging-endpoint
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit a2509bd60acd89e1d4dd45fcaa2d7af1defb6e1b
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Jan 19 21:35:41 2023 +0100
refactor: introduce messaging endpoint
Signed-off-by: bossenti <bo...@posteo.de>
---
streampipes-client-python/setup.py | 1 +
.../streampipes_client/client/client.py | 2 +-
.../streampipes_client/endpoint/__init__.py | 9 +--
.../endpoint/{ => api}/__init__.py | 1 +
.../endpoint/{ => api}/data_lake_measure.py | 0
.../endpoint/{ => api}/data_stream.py | 0
.../streampipes_client/endpoint/endpoint.py | 78 +++++++++++++++++++++-
.../streampipes_client/endpoint/exceptions.py | 52 +++++++++++++++
.../endpoint/messaging/__init__.py | 0
.../tests/client/test_client.py | 2 +-
10 files changed, 138 insertions(+), 7 deletions(-)
diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index d3f3f742d..e832c0dee 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -51,6 +51,7 @@ dev_packages = base_packages + [
"pytest-cov==4.0.0",
"pyupgrade==3.3.1",
"types-requests==2.28.11.7",
+ "types-Jinja2==2.11.9"
]
docs_packages = [
diff --git a/streampipes-client-python/streampipes_client/client/client.py b/streampipes-client-python/streampipes_client/client/client.py
index e22ed5fcd..c90166651 100644
--- a/streampipes-client-python/streampipes_client/client/client.py
+++ b/streampipes-client-python/streampipes_client/client/client.py
@@ -29,7 +29,7 @@ from typing import Dict, Optional
from requests import Session
from streampipes_client.client.client_config import StreamPipesClientConfig
-from streampipes_client.endpoint import DataLakeMeasureEndpoint, DataStreamEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint, DataStreamEndpoint
from streampipes_client.endpoint.endpoint import APIEndpoint
logger = logging.getLogger(__name__)
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py b/streampipes-client-python/streampipes_client/endpoint/__init__.py
index f5c795d8e..37375cd5a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/__init__.py
@@ -14,10 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from .data_lake_measure import DataLakeMeasureEndpoint
-from .data_stream import DataStreamEndpoint
+
+
+from .endpoint import APIEndpoint, MessagingEndpoint
__all__ = [
- "DataLakeMeasureEndpoint",
- "DataStreamEndpoint",
+ "APIEndpoint",
+ "MessagingEndpoint",
]
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
similarity index 99%
copy from streampipes-client-python/streampipes_client/endpoint/__init__.py
copy to streampipes-client-python/streampipes_client/endpoint/api/__init__.py
index f5c795d8e..3d4e6e9cd 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
from .data_lake_measure import DataLakeMeasureEndpoint
from .data_stream import DataStreamEndpoint
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py b/streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
rename to streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_stream.py b/streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_stream.py
rename to streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/endpoint.py b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
index 911cb8ab1..81dcdf62a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
@@ -24,15 +24,18 @@ An endpoint is provides all options to communicate with a central endpoint of th
import logging
from abc import ABC, abstractmethod
from http import HTTPStatus
-from typing import Callable, Tuple, Type
+from typing import Callable, Tuple, Type, final
from requests import Response
from requests.exceptions import HTTPError
__all__ = [
"APIEndpoint",
+ "MessagingEndpoint",
]
+from streampipes_client.endpoint.exceptions import MessagingEndpointNotConfiguredError
+from streampipes_client.functions.broker.broker import Broker
from streampipes_client.model.container.resource_container import ResourceContainer
from streampipes_client.model.resource.resource import Resource
@@ -209,3 +212,76 @@ class APIEndpoint(Endpoint):
)
return self._container_cls._resource_cls()(**response.json())
+
+
+class MessagingEndpoint(Endpoint):
+ """Abstract implementation of a StreamPipes messaging endpoint.
+ Serves as template for all endpoints used for interacting with the StreamPipes messaging layer directly.
+ Therefore, they need to provide the functionality to talk with the broker system running in StreamPipes.
+ By design, endpoints are only instantiated within the `__init__` method of the StreamPipesClient.
+
+ Parameters
+ ----------
+ parent_client: StreamPipesClient
+ This parameter expects the instance of the `client.StreamPipesClient` the endpoint is attached to.
+
+ """
+
+ @property
+ def _broker(self) -> Broker:
+ """Defines the broker instance that is used to connect to StreamPipes' messaging layer.
+
+ This instance enables the client to authenticate to the broker used in the target StreamPipes instance,
+ to consume messages from and to write messages to the broker.
+
+ Raises
+ ------
+ MessagingEndpointNotConfiguredError
+ If the endpoint is used before the broker instance is set via `configure()`
+
+ Returns
+ -------
+ The broker instance to be used to communicate with
+ StreamPipes' messaging layer.
+ """
+
+ if hasattr(self, "__broker"):
+ return self.__broker
+ raise MessagingEndpointNotConfiguredError(
+ endpoint_name=f"{self=}".split("=")[0],
+ )
+
+ @_broker.setter
+ def _broker(self, broker: Broker) -> None:
+ """Setter method for the internal property `_broker`"""
+ self.__broker = broker
+
+ @property
+ @abstractmethod
+ def _container_cls(self) -> Type[ResourceContainer]:
+ """Defines the model container class the endpoint refers to.
+
+ This model container class corresponds to the Python data model,
+ which handles multiple resources returned from the endpoint.
+
+ Returns
+ -------
+ The corresponding container class from the data model,
+ needs to be a subclass of `model.container.ResourceContainer`.
+ """
+ raise NotImplementedError # pragma: no cover
+
+ @final
+ def configure(self, broker: Broker) -> None:
+ """Configures the message endpoint by setting the broker instance to be used.
+
+ This configuration step is required before the endpoint can be actually used.
+ The given `broker` instance is stored in an internal property.
+
+ Returns
+ -------
+ None
+
+ """
+
+ self._broker = broker
diff --git a/streampipes-client-python/streampipes_client/endpoint/exceptions.py b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
new file mode 100644
index 000000000..6465146f9
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Custom exceptions dedicated to the endpoint module
+"""
+
+__all__ = [
+ "MessagingEndpointNotConfiguredError",
+]
+
+
+class MessagingEndpointNotConfiguredError(Exception):
+ """Exception that indicates that an instance of a messaging endpoint has not been configured.
+
+ This error occurs when an instance of a messaging endpoint is used before
+ the broker instance to be used is configured by passing it to the `configure()` method.
+
+ Parameters
+ ----------
+ endpoint_name: str
+ The name of the endpoint that caused the error
+
+ """
+
+ def __init__(
+ self,
+ endpoint_name: str,
+ ):
+ super().__init__(
+ f"\nIt looks like the endpoint used is not configured properly.\n"
+ f"This error occurs because the endpoint `{endpoint_name}` is a messaging endpoint,\n"
+ f"which always require first of all the passing of the "
+ f"broker instance to be used with the `configure()`method.\n"
+ f"One can easily overcome this error by entering the following command before proceeding:\n"
+ f"\n `client.{endpoint_name}.configure(broker=broker)`\n\n"
+ f"The variable `broker` hereby needs to be an instance of a StreamPipes broker."
+ )
diff --git a/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py b/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/streampipes-client-python/tests/client/test_client.py b/streampipes-client-python/tests/client/test_client.py
index 8a5dd806c..0d2f0e61d 100644
--- a/streampipes-client-python/tests/client/test_client.py
+++ b/streampipes-client-python/tests/client/test_client.py
@@ -22,7 +22,7 @@ from unittest.mock import MagicMock, call, patch
from streampipes_client.client import StreamPipesClient
from streampipes_client.client.client_config import StreamPipesClientConfig
from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials
-from streampipes_client.endpoint import DataLakeMeasureEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint
class TestStreamPipesClient(TestCase):