You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/17 16:32:58 UTC
[incubator-streampipes] 03/10: [STREAMPIPES-608] implement data lake measure endpoint
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit eb230905b322f728284039996da380b43920b9ce
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Nov 13 14:02:07 2022 +0100
[STREAMPIPES-608] implement data lake measure endpoint
---
.../endpoint/data_lake_measure.py | 92 ++++++++++
.../streampipes_client/model/common.py | 91 ++++++++++
.../model/container/data_lake_measures.py | 53 ++++++
.../model/resource/data_lake_measure.py | 58 +++++++
.../tests/client/test_endpoint.py | 188 +++++++++++++++++++++
5 files changed, 482 insertions(+)
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
new file mode 100644
index 000000000..168c14728
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Specific implementation of the StreamPipes API's data lake measure endpoints.
+This endpoint allows consuming data stored in StreamPipes' data lake.
+"""
+from typing import Tuple, Type
+
+from streampipes_client.endpoint.endpoint import APIEndpoint
+from streampipes_client.model.container import DataLakeMeasures
+
+__all__ = [
+ "DataLakeMeasureEndpoint",
+]
+
+from streampipes_client.model.container.resource_container import ResourceContainer
+
+
+class DataLakeMeasureEndpoint(APIEndpoint):
+ """Implementation of the DataLakeMeasure endpoint.
+ This endpoint provides an interface to all data stored in the StreamPipes data lake.
+
+ Consequently, it allows querying metadata about available data sets (see `all()` method).
+ The metadata is returned as an instance of `model.container.DataLakeMeasures`.
+
+ In addition, the endpoint provides direct access to the data stored in the data lake by querying a
+ specific data lake measure using the `get()` method.
+
+ Parameters
+ ----------
+ parent_client: StreamPipesClient
+ The instance of `client.StreamPipesClient` the endpoint is attached to.
+
+ Examples
+ --------
+
+ >>> from streampipes_client.client import StreamPipesClient
+ >>> from streampipes_client.client.client_config import StreamPipesClientConfig
+ >>> from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials
+
+ >>> client_config = StreamPipesClientConfig(
+ ... credential_provider=StreamPipesApiKeyCredentials(username="test-user", api_key="api-key"),
+ ... host_address="localhost",
+ ... port=8082,
+ ... https_disabled=True
+ ... )
+
+ >>> client = StreamPipesClient.create(client_config=client_config)
+
+ >>> data_lake_measures = client.dataLakeMeasureApi.all()
+
+ >>> len(data_lake_measures)
+ 5
+ """
+
+ @property
+ def _container_cls(self) -> Type[ResourceContainer]:
+ """Defines the model container class the endpoint refers to.
+
+
+ Returns
+ -------
+ `model.container.DataLakeMeasures`
+ """
+ return DataLakeMeasures
+
+ @property
+ def _relative_api_path(self) -> Tuple[str, ...]:
+ """Defines the relative API path to the DataLakeMeasure endpoint.
+ Each path segment of the URL is defined as its own string.
+
+ Returns
+ -------
+ A tuple of strings, each of which represents a path segment of the endpoint's API URL.
+ """
+
+ return "api", "v4", "datalake", "measurements"
diff --git a/streampipes-client-python/streampipes_client/model/common.py b/streampipes-client-python/streampipes_client/model/common.py
new file mode 100644
index 000000000..c9eec9a18
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/common.py
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Classes of the StreamPipes data model that are commonly shared.
+"""
+
+from typing import List, Optional
+
+from pydantic import BaseModel, Field, StrictBool, StrictInt, StrictStr
+
+__all__ = [
+ "BasicModel",
+ "EventSchema",
+]
+
+
+def _snake_to_camel_case(snake_case_string: str) -> str:
+ """Converts a string in snake_case format to camelCase style."""
+
+ tokens = snake_case_string.split("_")
+
+ return tokens[0] + "".join(t.title() for t in tokens[1:])
+
+
+class BasicModel(BaseModel):
+ element_id: Optional[StrictStr]
+
+ class Config:
+ alias_generator = _snake_to_camel_case
+
+
+class EventPropertyQualityRequirement(BasicModel):
+ """
+ Data model of an `EventPropertyQualityRequirement` in compliance with the StreamPipes Backend.
+ """
+
+ minimum_property_quality: Optional[BasicModel] = Field(alias="eventPropertyQualityDefinition")
+ maximum_property_quality: Optional[BasicModel] = Field(alias="eventPropertyQualityDefinition")
+
+
+class ValueSpecification(BasicModel):
+ """
+ Data model of a `ValueSpecification` in compliance with the StreamPipes Backend.
+ """
+
+ min_value: Optional[int]
+ max_value: Optional[int]
+ step: Optional[float]
+
+
+class EventProperty(BasicModel):
+ """
+ Data model of an `EventProperty` in compliance with the StreamPipes Backend.
+ """
+
+ label: StrictStr
+ description: StrictStr
+ runtime_name: StrictStr
+ required: StrictBool
+ domain_properties: List[StrictStr]
+ event_property_qualities: List[BasicModel] = Field(alias="eventPropertyQualities")
+ requires_event_property_qualities: List[EventPropertyQualityRequirement]
+ property_scope: Optional[StrictStr]
+ index: StrictInt
+ runtime_id: Optional[StrictStr]
+ runtime_type: Optional[StrictStr]
+ measurement_unit: Optional[StrictStr]
+ value_specification: Optional[ValueSpecification]
+
+
+class EventSchema(BasicModel):
+ """
+ Data model of an `EventSchema` in compliance with the StreamPipes Backend.
+ """
+
+ event_properties: List[EventProperty]
diff --git a/streampipes-client-python/streampipes_client/model/container/data_lake_measures.py b/streampipes-client-python/streampipes_client/model/container/data_lake_measures.py
new file mode 100644
index 000000000..e4a00c3e9
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/container/data_lake_measures.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Implementation of a resource container for the data lake measures endpoint.
+"""
+from typing import Type
+
+from streampipes_client.model.container.resource_container import ResourceContainer
+from streampipes_client.model.resource.data_lake_measure import DataLakeMeasure
+from streampipes_client.model.resource.resource import Resource
+
+__all__ = [
+ "DataLakeMeasures",
+]
+
+
+class DataLakeMeasures(ResourceContainer):
+ """Implementation of the resource container for the data lake measures endpoint.
+ This resource container is a collection of data lake measures returned by the StreamPipes API.
+ It is capable of parsing the response content directly into a list of queried `DataLakeMeasure`.
+ Furthermore, the resource container makes them accessible in a pythonic manner.
+
+ Parameters
+ ----------
+ resources: List[DataLakeMeasure]
+ A list of resources (`model.resource.DataLakeMeasure`) to be contained in the `ResourceContainer`.
+
+ """
+
+ @classmethod
+ def _resource_cls(cls) -> Type[Resource]:
+ """Returns the class of the resources that are bundled.
+
+ Returns
+ -------
+ DataLakeMeasure
+ """
+ return DataLakeMeasure
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
new file mode 100644
index 000000000..e588267d0
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Optional
+
+from pydantic import StrictBool, StrictStr
+from streampipes_client.model.common import EventSchema
+from streampipes_client.model.resource.resource import Resource
+
+"""
+Implementation of a resource for a data lake measure.
+"""
+
+__all__ = [
+ "DataLakeMeasure",
+]
+
+
+class DataLakeMeasure(Resource):
+ """Implementation of a resource for data lake measures.
+ This resource defines the data model used by the resource container (`model.container.DataLakeMeasures`).
+ It inherits from Pydantic's BaseModel to get all its superpowers,
+ which are used to parse and validate the API response and to easily switch between
+ the Python representation (both serialized and deserialized) and Java representation (serialized only).
+ """
+
+ def convert_to_pandas_representation(self):
+ """Returns the dictionary representation of a data lake measure
+ to be used when creating a pandas DataFrame.
+ It excludes the following fields: `element_id`, `event_schema`, `schema_version`.
+ Instead of the whole event schema the number of event properties contained
+ is returned with the column name `num_event_properties`.
+ """
+ return {
+ **self.dict(exclude={"element_id", "event_schema", "schema_version"}),
+ "num_event_properties": len(self.event_schema.event_properties),
+ }
+
+ measure_name: StrictStr
+ timestamp_field: StrictStr
+ event_schema: EventSchema
+ pipeline_id: Optional[StrictStr]
+ pipeline_name: Optional[StrictStr]
+ pipeline_is_running: StrictBool
+ schema_version: StrictStr
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
new file mode 100644
index 000000000..cd9922ef0
--- /dev/null
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -0,0 +1,188 @@
+import json
+from copy import deepcopy
+from unittest import TestCase
+from unittest.mock import MagicMock, patch
+
+from pydantic import ValidationError
+from requests import HTTPError
+from streampipes_client.client import StreamPipesClient
+from streampipes_client.client.client_config import StreamPipesClientConfig
+from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials
+from streampipes_client.endpoint.endpoint import _error_code_to_message
+from streampipes_client.model.container.resource_container import (
+ StreamPipesDataModelError,
+ StreamPipesResourceContainerJSONError,
+)
+
+
+class TestStreamPipesEndpoints(TestCase):
+ def setUp(self) -> None:
+ # set example responses from endpoints
+ self.data_lake_measure_all = [
+ {
+ "elementId": "urn:streampipes.apache.org:spi:datalakemeasure:xLSfXZ",
+ "measureName": "test",
+ "timestampField": "s0::timestamp",
+ "eventSchema": {
+ "elementId": "urn:streampipes.apache.org:spi:eventschema:UDMHXn",
+ "eventProperties": [
+ {
+ "elementId": "urn:streampipes.apache.org:spi:eventpropertyprimitive:utvSWg",
+ "label": "Density",
+ "description": "Denotes the current density of the fluid",
+ "runtimeName": "density",
+ "required": False,
+ "domainProperties": ["http://schema.org/Number"],
+ "eventPropertyQualities": [],
+ "requiresEventPropertyQualities": [],
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "index": 5,
+ "runtimeId": None,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "measurementUnit": None,
+ "valueSpecification": None,
+ },
+ {
+ "elementId": "urn:streampipes.apache.org:spi:eventpropertyprimitive:OgBuiz",
+ "label": "Temperature",
+ "description": "Denotes the current temperature in degrees celsius",
+ "runtimeName": "temperature",
+ "required": False,
+ "domainProperties": ["http://schema.org/Number"],
+ "eventPropertyQualities": [],
+ "requiresEventPropertyQualities": [],
+ "propertyScope": "MEASUREMENT_PROPERTY",
+ "index": 4,
+ "runtimeId": None,
+ "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+ "measurementUnit": "http://codes.wmo.int/common/unit/degC",
+ "valueSpecification": {
+ "elementId": "urn:streampipes.apache.org:spi:quantitativevalue:sotOEB",
+ "minValue": 0,
+ "maxValue": 100,
+ "step": 0.1,
+ },
+ },
+ ],
+ },
+ "pipelineId": None,
+ "pipelineName": None,
+ "pipelineIsRunning": False,
+ "schemaVersion": "1.1",
+ }
+ ]
+
+ self.data_lake_measure_all_json = json.dumps(self.data_lake_measure_all)
+ self.data_lake_measure_all_json_error = json.dumps(self.data_lake_measure_all[0])
+ self.dlm_all_manipulated = deepcopy(self.data_lake_measure_all)
+ self.dlm_all_manipulated[0]["measureName"] = False
+ self.data_lake_measure_all_json_validation = json.dumps(self.dlm_all_manipulated)
+
+ @patch("streampipes_client.client.client.Session", autospec=True)
+ def test_endpoint_data_lake_measure_happy_path(self, http_session: MagicMock):
+ http_session_mock = MagicMock()
+ http_session_mock.get.return_value.text = self.data_lake_measure_all_json
+ http_session.return_value = http_session_mock
+
+ client = StreamPipesClient(
+ client_config=StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+ host_address="localhost",
+ )
+ )
+
+ result = client.dataLakeMeasureApi.all()
+ result_pd = result.to_pandas()
+
+ self.assertEqual(
+ 1,
+ len(result),
+ )
+ self.assertEqual(
+ "test",
+ result[0].measure_name,
+ )
+ self.assertEqual(
+ self.data_lake_measure_all_json,
+ result.to_json(),
+ )
+ self.assertEqual(
+ self.data_lake_measure_all,
+ result.to_dicts(use_source_names=True),
+ )
+ self.assertEqual(
+ 1,
+ len(result_pd),
+ )
+ self.assertListEqual(
+ [
+ "measure_name",
+ "timestamp_field",
+ "pipeline_id",
+ "pipeline_name",
+ "pipeline_is_running",
+ "num_event_properties",
+ ],
+ list(result_pd.columns),
+ )
+ self.assertEqual(2, result_pd["num_event_properties"][0])
+
+ @patch("streampipes_client.client.client.Session", autospec=True)
+ def test_endpoint_data_lake_measure_bad_return_code(self, http_session: MagicMock):
+ response_mock = MagicMock()
+ response_mock.status_code = 405
+ response_mock.text = "Test error"
+ response_mock.url = "localhost"
+
+ http_session_mock = MagicMock()
+ http_session_mock.get.return_value.status_code = 405
+ http_session_mock.get.return_value.raise_for_status.side_effect = HTTPError(response=response_mock)
+ http_session.return_value = http_session_mock
+
+ client = StreamPipesClient(
+ client_config=StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+ host_address="localhost",
+ )
+ )
+
+ with self.assertRaises(HTTPError) as http_error:
+ client.dataLakeMeasureApi.all()
+ self.assertMultiLineEqual(
+ _error_code_to_message[405] + f"url: localhost\nstatus code: 405",
+ http_error.exception.args[0],
+ )
+
+ @patch("streampipes_client.client.client.Session", autospec=True)
+ def test_endpoint_data_lake_measure_json_error(self, http_session: MagicMock):
+ http_session_mock = MagicMock()
+ http_session_mock.get.return_value.text = self.data_lake_measure_all_json_error
+ http_session.return_value = http_session_mock
+
+ client = StreamPipesClient(
+ client_config=StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+ host_address="localhost",
+ )
+ )
+
+ with self.assertRaises(StreamPipesResourceContainerJSONError):
+ client.dataLakeMeasureApi.all()
+
+ @patch("streampipes_client.client.client.Session", autospec=True)
+ def test_endpoint_data_lake_measure_validation_error(self, http_session: MagicMock):
+ http_session_mock = MagicMock()
+ http_session_mock.get.return_value.text = self.data_lake_measure_all_json_validation
+ http_session.return_value = http_session_mock
+
+ client = StreamPipesClient(
+ client_config=StreamPipesClientConfig(
+ credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+ host_address="localhost",
+ )
+ )
+
+ with self.assertRaises(StreamPipesDataModelError) as err:
+ client.dataLakeMeasureApi.all()
+
+ self.assertTrue(isinstance(err.exception.validation_error, ValidationError))