You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/01/19 20:50:36 UTC

[streampipes] branch chore/introduce-messaging-endpoint created (now 23221b715)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a change to branch chore/introduce-messaging-endpoint
in repository https://gitbox.apache.org/repos/asf/streampipes.git


      at 23221b715 refactor: bump version to the latest release

This branch includes the following new commits:

     new 92ff4b2d8 chore(linting, docs): add blacken-docs to dev dependencies
     new a2509bd60 refactor: introduce messaging endpoint
     new 23221b715 refactor: bump version to the latest release

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[streampipes] 01/03: chore(linting, docs): add blacken-docs to dev dependencies

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/introduce-messaging-endpoint
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 92ff4b2d87b89126e1fd904f33718acfa7232f36
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Jan 19 21:33:16 2023 +0100

    chore(linting, docs): add blacken-docs to dev dependencies
    
    Signed-off-by: bossenti <bo...@posteo.de>
---
 streampipes-client-python/.pre-commit-config.yaml | 7 +++++++
 streampipes-client-python/setup.py                | 1 +
 2 files changed, 8 insertions(+)

diff --git a/streampipes-client-python/.pre-commit-config.yaml b/streampipes-client-python/.pre-commit-config.yaml
index ecc2aec76..d554ad2cc 100644
--- a/streampipes-client-python/.pre-commit-config.yaml
+++ b/streampipes-client-python/.pre-commit-config.yaml
@@ -63,6 +63,13 @@ repos:
         entry: black --line-length=120
         verbose: true
 
+      - id: blacken-docks
+        name: blacken-docs
+        language: python
+        types: [ python ]
+        entry: black --line-length=120
+        verbose: true
+
       - id: mypy
         name: mypy
         language: python
diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index 5fd1ff972..d3f3f742d 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -40,6 +40,7 @@ base_packages = [
 dev_packages = base_packages + [
     "autoflake==2.0.0",
     "black==22.12.0",
+    "blacken-docs==1.12.1",
     "flake8==6.0.0",
     "interrogate==1.5.0",
     "isort==5.11.4",


[streampipes] 03/03: refactor: bump version to the latest release

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/introduce-messaging-endpoint
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 23221b7155bdbfe9fc1a9c890c81b28bd1f2a878
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Jan 19 21:50:22 2023 +0100

    refactor: bump version to the latest release
    
    Signed-off-by: bossenti <bo...@posteo.de>
---
 streampipes-client-python/streampipes_client/__version__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streampipes-client-python/streampipes_client/__version__.py b/streampipes-client-python/streampipes_client/__version__.py
index 7eef49c16..11761693a 100644
--- a/streampipes-client-python/streampipes_client/__version__.py
+++ b/streampipes-client-python/streampipes_client/__version__.py
@@ -16,6 +16,6 @@
 #
 #
 
-VERSION = (0, 71, 0)  # pragma: no cover
+VERSION = (0, 90, 0)  # pragma: no cover
 
 __version__ = ".".join(map(str, VERSION))  # noqa: F401 # pragma: no cover


[streampipes] 02/03: refactor: introduce messaging endpoint

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch chore/introduce-messaging-endpoint
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit a2509bd60acd89e1d4dd45fcaa2d7af1defb6e1b
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Jan 19 21:35:41 2023 +0100

    refactor: introduce messaging endpoint
    
    Signed-off-by: bossenti <bo...@posteo.de>
---
 streampipes-client-python/setup.py                 |  1 +
 .../streampipes_client/client/client.py            |  2 +-
 .../streampipes_client/endpoint/__init__.py        |  9 +--
 .../endpoint/{ => api}/__init__.py                 |  1 +
 .../endpoint/{ => api}/data_lake_measure.py        |  0
 .../endpoint/{ => api}/data_stream.py              |  0
 .../streampipes_client/endpoint/endpoint.py        | 78 +++++++++++++++++++++-
 .../streampipes_client/endpoint/exceptions.py      | 52 +++++++++++++++
 .../endpoint/messaging/__init__.py                 |  0
 .../tests/client/test_client.py                    |  2 +-
 10 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/streampipes-client-python/setup.py b/streampipes-client-python/setup.py
index d3f3f742d..e832c0dee 100644
--- a/streampipes-client-python/setup.py
+++ b/streampipes-client-python/setup.py
@@ -51,6 +51,7 @@ dev_packages = base_packages + [
     "pytest-cov==4.0.0",
     "pyupgrade==3.3.1",
     "types-requests==2.28.11.7",
+    "types-Jinja2==2.11.9"
 ]
 
 docs_packages = [
diff --git a/streampipes-client-python/streampipes_client/client/client.py b/streampipes-client-python/streampipes_client/client/client.py
index e22ed5fcd..c90166651 100644
--- a/streampipes-client-python/streampipes_client/client/client.py
+++ b/streampipes-client-python/streampipes_client/client/client.py
@@ -29,7 +29,7 @@ from typing import Dict, Optional
 
 from requests import Session
 from streampipes_client.client.client_config import StreamPipesClientConfig
-from streampipes_client.endpoint import DataLakeMeasureEndpoint, DataStreamEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint, DataStreamEndpoint
 from streampipes_client.endpoint.endpoint import APIEndpoint
 
 logger = logging.getLogger(__name__)
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py b/streampipes-client-python/streampipes_client/endpoint/__init__.py
index f5c795d8e..37375cd5a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/__init__.py
@@ -14,10 +14,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from .data_lake_measure import DataLakeMeasureEndpoint
-from .data_stream import DataStreamEndpoint
+
+
+from .endpoint import APIEndpoint, MessagingEndpoint
 
 __all__ = [
-    "DataLakeMeasureEndpoint",
-    "DataStreamEndpoint",
+    "APIEndpoint",
+    "MessagingEndpoint",
 ]
diff --git a/streampipes-client-python/streampipes_client/endpoint/__init__.py b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
similarity index 99%
copy from streampipes-client-python/streampipes_client/endpoint/__init__.py
copy to streampipes-client-python/streampipes_client/endpoint/api/__init__.py
index f5c795d8e..3d4e6e9cd 100644
--- a/streampipes-client-python/streampipes_client/endpoint/__init__.py
+++ b/streampipes-client-python/streampipes_client/endpoint/api/__init__.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 from .data_lake_measure import DataLakeMeasureEndpoint
 from .data_stream import DataStreamEndpoint
 
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py b/streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
rename to streampipes-client-python/streampipes_client/endpoint/api/data_lake_measure.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_stream.py b/streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
similarity index 100%
rename from streampipes-client-python/streampipes_client/endpoint/data_stream.py
rename to streampipes-client-python/streampipes_client/endpoint/api/data_stream.py
diff --git a/streampipes-client-python/streampipes_client/endpoint/endpoint.py b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
index 911cb8ab1..81dcdf62a 100644
--- a/streampipes-client-python/streampipes_client/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes_client/endpoint/endpoint.py
@@ -24,15 +24,18 @@ An endpoint is provides all options to communicate with a central endpoint of th
 import logging
 from abc import ABC, abstractmethod
 from http import HTTPStatus
-from typing import Callable, Tuple, Type
+from typing import Callable, Tuple, Type, final
 
 from requests import Response
 from requests.exceptions import HTTPError
 
 __all__ = [
     "APIEndpoint",
+    "MessagingEndpoint",
 ]
 
+from streampipes_client.endpoint.exceptions import MessagingEndpointNotConfiguredError
+from streampipes_client.functions.broker.broker import Broker
 from streampipes_client.model.container.resource_container import ResourceContainer
 from streampipes_client.model.resource.resource import Resource
 
@@ -209,3 +212,76 @@ class APIEndpoint(Endpoint):
         )
 
         return self._container_cls._resource_cls()(**response.json())
+
+
+class MessagingEndpoint(Endpoint):
+    """Abstract implementation of an StreamPipes messaging endpoint.
+    Serves as template for all endpoints used for interacting with the StreamPipes messaging layer directly.
+    Therefore, they need to provide the functionality to talk with the broker system running in StreamPipes.
+    By design, endpoints are only instantiated within the `__init__` method of the StreamPipesClient.
+
+    Parameters
+    ----------
+    parent_client: StreamPipesClient
+        This parameter expects the instance of the `client.StreamPipesClient` the endpoint is attached to.
+
+    """
+
+    @property
+    def _broker(self) -> Broker:
+        """Defines the broker instance that is used to connect to StreamPipes' messaging layer.
+
+        This instance enables the client to authenticate to the broker used in the target StreamPipes instance,
+        to consume messages from and to write messages to the broker.
+
+        Raises
+        ------
+        MessagingEndpointNotConfiguredError
+            If the endpoint is used before the broker instance is set via `configure()`
+
+        Returns
+        -------
+        The broker instance to be used to communicate with
+        StreamPipes' messaging layer.
+        """
+
+        if hasattr(self, "__broker"):
+            return self.__broker
+        raise MessagingEndpointNotConfiguredError(
+            endpoint_name=f"{self=}".split("=")[0],
+        )
+
+    @_broker.setter
+    def _broker(self, broker: Broker) -> None:
+        """Setter method for internal property `broker`"""
+        self.__broker = broker
+
+    @property
+    @abstractmethod
+    def _container_cls(self) -> Type[ResourceContainer]:
+        """Defines the model container class the endpoint refers to.
+
+        This model container class corresponds to the Python data model,
+        which handles multiple resources returned from the endpoint.
+
+        Returns
+        -------
+        The corresponding container class from the data model,
+        needs to a subclass of `model.container.ResourceContainer`.
+        """
+        raise NotImplementedError  # pragma: no cover
+
+    @final
+    def configure(self, broker: Broker) -> None:
+        """Configures the message endpoint by setting the broker instance to be used.
+
+        This configuration step is required before the endpoint can be actually used.
+        The based `broker` instance is passed to an internal property
+
+        Returns
+        _______
+        None
+
+        """
+
+        self._broker = broker
diff --git a/streampipes-client-python/streampipes_client/endpoint/exceptions.py b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
new file mode 100644
index 000000000..6465146f9
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/endpoint/exceptions.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Custom exceptions dedicated for the endpoints module
+"""
+
+__all__ = [
+    "MessagingEndpointNotConfiguredError",
+]
+
+
+class MessagingEndpointNotConfiguredError(Exception):
+    """Exception that indicates that an instance of a messaging endpoint has not been configured.
+
+    This error occurs when an instance of a messaging endpoint is used before
+    the broker instance to be used is configured by passing it to the `configure()` method.
+
+    Parameters
+    ----------
+    endpoint_name: str
+        The name of the endpoint that caused the error
+
+    """
+
+    def __init__(
+        self,
+        endpoint_name: str,
+    ):
+        super().__init__(
+            f"\nIt looks like the endpoint used is not configured properly.\n"
+            f"This error occurs because the endpoint `{endpoint_name}` is a messaging endpoint,\n"
+            f"which always require first of all the passing of the "
+            f"broker instance to be used with the `configure()`method.\n"
+            f"One can easily overcome this error by entering the following command before proceeding:\n"
+            f"\n `client.{endpoint_name}.configure(broker=broker)`\n\n"
+            f"The variable `broker` hereby needs to be an instance of a StreamPipes broker."
+        )
diff --git a/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py b/streampipes-client-python/streampipes_client/endpoint/messaging/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/streampipes-client-python/tests/client/test_client.py b/streampipes-client-python/tests/client/test_client.py
index 8a5dd806c..0d2f0e61d 100644
--- a/streampipes-client-python/tests/client/test_client.py
+++ b/streampipes-client-python/tests/client/test_client.py
@@ -22,7 +22,7 @@ from unittest.mock import MagicMock, call, patch
 from streampipes_client.client import StreamPipesClient
 from streampipes_client.client.client_config import StreamPipesClientConfig
 from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials
-from streampipes_client.endpoint import DataLakeMeasureEndpoint
+from streampipes_client.endpoint.api import DataLakeMeasureEndpoint
 
 
 class TestStreamPipesClient(TestCase):