You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/19 20:50:38 UTC

[streampipes] 02/03: refactor: introduce messaging endpoint

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/introduce-messaging-endpoint
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit a2509bd60acd89e1d4dd45fcaa2d7af1defb6e1b
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Jan 19 21:35:41 2023 +0100

    refactor: introduce messaging endpoint
    
    Signed-off-by: bossenti <bo...@posteo.de>
---
 streampipes-client-python/setup.py                 |  1 +
 .../streampipes_client/client/client.py            |  2 +-
 .../streampipes_client/endpoint/__init__.py        |  9 +--
 .../endpoint/{ => api}/__init__.py                 |  1 +
 .../endpoint/{ => api}/data_lake_measure.py        |  0
 .../endpoint/{ => api}/data_stream.py              |  0
 .../streampipes_client/endpoint/endpoint.py        | 78 +++++++++++++++++++++-
 .../streampipes_client/endpoint/exceptions.py      | 52 +++++++++++++++
 .../endpoint/messaging/__init__.py                 |  0
 .../tests/client/test_client.py                    |  2 +-
 10 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index d3f3f742d..e832c0dee 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -51,6 +51,7 @@ dev_packages = base_packages + [
     "pytest-cov==4.0.0",
     "pyupgrade==3.3.1",
     "types-requests==2.28.11.7",
+    "types-Jinja2==2.11.9"
 ]
 
 docs_packages = [
diff --git a/streampipes-client-python/streampipes_client/client/client.py b/streampipes-client-python/streampipes_client/client/client.py
index e22ed5fcd..c90166651 100644
--- a/streampipes-client-python/streampipes_client/client/client.py
+++ b/streampipes-client-python/streampipes_client/client/client.py
@@ -29,7 +29,7 @@ from typing import Dict, Optional
 
 from requests import Session
 from streampipes_client.client.client_config import StreamPipesClientConfig
-from streampipes_client.endpoint import DataLakeMeasureEndpoint, DataStreamEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint, DataStreamEndpoint
 from streampipes_client.endpoint.endpoint import APIEndpoint
 
 logger = logging.getLogger(__name__)
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py b/streampipes-client-python/streampipes_client/endpoint/__init__.py
index f5c795d8e..37375cd5a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/__init__.py
@@ -14,10 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from .data_lake_measure import DataLakeMeasureEndpoint
-from .data_stream import DataStreamEndpoint
+
+
+from .endpoint import APIEndpoint, MessagingEndpoint
 
 __all__ = [
-    "DataLakeMeasureEndpoint",
-    "DataStreamEndpoint",
+    "APIEndpoint",
+    "MessagingEndpoint",
 ]
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
similarity index 99%
copy from streampipes-client-python/streampipes_client/endpoint/__init__.py
copy to streampipes-client-python/streampipes_client/endpoint/api/__init__.py
index f5c795d8e..3d4e6e9cd 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 from .data_lake_measure import DataLakeMeasureEndpoint
 from .data_stream import DataStreamEndpoint
 
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py b/streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
rename to streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_stream.py b/streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_stream.py
rename to streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/endpoint.py b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
index 911cb8ab1..81dcdf62a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
@@ -24,15 +24,18 @@ An endpoint is provides all options to communicate with a central endpoint of th
 import logging
 from abc import ABC, abstractmethod
 from http import HTTPStatus
-from typing import Callable, Tuple, Type
+from typing import Callable, Tuple, Type, final
 
 from requests import Response
 from requests.exceptions import HTTPError
 
 __all__ = [
     "APIEndpoint",
+    "MessagingEndpoint",
 ]
 
+from streampipes_client.endpoint.exceptions import MessagingEndpointNotConfiguredError
+from streampipes_client.functions.broker.broker import Broker
 from streampipes_client.model.container.resource_container import ResourceContainer
 from streampipes_client.model.resource.resource import Resource
 
@@ -209,3 +212,76 @@ class APIEndpoint(Endpoint):
         )
 
         return self._container_cls._resource_cls()(**response.json())
+
+
+class MessagingEndpoint(Endpoint):
+    """Abstract implementation of an StreamPipes messaging endpoint.
+    Serves as template for all endpoints used for interacting with the StreamPipes messaging layer directly.
+    Therefore, they need to provide the functionality to talk with the broker system running in StreamPipes.
+    By design, endpoints are only instantiated within the `__init__` method of the StreamPipesClient.
+
+    Parameters
+    ----------
+    parent_client: StreamPipesClient
+        This parameter expects the instance of the `client.StreamPipesClient` the endpoint is attached to.
+
+    """
+
+    @property
+    def _broker(self) -> Broker:
+        """Defines the broker instance that is used to connect to StreamPipes' messaging layer.
+
+        This instance enables the client to authenticate to the broker used in the target StreamPipes instance,
+        to consume messages from and to write messages to the broker.
+
+        Raises
+        ------
+        MessagingEndpointNotConfiguredError
+            If the endpoint is used before the broker instance is set via `configure()`
+
+        Returns
+        -------
+        The broker instance to be used to communicate with
+        StreamPipes' messaging layer.
+        """
+
+        if hasattr(self, "__broker"):
+            return self.__broker
+        raise MessagingEndpointNotConfiguredError(
+            endpoint_name=f"{self=}".split("=")[0],
+        )
+
+    @_broker.setter
+    def _broker(self, broker: Broker) -> None:
+        """Setter method for internal property `broker`"""
+        self.__broker = broker
+
+    @property
+    @abstractmethod
+    def _container_cls(self) -> Type[ResourceContainer]:
+        """Defines the model container class the endpoint refers to.
+
+        This model container class corresponds to the Python data model,
+        which handles multiple resources returned from the endpoint.
+
+        Returns
+        -------
+        The corresponding container class from the data model,
+        needs to a subclass of `model.container.ResourceContainer`.
+        """
+        raise NotImplementedError  # pragma: no cover
+
+    @final
+    def configure(self, broker: Broker) -> None:
+        """Configures the message endpoint by setting the broker instance to be used.
+
+        This configuration step is required before the endpoint can be actually used.
+        The based `broker` instance is passed to an internal property
+
+        Returns
+        _______
+        None
+
+        """
+
+        self._broker = broker
diff --git a/streampipes-client-python/streampipes_client/endpoint/exceptions.py b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
new file mode 100644
index 000000000..6465146f9
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Custom exceptions dedicated for the endpoints module
+"""
+
+__all__ = [
+    "MessagingEndpointNotConfiguredError",
+]
+
+
+class MessagingEndpointNotConfiguredError(Exception):
+    """Exception that indicates that an instance of a messaging endpoint has not been configured.
+
+    This error occurs when an instance of a messaging endpoint is used before
+    the broker instance to be used is configured by passing it to the `configure()` method.
+
+    Parameters
+    ----------
+    endpoint_name: str
+        The name of the endpoint that caused the error
+
+    """
+
+    def __init__(
+        self,
+        endpoint_name: str,
+    ):
+        super().__init__(
+            f"\nIt looks like the endpoint used is not configured properly.\n"
+            f"This error occurs because the endpoint `{endpoint_name}` is a messaging endpoint,\n"
+            f"which always require first of all the passing of the "
+            f"broker instance to be used with the `configure()`method.\n"
+            f"One can easily overcome this error by entering the following command before proceeding:\n"
+            f"\n `client.{endpoint_name}.configure(broker=broker)`\n\n"
+            f"The variable `broker` hereby needs to be an instance of a StreamPipes broker."
+        )
diff --git a/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py b/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/streampipes-client-python/tests/client/test_client.py b/streampipes-client-python/tests/client/test_client.py
index 8a5dd806c..0d2f0e61d 100644
--- a/streampipes-client-python/tests/client/test_client.py
+++ b/streampipes-client-python/tests/client/test_client.py
@@ -22,7 +22,7 @@ from unittest.mock import MagicMock, call, patch
 from streampipes_client.client import StreamPipesClient
 from streampipes_client.client.client_config import StreamPipesClientConfig
 from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials
-from streampipes_client.endpoint import DataLakeMeasureEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint
 
 
 class TestStreamPipesClient(TestCase):