You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/17 16:32:58 UTC

[incubator-streampipes] 03/10: [STREAMPIPES-608] implement data lake measure endpoint

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit eb230905b322f728284039996da380b43920b9ce
Author: bossenti <bo...@posteo.de>
AuthorDate: Sun Nov 13 14:02:07 2022 +0100

    [STREAMPIPES-608] implement data lake measure endpoint
---
 .../endpoint/data_lake_measure.py                  |  92 ++++++++++
 .../streampipes_client/model/common.py             |  91 ++++++++++
 .../model/container/data_lake_measures.py          |  53 ++++++
 .../model/resource/data_lake_measure.py            |  58 +++++++
 .../tests/client/test_endpoint.py                  | 188 +++++++++++++++++++++
 5 files changed, 482 insertions(+)

diff --git a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
new file mode 100644
index 000000000..168c14728
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Specific implementation of the StreamPipes API's data lake measure endpoints.
+This endpoint allows to consume data stored in StreamPipes' data lake
+"""
+from typing import Tuple, Type
+
+from streampipes_client.endpoint.endpoint import APIEndpoint
+from streampipes_client.model.container import DataLakeMeasures
+
+__all__ = [
+    "DataLakeMeasureEndpoint",
+]
+
+from streampipes_client.model.container.resource_container import ResourceContainer
+
+
+class DataLakeMeasureEndpoint(APIEndpoint):
+    """Implementation of the DataLakeMeasure endpoint.
+    This endpoint provides an interfact to all data stored in the StreamPipes data lake.
+
+    Consequently, it allows uerying metadata about available data sets (see `all()` method).
+    The metadata is returned as an instance of `model.container.DataLakeMeasures`.
+
+    In addition, the endpoint provides direct access to the data stored in the data laka by querying a
+    specific data lake measure using the `get()` method.
+
+    Parameters
+    ----------
+    parent_client: StreamPipesClient
+        The instance of `client.StreamPipesClient` the endpoint is attached to.
+
+    Examples
+    --------
+
+    >>> from streampipes_client.client import StreamPipesClient
+    >>> from streampipes_client.client.client_config import StreamPipesClientConfig
+    >>> from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials
+
+    >>> client_config = StreamPipesClientConfig(
+    ...     credential_provider=StreamPipesApiKeyCredentials(username="test-user", api_key="api-key"),
+    ...     host_address="localhost",
+    ...     port=8082,
+    ...     https_disabled=True
+    ... )
+
+    >>> client = StreamPipesClient.create(client_config=client_config)
+
+    >>> data_lake_measures = client.dataLakeMeasureApi.all()
+
+    >>> len(data_lake_measures)
+    5
+    """
+
+    @property
+    def _container_cls(self) -> Type[ResourceContainer]:
+        """Defines the model container class the endpoint refers to.
+
+
+        Returns
+        -------
+        `model.container.DataLakeMeasures`
+        """
+        return DataLakeMeasures
+
+    @property
+    def _relative_api_path(self) -> Tuple[str, ...]:
+        """Defines the relative api path to the DataLakeMeasurement endpoint.
+        Each path within the URL is defined as an own string.
+
+        Returns
+        -------
+        A tuple of strings of which every represents a path value of the endpoint's API URL.
+        """
+
+        return "api", "v4", "datalake", "measurements"
diff --git a/streampipes-client-python/streampipes_client/model/common.py b/streampipes-client-python/streampipes_client/model/common.py
new file mode 100644
index 000000000..c9eec9a18
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/common.py
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Classes of the StreamPipes data model that are commonly shared.
+"""
+
+from typing import List, Optional
+
+from pydantic import BaseModel, Field, StrictBool, StrictInt, StrictStr
+
+__all__ = [
+    "BasicModel",
+    "EventSchema",
+]
+
+
+def _snake_to_camel_case(snake_case_string: str) -> str:
+    """Converts a string in snake_case format to camelCase style."""
+
+    tokens = snake_case_string.split("_")
+
+    return tokens[0] + "".join(t.title() for t in tokens[1:])
+
+
+class BasicModel(BaseModel):
+    """Common base class of the StreamPipes data models in this client.
+
+    Fields are declared in snake_case; the configured alias generator derives the
+    camelCase aliases (e.g. `elementId`) used as keys in the API's JSON payloads.
+    """
+
+    # Element identifier, e.g. "urn:streampipes.apache.org:spi:..."; may be missing/None.
+    element_id: Optional[StrictStr]
+
+    class Config:
+        # Derive each field's alias from its name, e.g. `element_id` -> `elementId`.
+        alias_generator = _snake_to_camel_case
+
+
+class EventPropertyQualityRequirement(BasicModel):
+    """
+    Data model of an `EventPropertyQualityRequirement` in compliance to the StreamPipes Backend.
+    """
+
+    minimum_property_quality: Optional[BasicModel] = Field(alias="eventPropertyQualityDefinition")
+    maximum_property_quality: Optional[BasicModel] = Field(alias="eventPropertyQualityDefinition")
+
+
+class ValueSpecification(BasicModel):
+    """
+    Data model of a `ValueSpecification` in compliance to the StreamPipes Backend.
+    Presumably describes the admissible value range of an event property
+    (e.g. minValue 0, maxValue 100, step 0.1 for a temperature).
+    """
+
+    # NOTE(review): the bounds are typed as non-strict `int` while `step` is a float -
+    # confirm the backend never sends fractional bounds, which pydantic would coerce.
+    min_value: Optional[int]
+    max_value: Optional[int]
+    step: Optional[float]
+    step: Optional[float]
+
+
+class EventProperty(BasicModel):
+    """
+    Data model of an `EventProperty` in compliance to the StreamPipes Backend.
+
+    The declaration order of the fields determines the key order of the serialized output.
+    """
+
+    # Human-readable label, e.g. "Density".
+    label: StrictStr
+    description: StrictStr
+    # Name of the property at runtime, e.g. "density".
+    runtime_name: StrictStr
+    required: StrictBool
+    # Semantic type URIs, e.g. "http://schema.org/Number".
+    domain_properties: List[StrictStr]
+    # The explicit alias equals the one the alias generator would derive.
+    event_property_qualities: List[BasicModel] = Field(alias="eventPropertyQualities")
+    requires_event_property_qualities: List[EventPropertyQualityRequirement]
+    # e.g. "MEASUREMENT_PROPERTY".
+    property_scope: Optional[StrictStr]
+    # presumably the position of the property within the event - TODO confirm
+    index: StrictInt
+    runtime_id: Optional[StrictStr]
+    # Datatype URI, e.g. "http://www.w3.org/2001/XMLSchema#float".
+    runtime_type: Optional[StrictStr]
+    # Unit URI, e.g. "http://codes.wmo.int/common/unit/degC".
+    measurement_unit: Optional[StrictStr]
+    value_specification: Optional[ValueSpecification]
+
+
+class EventSchema(BasicModel):
+    """
+    Data model of an `EventSchema` in compliance to the StreamPipes Backend.
+    It describes the structure of events (e.g. those of a data lake measure) as a list of event properties.
+    """
+
+    # The properties an event of this schema consists of.
+    event_properties: List[EventProperty]
diff --git a/streampipes-client-python/streampipes_client/model/container/data_lake_measures.py b/streampipes-client-python/streampipes_client/model/container/data_lake_measures.py
new file mode 100644
index 000000000..e4a00c3e9
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/container/data_lake_measures.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Implementation of a resource container for the data lake measures endpoint.
+"""
+from typing import Type
+
+from streampipes_client.model.container.resource_container import ResourceContainer
+from streampipes_client.model.resource.data_lake_measure import DataLakeMeasure
+from streampipes_client.model.resource.resource import Resource
+
+__all__ = [
+    "DataLakeMeasures",
+]
+
+
+class DataLakeMeasures(ResourceContainer):
+    """Implementation of the resource container for the data lake measures endpoint.
+    This resource container is a collection of data lake measures returned by the StreamPipes API.
+    It is capable of parsing the response content directly into a list of queried `DataLakeMeasure`.
+    Furthermore, the resource container makes them accessible in a pythonic manner.
+
+    Parameters
+    ----------
+    resources: List[DataLakeMeasure]
+        A list of resources (`model.resource.DataLakeMeasure`) to be contained in the `ResourceContainer`.
+
+    """
+
+    @classmethod
+    def _resource_cls(cls) -> Type[Resource]:
+        """Returns the class of the resource that are bundled.
+
+        Returns
+        -------
+        DataLakeMeasure
+        """
+        return DataLakeMeasure
diff --git a/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
new file mode 100644
index 000000000..e588267d0
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/data_lake_measure.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Optional
+
+from pydantic import StrictBool, StrictStr
+from streampipes_client.model.common import EventSchema
+from streampipes_client.model.resource.resource import Resource
+
+"""
+Implementation of a resource for a data lake measure.
+"""
+
+__all__ = [
+    "DataLakeMeasure",
+]
+
+
+class DataLakeMeasure(Resource):
+    """Implementation of a resource for data lake measures.
+    This resource defines the data model used by resource container (`model.container.DataLakeMeasures`).
+    It inherits from Pydantic's BaseModel to get all its superpowers,
+    which are used to parse, validate the API response and to easily switch between
+    the Python representation (both serialized and deserialized) and Java representation (serialized only).
+    """
+
+    def convert_to_pandas_representation(self):
+        """Returns the dictionary representation of a data lake measure
+        to be used when creating a pandas Dataframe.
+        It excludes the following fields: `element_id`, `event_schema`, `schema_version`.
+        Instead of the whole event schema the number of event properties contained
+        is returned with the column name `num_event_properties`.
+        """
+        return {
+            **self.dict(exclude={"element_id", "event_schema", "schema_version"}),
+            "num_event_properties": len(self.event_schema.event_properties),
+        }
+
+    measure_name: StrictStr
+    timestamp_field: StrictStr
+    event_schema: EventSchema
+    pipeline_id: Optional[StrictStr]
+    pipeline_name: Optional[StrictStr]
+    pipeline_is_running: StrictBool
+    schema_version: StrictStr
diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py
new file mode 100644
index 000000000..cd9922ef0
--- /dev/null
+++ b/streampipes-client-python/tests/client/test_endpoint.py
@@ -0,0 +1,188 @@
+import json
+from copy import deepcopy
+from unittest import TestCase
+from unittest.mock import MagicMock, patch
+
+from pydantic import ValidationError
+from requests import HTTPError
+from streampipes_client.client import StreamPipesClient
+from streampipes_client.client.client_config import StreamPipesClientConfig
+from streampipes_client.client.credential_provider import StreamPipesApiKeyCredentials
+from streampipes_client.endpoint.endpoint import _error_code_to_message
+from streampipes_client.model.container.resource_container import (
+    StreamPipesDataModelError,
+    StreamPipesResourceContainerJSONError,
+)
+
+
+class TestStreamPipesEndpoints(TestCase):
+    def setUp(self) -> None:
+        # set example responses from endpoints
+        self.data_lake_measure_all = [
+            {
+                "elementId": "urn:streampipes.apache.org:spi:datalakemeasure:xLSfXZ",
+                "measureName": "test",
+                "timestampField": "s0::timestamp",
+                "eventSchema": {
+                    "elementId": "urn:streampipes.apache.org:spi:eventschema:UDMHXn",
+                    "eventProperties": [
+                        {
+                            "elementId": "urn:streampipes.apache.org:spi:eventpropertyprimitive:utvSWg",
+                            "label": "Density",
+                            "description": "Denotes the current density of the fluid",
+                            "runtimeName": "density",
+                            "required": False,
+                            "domainProperties": ["http://schema.org/Number"],
+                            "eventPropertyQualities": [],
+                            "requiresEventPropertyQualities": [],
+                            "propertyScope": "MEASUREMENT_PROPERTY",
+                            "index": 5,
+                            "runtimeId": None,
+                            "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                            "measurementUnit": None,
+                            "valueSpecification": None,
+                        },
+                        {
+                            "elementId": "urn:streampipes.apache.org:spi:eventpropertyprimitive:OgBuiz",
+                            "label": "Temperature",
+                            "description": "Denotes the current temperature in degrees celsius",
+                            "runtimeName": "temperature",
+                            "required": False,
+                            "domainProperties": ["http://schema.org/Number"],
+                            "eventPropertyQualities": [],
+                            "requiresEventPropertyQualities": [],
+                            "propertyScope": "MEASUREMENT_PROPERTY",
+                            "index": 4,
+                            "runtimeId": None,
+                            "runtimeType": "http://www.w3.org/2001/XMLSchema#float",
+                            "measurementUnit": "http://codes.wmo.int/common/unit/degC",
+                            "valueSpecification": {
+                                "elementId": "urn:streampipes.apache.org:spi:quantitativevalue:sotOEB",
+                                "minValue": 0,
+                                "maxValue": 100,
+                                "step": 0.1,
+                            },
+                        },
+                    ],
+                },
+                "pipelineId": None,
+                "pipelineName": None,
+                "pipelineIsRunning": False,
+                "schemaVersion": "1.1",
+            }
+        ]
+
+        self.data_lake_measure_all_json = json.dumps(self.data_lake_measure_all)
+        self.data_lake_measure_all_json_error = json.dumps(self.data_lake_measure_all[0])
+        self.dlm_all_manipulated = deepcopy(self.data_lake_measure_all)
+        self.dlm_all_manipulated[0]["measureName"] = False
+        self.data_lake_measure_all_json_validation = json.dumps(self.dlm_all_manipulated)
+
+    @patch("streampipes_client.client.client.Session", autospec=True)
+    def test_endpoint_data_lake_measure_happy_path(self, http_session: MagicMock):
+        http_session_mock = MagicMock()
+        http_session_mock.get.return_value.text = self.data_lake_measure_all_json
+        http_session.return_value = http_session_mock
+
+        client = StreamPipesClient(
+            client_config=StreamPipesClientConfig(
+                credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+                host_address="localhost",
+            )
+        )
+
+        result = client.dataLakeMeasureApi.all()
+        result_pd = result.to_pandas()
+
+        self.assertEqual(
+            1,
+            len(result),
+        )
+        self.assertEqual(
+            "test",
+            result[0].measure_name,
+        )
+        self.assertEqual(
+            self.data_lake_measure_all_json,
+            result.to_json(),
+        )
+        self.assertEqual(
+            self.data_lake_measure_all,
+            result.to_dicts(use_source_names=True),
+        )
+        self.assertEqual(
+            1,
+            len(result_pd),
+        )
+        self.assertListEqual(
+            [
+                "measure_name",
+                "timestamp_field",
+                "pipeline_id",
+                "pipeline_name",
+                "pipeline_is_running",
+                "num_event_properties",
+            ],
+            list(result_pd.columns),
+        )
+        self.assertEqual(2, result_pd["num_event_properties"][0])
+
+    @patch("streampipes_client.client.client.Session", autospec=True)
+    def test_endpoint_data_lake_measure_bad_return_code(self, http_session: MagicMock):
+        response_mock = MagicMock()
+        response_mock.status_code = 405
+        response_mock.text = "Test error"
+        response_mock.url = "localhost"
+
+        http_session_mock = MagicMock()
+        http_session_mock.get.return_value.status_code = 405
+        http_session_mock.get.return_value.raise_for_status.side_effect = HTTPError(response=response_mock)
+        http_session.return_value = http_session_mock
+
+        client = StreamPipesClient(
+            client_config=StreamPipesClientConfig(
+                credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+                host_address="localhost",
+            )
+        )
+
+        with self.assertRaises(HTTPError) as http_error:
+            client.dataLakeMeasureApi.all()
+        self.assertMultiLineEqual(
+            _error_code_to_message[405] + f"url: localhost\nstatus code: 405",
+            http_error.exception.args[0],
+        )
+
+    @patch("streampipes_client.client.client.Session", autospec=True)
+    def test_endpoint_data_lake_measure_json_error(self, http_session: MagicMock):
+        http_session_mock = MagicMock()
+        http_session_mock.get.return_value.text = self.data_lake_measure_all_json_error
+        http_session.return_value = http_session_mock
+
+        client = StreamPipesClient(
+            client_config=StreamPipesClientConfig(
+                credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+                host_address="localhost",
+            )
+        )
+
+        with self.assertRaises(StreamPipesResourceContainerJSONError):
+            client.dataLakeMeasureApi.all()
+
+    @patch("streampipes_client.client.client.Session", autospec=True)
+    def test_endpoint_data_lake_measure_validation_error(self, http_session: MagicMock):
+        http_session_mock = MagicMock()
+        http_session_mock.get.return_value.text = self.data_lake_measure_all_json_validation
+        http_session.return_value = http_session_mock
+
+        client = StreamPipesClient(
+            client_config=StreamPipesClientConfig(
+                credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
+                host_address="localhost",
+            )
+        )
+
+        with self.assertRaises(StreamPipesDataModelError) as err:
+            client.dataLakeMeasureApi.all()
+
+        self.assertTrue(isinstance(err.exception.validation_error, ValidationError))