You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/04 20:44:16 UTC
[streampipes] 01/02: add describe() method to python client
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch add-example-files-python-client
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit e3cac9dd6d742a9c08e272eb62b14b8751b1abf0
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Dec 4 21:43:35 2022 +0100
add describe() method to python client
---
.../streampipes_client/client/client.py | 37 +++++++++++++++++
.../model/resource/data_lake_measure.py | 6 +--
.../tests/client/test_client.py | 46 +++++++++++++++++++++-
.../tests/client/test_endpoint.py | 44 ++++++++++++++++++---
4 files changed, 121 insertions(+), 12 deletions(-)
diff --git a/streampipes-client-python/streampipes_client/client/client.py b/streampipes-client-python/streampipes_client/client/client.py
index 0a6a8e1c3..3fb49f28a 100644
--- a/streampipes-client-python/streampipes_client/client/client.py
+++ b/streampipes-client-python/streampipes_client/client/client.py
@@ -30,6 +30,7 @@ from typing import Dict, Optional
from requests import Session
from streampipes_client.client.client_config import StreamPipesClientConfig
from streampipes_client.endpoint import DataLakeMeasureEndpoint, DataStreamEndpoint
+from streampipes_client.endpoint.endpoint import APIEndpoint
logger = logging.getLogger(__name__)
@@ -176,3 +177,39 @@ class StreamPipesClient:
f"{self.client_config.host_address}:"
f"{self.client_config.port}/streampipes-backend/"
)
+
+ def describe(self) -> None:
+ """Prints short description of the connected StreamPipes instance and the available resources to the console.
+
+ Returns
+ -------
+ None
+ """
+
+ # get all endpoints of this client
+ available_endpoints = [
+ attr_name for attr_name in dir(self) if isinstance(self.__getattribute__(attr_name), APIEndpoint)
+ ]
+
+ # collect the number of available resources per endpoint
+ endpoint_stats = {
+ (all_items := self.__getattribute__(endpoint_name).all()).__class__.__name__: len(all_items)
+ for endpoint_name in available_endpoints
+ }
+
+ # sort the endpoints descending based on the number of resources
+ sorted_endpoint_stats = {
+ key: val for key, val in sorted(endpoint_stats.items(), key=lambda item: item[1], reverse=True)
+ }
+
+ base_message = (
+ f"\nHi there!\n"
+ f"You are connected to a StreamPipes instance running at "
+ f"{'http://' if self.client_config.https_disabled else 'https://'}"
+ f"{self.client_config.host_address}:{self.client_config.port}.\n"
+ f"The following StreamPipes resources are available with this client:\n"
+ )
+
+ endpoint_stats_message = "\n".join(f"{count}x {name}" for name, count in sorted_endpoint_stats.items())
+
+ print(base_message + endpoint_stats_message)
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
index e588267d0..9d2b73b27 100644
--- a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
+++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
@@ -20,10 +20,6 @@ from pydantic import StrictBool, StrictStr
from streampipes_client.model.common import EventSchema
from streampipes_client.model.resource.resource import Resource
-"""
-Implementation of a resource for a data lake measure.
-"""
-
__all__ = [
"DataLakeMeasure",
]
@@ -51,7 +47,7 @@ class DataLakeMeasure(Resource):
measure_name: StrictStr
timestamp_field: StrictStr
- event_schema: EventSchema
+ event_schema: Optional[EventSchema]
pipeline_id: Optional[StrictStr]
pipeline_name: Optional[StrictStr]
pipeline_is_running: StrictBool
diff --git a/streampipes-client-python/tests/client/test_client.py b/streampipes-client-python/tests/client/test_client.py
index 0c4027647..5c53944da 100644
--- a/streampipes-client-python/tests/client/test_client.py
+++ b/streampipes-client-python/tests/client/test_client.py
@@ -14,8 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
+import json
+from collections import namedtuple
from unittest import TestCase
+from unittest.mock import MagicMock, call, patch
from streampipes_client.client import StreamPipesClient
from streampipes_client.client.client_config import StreamPipesClientConfig
@@ -68,3 +70,45 @@ class TestStreamPipesClient(TestCase):
)
self.assertTrue(isinstance(result.dataLakeMeasureApi, DataLakeMeasureEndpoint))
self.assertEqual(result.base_api_path, "https://localhost:443/streampipes-backend/")
+
+ @patch("builtins.print")
+ @patch("streampipes_client.endpoint.endpoint.APIEndpoint._make_request", autospec=True)
+ def test_client_describe(self, make_request: MagicMock, mocked_print: MagicMock):
+ def simulate_response(*args, **kwargs):
+ Response = namedtuple("Response", ["text"])
+ if "measurements" in kwargs["url"]:
+ return Response(
+ json.dumps(
+ [
+ {
+ "measureName": "test",
+ "timestampField": "time",
+ "pipelineIsRunning": False,
+ "schemaVersion": "0",
+ }
+ ]
+ )
+ )
+ if "streams" in kwargs["url"]:
+ return Response(json.dumps([{"name": "test"}]))
+
+ make_request.side_effect = simulate_response
+ StreamPipesClient.create(
+ client_config=StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+ host_address="localhost",
+ https_disabled=False,
+ port=443,
+ )
+ ).describe()
+
+ mocked_print.assert_has_calls(
+ calls=[
+ call(
+ "\nHi there!\nYou are connected to a StreamPipes instance running at https://localhost:443.\n"
+ "The following StreamPipes resources are available with this client:\n"
+ "1x DataLakeMeasures\n1x DataStreams"
+ ),
+ ],
+ any_order=True,
+ )
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
index d2f60409d..1117634d5 100644
--- a/streampipes-client-python/tests/client/test_endpoint.py
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -17,8 +17,9 @@
import json
from copy import deepcopy
+from typing import Dict, List
from unittest import TestCase
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, call, patch
from pydantic import ValidationError
from requests import HTTPError
@@ -30,6 +31,7 @@ from streampipes_client.model.container.resource_container import (
StreamPipesDataModelError,
StreamPipesResourceContainerJSONError,
)
+from streampipes_client.model.resource import DataStream
class TestStreamPipesEndpoints(TestCase):
@@ -89,7 +91,7 @@ class TestStreamPipesEndpoints(TestCase):
}
]
- self.data_stream_all = [
+ self.data_stream_all: List[Dict] = [
{
"elementId": "urn:streampipes.apache.org:eventstream:uPDKLI",
"name": "Test",
@@ -111,7 +113,8 @@ class TestStreamPipesEndpoints(TestCase):
"brokerHostname": "nats",
"topicDefinition": {
"elementId": "urn:streampipes.apache.org:spi:simpletopicdefinition:QzCiFI",
- "actualTopicName": "org.apache.streampipes.connect.fc22b8f6-698a-4127-aa71-e11854dc57c5",
+ "actualTopicName": "org.apache.streampipes.connect."
+ "fc22b8f6-698a-4127-aa71-e11854dc57c5",
},
"port": 4222,
}
@@ -168,7 +171,8 @@ class TestStreamPipesEndpoints(TestCase):
"measurementCapability": None,
"measurementObject": None,
"index": 0,
- "correspondingAdapterId": "urn:streampipes.apache.org:spi:org.apache.streampipes.connect.iiot.adapters.simulator.machine:11934d37-135b-4ef6-b5f1-4f520cc81a43",
+ "correspondingAdapterId": "urn:streampipes.apache.org:spi:org.apache.streampipes.connect."
+ "iiot.adapters.simulator.machine:11934d37-135b-4ef6-b5f1-4f520cc81a43",
"category": None,
"uri": "urn:streampipes.apache.org:eventstream:uPDKLI",
"dom": None,
@@ -176,6 +180,7 @@ class TestStreamPipesEndpoints(TestCase):
]
self.data_stream_all_json = json.dumps(self.data_stream_all)
+ self.data_stream_get = self.data_stream_all[0]
self.data_lake_measure_all_json = json.dumps(self.data_lake_measure_all)
self.data_lake_measure_all_json_error = json.dumps(self.data_lake_measure_all[0])
@@ -183,6 +188,33 @@ class TestStreamPipesEndpoints(TestCase):
self.dlm_all_manipulated[0]["measureName"] = False
self.data_lake_measure_all_json_validation = json.dumps(self.dlm_all_manipulated)
+ @patch("streampipes_client.client.client.Session", autospec=True)
+ def test_endpoint_get(self, http_session: MagicMock):
+ http_session_mock = MagicMock()
+ http_session_mock.get.return_value.json.return_value = self.data_stream_get
+ http_session.return_value = http_session_mock
+
+ client = StreamPipesClient(
+ client_config=StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+ host_address="localhost",
+ )
+ )
+
+ result = client.dataStreamApi.get(identifier="urn:streampipes.apache.org:eventstream:uPDKLI")
+
+ http_session.assert_has_calls(
+ calls=[
+ call().get(
+ url="https://localhost:80/streampipes-backend/api/v2/streams/urn:streampipes."
+ "apache.org:eventstream:uPDKLI"
+ )
+ ],
+ any_order=True,
+ )
+ self.assertTrue(isinstance(result, DataStream))
+ self.assertEqual(result.dict(by_alias=True), self.data_stream_get)
+
@patch("streampipes_client.client.client.Session", autospec=True)
def test_endpoint_data_stream_happy_path(self, http_session: MagicMock):
http_session_mock = MagicMock()
@@ -205,7 +237,7 @@ class TestStreamPipesEndpoints(TestCase):
)
self.assertEqual(
"Test",
- result[0].name,
+ result[0].name, # type: ignore
)
self.assertEqual(
self.data_stream_all_json,
@@ -268,7 +300,7 @@ class TestStreamPipesEndpoints(TestCase):
)
self.assertEqual(
"test",
- result[0].measure_name,
+ result[0].measure_name, # type: ignore
)
self.assertEqual(
self.data_lake_measure_all_json,