You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/04 20:44:16 UTC

[streampipes] 01/02: add describe() method to python client

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch add-example-files-python-client
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit e3cac9dd6d742a9c08e272eb62b14b8751b1abf0
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Dec 4 21:43:35 2022 +0100

    add describe() method to python client
---
 .../streampipes_client/client/client.py            | 37 +++++++++++++++++
 .../model/resource/data_lake_measure.py            |  6 +--
 .../tests/client/test_client.py                    | 46 +++++++++++++++++++++-
 .../tests/client/test_endpoint.py                  | 44 ++++++++++++++++++---
 4 files changed, 121 insertions(+), 12 deletions(-)

diff --git a/streampipes-client-python/streampipes_client/client/client.py b/streampipes-client-python/streampipes_client/client/client.py
index 0a6a8e1c3..3fb49f28a 100644
--- a/streampipes-client-python/streampipes_client/client/client.py
+++ b/streampipes-client-python/streampipes_client/client/client.py
@@ -30,6 +30,7 @@ from typing import Dict, Optional
 from requests import Session
 from streampipes_client.client.client_config import StreamPipesClientConfig
 from streampipes_client.endpoint import DataLakeMeasureEndpoint, DataStreamEndpoint
+from streampipes_client.endpoint.endpoint import APIEndpoint
 
 logger = logging.getLogger(__name__)
 
@@ -176,3 +177,39 @@ class StreamPipesClient:
             f"{self.client_config.host_address}:"
             f"{self.client_config.port}/streampipes-backend/"
         )
+
+    def describe(self) -> None:
+        """Prints short description of the connected StreamPipes instance and the available resources to the console.
+
+        Returns
+        -------
+            None
+        """
+
+        # get all endpoints of this client
+        available_endpoints = [
+            attr_name for attr_name in dir(self) if isinstance(self.__getattribute__(attr_name), APIEndpoint)
+        ]
+
+        # collect the number of available resources per endpoint
+        endpoint_stats = {
+            (all_items := self.__getattribute__(endpoint_name).all()).__class__.__name__: len(all_items)
+            for endpoint_name in available_endpoints
+        }
+
+        # sort the endpoints descending based on the number of resources
+        sorted_endpoint_stats = {
+            key: val for key, val in sorted(endpoint_stats.items(), key=lambda item: item[1], reverse=True)
+        }
+
+        base_message = (
+            f"\nHi there!\n"
+            f"You are connected to a StreamPipes instance running at "
+            f"{'http://' if self.client_config.https_disabled else 'https://'}"
+            f"{self.client_config.host_address}:{self.client_config.port}.\n"
+            f"The following StreamPipes resources are available with this client:\n"
+        )
+
+        endpoint_stats_message = "\n".join(f"{count}x {name}" for name, count in sorted_endpoint_stats.items())
+
+        print(base_message + endpoint_stats_message)
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
index e588267d0..9d2b73b27 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
@@ -20,10 +20,6 @@ from pydantic import StrictBool, StrictStr
 from streampipes_client.model.common import EventSchema
 from streampipes_client.model.resource.resource import Resource
 
-"""
-Implementation of a resource for a data lake measure.
-"""
-
 __all__ = [
     "DataLakeMeasure",
 ]
@@ -51,7 +47,7 @@ class DataLakeMeasure(Resource):
 
     measure_name: StrictStr
     timestamp_field: StrictStr
-    event_schema: EventSchema
+    event_schema: Optional[EventSchema]
     pipeline_id: Optional[StrictStr]
     pipeline_name: Optional[StrictStr]
     pipeline_is_running: StrictBool
diff --git a/streampipes-client-python/tests/client/test_client.py b/streampipes-client-python/tests/client/test_client.py
index 0c4027647..5c53944da 100644
--- a/streampipes-client-python/tests/client/test_client.py
+++ b/streampipes-client-python/tests/client/test_client.py
@@ -14,8 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import json
+from collections import namedtuple
 from unittest import TestCase
+from unittest.mock import MagicMock, call, patch
 
 from streampipes_client.client import StreamPipesClient
 from streampipes_client.client.client_config import StreamPipesClientConfig
@@ -68,3 +70,45 @@ class TestStreamPipesClient(TestCase):
         )
         self.assertTrue(isinstance(result.dataLakeMeasureApi, DataLakeMeasureEndpoint))
         self.assertEqual(result.base_api_path, "https://localhost:443/streampipes-backend/")
+
+    @patch("builtins.print")
+    @patch("streampipes_client.endpoint.endpoint.APIEndpoint._make_request", autospec=True)
+    def test_client_describe(self, make_request: MagicMock, mocked_print: MagicMock):
+        def simulate_response(*args, **kwargs):
+            Response = namedtuple("Response", ["text"])
+            if "measurements" in kwargs["url"]:
+                return Response(
+                    json.dumps(
+                        [
+                            {
+                                "measureName": "test",
+                                "timestampField": "time",
+                                "pipelineIsRunning": False,
+                                "schemaVersion": "0",
+                            }
+                        ]
+                    )
+                )
+            if "streams" in kwargs["url"]:
+                return Response(json.dumps([{"name": "test"}]))
+
+        make_request.side_effect = simulate_response
+        StreamPipesClient.create(
+            client_config=StreamPipesClientConfig(
+                credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+                host_address="localhost",
+                https_disabled=False,
+                port=443,
+            )
+        ).describe()
+
+        mocked_print.assert_has_calls(
+            calls=[
+                call(
+                    "\nHi there!\nYou are connected to a StreamPipes instance running at https://localhost:443.\n"
+                    "The following StreamPipes resources are available with this client:\n"
+                    "1x DataLakeMeasures\n1x DataStreams"
+                ),
+            ],
+            any_order=True,
+        )
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index d2f60409d..1117634d5 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -17,8 +17,9 @@
 
 import json
 from copy import deepcopy
+from typing import Dict, List
 from unittest import TestCase
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, call, patch
 
 from pydantic import ValidationError
 from requests import HTTPError
@@ -30,6 +31,7 @@ from streampipes_client.model.container.resource_container import (
     StreamPipesDataModelError,
     StreamPipesResourceContainerJSONError,
 )
+from streampipes_client.model.resource import DataStream
 
 
 class TestStreamPipesEndpoints(TestCase):
@@ -89,7 +91,7 @@ class TestStreamPipesEndpoints(TestCase):
             }
         ]
 
-        self.data_stream_all = [
+        self.data_stream_all: List[Dict] = [
             {
                 "elementId": "urn:streampipes.apache.org:eventstream:uPDKLI",
                 "name": "Test",
@@ -111,7 +113,8 @@ class TestStreamPipesEndpoints(TestCase):
                             "brokerHostname": "nats",
                             "topicDefinition": {
                                 "elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
-                                "actualTopicName": "org.apache.streampipes.connect.fc22b8f6-698a-4127-aa71-e11854dc57c5",
+                                "actualTopicName": "org.apache.streampipes.connect."
+                                "fc22b8f6-698a-4127-aa71-e11854dc57c5",
                             },
                             "port": 4222,
                         }
@@ -168,7 +171,8 @@ class TestStreamPipesEndpoints(TestCase):
                 "measurementCapability": None,
                 "measurementObject": None,
                 "index": 0,
-                "correspondingAdapterId": "urn:streampipes.apache.org:spi:org.apache.streampipes.connect.iiot.adapters.simulator.machine:11934d37-135b-4ef6-b5f1-4f520cc81a43",
+                "correspondingAdapterId": "urn:streampipes.apache.org:spi:org.apache.streampipes.connect."
+                "iiot.adapters.simulator.machine:11934d37-135b-4ef6-b5f1-4f520cc81a43",
                 "category": None,
                 "uri": "urn:streampipes.apache.org:eventstream:uPDKLI",
                 "dom": None,
@@ -176,6 +180,7 @@ class TestStreamPipesEndpoints(TestCase):
         ]
 
         self.data_stream_all_json = json.dumps(self.data_stream_all)
+        self.data_stream_get = self.data_stream_all[0]
 
         self.data_lake_measure_all_json = json.dumps(self.data_lake_measure_all)
         self.data_lake_measure_all_json_error = json.dumps(self.data_lake_measure_all[0])
@@ -183,6 +188,33 @@ class TestStreamPipesEndpoints(TestCase):
         self.dlm_all_manipulated[0]["measureName"] = False
         self.data_lake_measure_all_json_validation = json.dumps(self.dlm_all_manipulated)
 
+    @patch("streampipes_client.client.client.Session", autospec=True)
+    def test_endpoint_get(self, http_session: MagicMock):
+        http_session_mock = MagicMock()
+        http_session_mock.get.return_value.json.return_value = self.data_stream_get
+        http_session.return_value = http_session_mock
+
+        client = StreamPipesClient(
+            client_config=StreamPipesClientConfig(
+                credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+                host_address="localhost",
+            )
+        )
+
+        result = client.dataStreamApi.get(identifier="urn:streampipes.apache.org:eventstream:uPDKLI")
+
+        http_session.assert_has_calls(
+            calls=[
+                call().get(
+                    url="https://localhost:80/streampipes-backend/api/v2/streams/urn:streampipes."
+                    "apache.org:eventstream:uPDKLI"
+                )
+            ],
+            any_order=True,
+        )
+        self.assertTrue(isinstance(result, DataStream))
+        self.assertEqual(result.dict(by_alias=True), self.data_stream_get)
+
     @patch("streampipes_client.client.client.Session", autospec=True)
     def test_endpoint_data_stream_happy_path(self, http_session: MagicMock):
         http_session_mock = MagicMock()
@@ -205,7 +237,7 @@ class TestStreamPipesEndpoints(TestCase):
         )
         self.assertEqual(
             "Test",
-            result[0].name,
+            result[0].name,  # type: ignore
         )
         self.assertEqual(
             self.data_stream_all_json,
@@ -268,7 +300,7 @@ class TestStreamPipesEndpoints(TestCase):
         )
         self.assertEqual(
             "test",
-            result[0].measure_name,
+            result[0].measure_name,  # type: ignore
         )
         self.assertEqual(
             self.data_lake_measure_all_json,