You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/04/13 15:43:00 UTC

[streampipes] branch dev updated: Sp 1367 Use QueryResult as return type for data lake measure API (#1492)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new c5ec75cbc Sp 1367 Use QueryResult as return type for data lake measure API (#1492)
c5ec75cbc is described below

commit c5ec75cbca24eff2e133c92c5ae8ce5a6b10e112
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Apr 13 17:42:55 2023 +0200

    Sp 1367 Use QueryResult as return type for data lake measure API (#1492)
---
 ...cting-data-from-the-streampipes-data-lake.ipynb |   4 +-
 .../streampipes/endpoint/api/data_lake_measure.py  |  11 +-
 .../streampipes/model/resource/__init__.py         |   4 +-
 .../{data_lake_series.py => data_series.py}        |  29 +---
 .../model/resource/{__init__.py => exceptions.py}  |  23 +--
 .../streampipes/model/resource/query_result.py     |  89 ++++++++++
 .../tests/client/test_data_lake_series.py          | 189 ++++++++++-----------
 7 files changed, 213 insertions(+), 136 deletions(-)

diff --git a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
index 6f1d7d81d..66a71f08b 100644
--- a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
+++ b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
@@ -370,8 +370,8 @@
     },
     {
      "data": {
-      "text/plain": "                         time  mass_flow  temperature\n0    2023-02-24T16:19:41.472Z   3.309556    44.448483\n1    2023-02-24T16:19:41.482Z   5.608580    40.322033\n2    2023-02-24T16:19:41.493Z   7.692881    49.239639\n3    2023-02-24T16:19:41.503Z   3.632898    49.933754\n4    2023-02-24T16:19:41.513Z   0.711260    50.106617\n..                        ...        ...          ...\n995  2023-02-24T16:19:52.927Z   1.740114    46.558231\n996   2023-02-24T16:19:52.94Z   [...]
-      "text/html": "<div>\n<style scoped>\n    .dataframe tbody tr th:only-of-type {\n        vertical-align: middle;\n    }\n\n    .dataframe tbody tr th {\n        vertical-align: top;\n    }\n\n    .dataframe thead th {\n        text-align: right;\n    }\n</style>\n<table border=\"1\" class=\"dataframe\">\n  <thead>\n    <tr style=\"text-align: right;\">\n      <th></th>\n      <th>time</th>\n      <th>mass_flow</th>\n      <th>temperature</th>\n    </tr>\n  </thead>\n  <tbody>\n    < [...]
+      "text/plain": "                         timestamp  mass_flow  temperature\n0    2023-02-24T16:19:41.472Z   3.309556    44.448483\n1    2023-02-24T16:19:41.482Z   5.608580    40.322033\n2    2023-02-24T16:19:41.493Z   7.692881    49.239639\n3    2023-02-24T16:19:41.503Z   3.632898    49.933754\n4    2023-02-24T16:19:41.513Z   0.711260    50.106617\n..                        ...        ...          ...\n995  2023-02-24T16:19:52.927Z   1.740114    46.558231\n996   2023-02-24T16:19:52. [...]
+      "text/html": "<div>\n<style scoped>\n    .dataframe tbody tr th:only-of-type {\n        vertical-align: middle;\n    }\n\n    .dataframe tbody tr th {\n        vertical-align: top;\n    }\n\n    .dataframe thead th {\n        text-align: right;\n    }\n</style>\n<table border=\"1\" class=\"dataframe\">\n  <thead>\n    <tr style=\"text-align: right;\">\n      <th></th>\n      <th>timestamp</th>\n      <th>mass_flow</th>\n      <th>temperature</th>\n    </tr>\n  </thead>\n  <tbody>\n [...]
      },
      "execution_count": 14,
      "metadata": {},
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 3566dba8d..c7493683c 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -26,12 +26,13 @@ from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, valida
 from streampipes.endpoint.endpoint import APIEndpoint
 from streampipes.model.container import DataLakeMeasures
 from streampipes.model.container.resource_container import ResourceContainer
-from streampipes.model.resource import DataLakeSeries
 
 __all__ = [
     "DataLakeMeasureEndpoint",
 ]
 
+from streampipes.model.resource.query_result import QueryResult
+
 
 class StreamPipesQueryValidationError(Exception):
     """A custom exception to be raised when the validation of query parameter
@@ -275,7 +276,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         return config
 
     @property
-    def _resource_cls(self) -> Type[DataLakeSeries]:
+    def _resource_cls(self) -> Type[QueryResult]:
         """
         Additional reference to resource class.
         This endpoint deviates from the desired relationship
@@ -283,7 +284,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         the return type of the get endpoint.
         Therefore, this is only a temporary implementation and will be removed soon.
         """
-        return DataLakeSeries
+        return QueryResult
 
     @property
     def _container_cls(self) -> Type[ResourceContainer]:
@@ -298,7 +299,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
 
         return "api", "v4", "datalake", "measurements"
 
-    def get(self, identifier: str, **kwargs: Optional[Dict[str, Any]]) -> DataLakeSeries:
+    def get(self, identifier: str, **kwargs: Optional[Dict[str, Any]]) -> QueryResult:
         """Queries the specified data lake measure from the API.
 
         By default, the maximum number of returned records is 1000.
@@ -332,4 +333,4 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         url += measurement_get_config.build_query_string()
 
         response = self._make_request(request_method=self._parent_client.request_session.get, url=url)
-        return self._resource_cls.from_json(json_string=response.text)
+        return self._resource_cls(**response.json())
diff --git a/streampipes-client-python/streampipes/model/resource/__init__.py b/streampipes-client-python/streampipes/model/resource/__init__.py
index c6f0cddba..09590c1da 100644
--- a/streampipes-client-python/streampipes/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes/model/resource/__init__.py
@@ -16,13 +16,13 @@
 #
 
 from .data_lake_measure import DataLakeMeasure
-from .data_lake_series import DataLakeSeries
+from .data_series import DataSeries
 from .data_stream import DataStream
 from .function_definition import FunctionDefinition
 
 __all__ = [
     "DataLakeMeasure",
-    "DataLakeSeries",
+    "DataSeries",
     "DataStream",
     "FunctionDefinition",
 ]
diff --git a/streampipes-client-python/streampipes/model/resource/data_lake_series.py b/streampipes-client-python/streampipes/model/resource/data_series.py
similarity index 81%
rename from streampipes-client-python/streampipes/model/resource/data_lake_series.py
rename to streampipes-client-python/streampipes/model/resource/data_series.py
index 137792f3d..55cefbab7 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_series.py
+++ b/streampipes-client-python/streampipes/model/resource/data_series.py
@@ -22,27 +22,16 @@ from typing import Any, Dict, List, Optional, Union
 
 import pandas as pd
 from pydantic import StrictInt, StrictStr
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataSeries
 from streampipes.model.resource.resource import Resource
 
 __all__ = [
-    "DataLakeSeries",
+    "DataSeries",
 ]
 
 
-class StreamPipesUnsupportedDataLakeSeries(Exception):
-    """Exception to be raised when the returned data lake series
-    cannot be parsed with the current implementation of the resource.
-    """
-
-    def __init__(self):
-        super().__init__(
-            "The Data Lake series returned by the API appears "
-            "to have a structure that is not currently supported by the Python client."
-        )
-
-
-class DataLakeSeries(Resource):
-    """Implementation of a resource for data lake series.
+class DataSeries(Resource):
+    """Implementation of a resource for data series.
     This resource defines the data model used by its resource container(`model.container.DataLakeMeasures`).
     It inherits from Pydantic's BaseModel to get all its superpowers,
     which are used to parse, validate the API response and to easily switch between
@@ -56,8 +45,8 @@ class DataLakeSeries(Resource):
     """
 
     @classmethod
-    def from_json(cls, json_string: str) -> DataLakeSeries:
-        """Creates an instance of `DataLakeSeries` from a given JSON string.
+    def from_json(cls, json_string: str) -> DataSeries:
+        """Creates an instance of `DataSeries` from a given JSON string.
 
         This method is used by the resource container to parse the JSON response of
         the StreamPipes API.
@@ -70,8 +59,8 @@ class DataLakeSeries(Resource):
 
         Returns
         -------
-        DataLakeSeries
-            Instance of `DataLakeSeries` that is created based on the given JSON string.
+        DataSeries
+            Instance of `DataSeries` that is created based on the given JSON string.
 
         Raises
         ------
@@ -87,7 +76,7 @@ class DataLakeSeries(Resource):
         # check if the provided JSON has only one data series entry
         # otherwise raise the proper exception
         if len(parsed_json["allDataSeries"]) != 1:
-            raise StreamPipesUnsupportedDataLakeSeries()
+            raise StreamPipesUnsupportedDataSeries()
 
         # get the data data series
         data_series = parsed_json["allDataSeries"][0]
diff --git a/streampipes-client-python/streampipes/model/resource/__init__.py b/streampipes-client-python/streampipes/model/resource/exceptions.py
similarity index 61%
copy from streampipes-client-python/streampipes/model/resource/__init__.py
copy to streampipes-client-python/streampipes/model/resource/exceptions.py
index c6f0cddba..88bb9e007 100644
--- a/streampipes-client-python/streampipes/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes/model/resource/exceptions.py
@@ -15,14 +15,17 @@
 # limitations under the License.
 #
 
-from .data_lake_measure import DataLakeMeasure
-from .data_lake_series import DataLakeSeries
-from .data_stream import DataStream
-from .function_definition import FunctionDefinition
+from typing import Optional
 
-__all__ = [
-    "DataLakeMeasure",
-    "DataLakeSeries",
-    "DataStream",
-    "FunctionDefinition",
-]
+
+class StreamPipesUnsupportedDataSeries(Exception):
+    """Exception to be raised when the returned data series cannot be parsed with the
+    current implementation of the resource. An optional `reason` is appended to the message.
+    """
+
+    def __init__(self, reason: Optional[str] = None):
+        message = (
+            "The Data Lake series returned by the API appears "
+            "to have a structure that is not currently supported by the Python client."
+        )
+        super().__init__(message if reason is None else f"{message} Reason: {reason}")
diff --git a/streampipes-client-python/streampipes/model/resource/query_result.py b/streampipes-client-python/streampipes/model/resource/query_result.py
new file mode 100644
index 000000000..03d6f3bc1
--- /dev/null
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from itertools import chain
+from typing import Any, Dict, List, Literal, Union
+
+import pandas as pd
+from pydantic import Field, StrictInt, StrictStr
+from streampipes.model.resource import DataSeries
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataSeries
+from streampipes.model.resource.resource import Resource
+
+__all__ = [
+    "QueryResult",
+]
+
+
+class QueryResult(Resource):
+    """Implementation of a resource for query result.
+    This resource defines the data model used by its resource container (`model.container.DataLakeMeasures`).
+    It inherits from Pydantic's BaseModel to get all its superpowers,
+    which are used to parse, validate the API response and to easily switch between
+    the Python representation (both serialized and deserialized) and Java representation (serialized only).
+    """
+
+    def convert_to_pandas_representation(self) -> Dict[str, Union[List[str], List[List[Any]]]]:
+        """Returns the dictionary representation of a query result
+        to be used when creating a pandas Dataframe.
+
+        It contains only the "header rows" (the column names) and "rows" that contain the actual data.
+        The query result itself is left unchanged, so the conversion can be repeated.
+
+        Returns
+        -------
+        dict
+            Dictionary with the keys `headers` and `rows`
+
+        Raises
+        ------
+        StreamPipesUnsupportedDataSeries
+            If the query result returned by the StreamPipes API cannot be converted to the pandas representation
+        """
+        for series in self.all_data_series:
+            if self.headers != series.headers:
+                raise StreamPipesUnsupportedDataSeries("Headers of series does not match query result headers")
+
+        # an empty header list is unsupported as well, so guard before indexing
+        if not self.headers or self.headers[0] != "time":
+            raise StreamPipesUnsupportedDataSeries(f"Unsupported headers {self.headers}")
+
+        return {
+            # rename `time` on a copy: mutating `self.headers` would make a repeated call fail
+            "headers": ["timestamp", *self.headers[1:]],
+            "rows": list(chain.from_iterable(series.rows for series in self.all_data_series)),
+        }
+
+    total: StrictInt
+    headers: List[StrictStr]
+    all_data_series: List[DataSeries]
+    query_status: Literal["OK", "TOO_MUCH_DATA"] = Field(alias="spQueryStatus")
+
+    def to_pandas(self) -> pd.DataFrame:
+        """Returns the query result in representation of a Pandas Dataframe.
+
+        Returns
+        -------
+        df: pd.DataFrame
+            Pandas df containing the query result
+        """
+
+        pandas_representation = self.convert_to_pandas_representation()
+
+        df = pd.DataFrame(data=pandas_representation["rows"], columns=pandas_representation["headers"])
+
+        return df
diff --git a/streampipes-client-python/tests/client/test_data_lake_series.py b/streampipes-client-python/tests/client/test_data_lake_series.py
index 482c0f75b..d634a1262 100644
--- a/streampipes-client-python/tests/client/test_data_lake_series.py
+++ b/streampipes-client-python/tests/client/test_data_lake_series.py
@@ -21,89 +21,60 @@ from unittest.mock import MagicMock, call, patch
 from streampipes.client import StreamPipesClient
 from streampipes.client.config import StreamPipesClientConfig
 from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
-from streampipes.model.resource.data_lake_series import (
-    StreamPipesUnsupportedDataLakeSeries,
-)
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataSeries
 
 
 class TestDataLakeSeries(TestCase):
     def setUp(self) -> None:
-        self.series_regular = {
-            "total": 1,
-            "headers": [
-                "time",
-                "changeDetectedHigh",
-                "changeDetectedLow",
-                "cumSumHigh",
-                "cumSumLow",
-                "level",
-                "overflow",
-                "sensorId",
-                "underflow",
-            ],
-            "allDataSeries": [
-                {
-                    "total": 2,
-                    "rows": [
-                        [
-                            "2022-11-05T14:47:50.838Z",
-                            False,
-                            False,
-                            "0.0",
-                            "0.0",
-                            73.37740325927734,
-                            False,
-                            "level01",
-                            False,
-                        ],
-                        [
-                            "2022-11-05T14:47:54.906Z",
-                            False,
-                            False,
-                            "0.0",
-                            "-0.38673634857474815",
-                            70.03279876708984,
-                            False,
-                            "level01",
-                            False,
-                        ],
-                    ],
-                    "tags": None,
-                    "headers": [
-                        "time",
-                        "changeDetectedHigh",
-                        "changeDetectedLow",
-                        "cumSumHigh",
-                        "cumSumLow",
-                        "level",
-                        "overflow",
-                        "sensorId",
-                        "underflow",
-                    ],
-                }
-            ],
-        }
+        self.base_headers = [
+            "changeDetectedHigh",
+            "changeDetectedLow",
+            "cumSumHigh",
+            "cumSumLow",
+            "level",
+            "overflow",
+            "sensorId",
+            "underflow",
+        ]
 
-        self.series_missing = {
-            "total": 1,
-            "headers": [
-                "time",
-                "changeDetectedHigh",
-                "changeDetectedLow",
-                "cumSumHigh",
-                "cumSumLow",
-                "level",
-                "overflow",
-                "sensorId",
-                "underflow",
+        self.headers = ["time"] + self.base_headers
+
+        self.headers_expected = ["timestamp"] + self.base_headers
+
+        self.data_series = {
+            "total": 2,
+            "rows": [
+                [
+                    "2022-11-05T14:47:50.838Z",
+                    False,
+                    False,
+                    "0.0",
+                    "0.0",
+                    73.37740325927734,
+                    False,
+                    "level01",
+                    False,
+                ],
+                [
+                    "2022-11-05T14:47:54.906Z",
+                    False,
+                    False,
+                    "0.0",
+                    "-0.38673634857474815",
+                    70.03279876708984,
+                    False,
+                    "level01",
+                    False,
+                ],
             ],
-            "allDataSeries": [],
+            "tags": None,
+            "headers": self.headers,
         }
 
-    @patch("streampipes.client.client.Session", autospec=True)
-    def test_to_pandas(self, http_session: MagicMock):
+    @staticmethod
+    def get_result_as_panda(http_session: MagicMock, data: dict):
         http_session_mock = MagicMock()
-        http_session_mock.get.return_value.text = json.dumps(self.series_regular)
+        http_session_mock.get.return_value.json.return_value = data
         http_session.return_value = http_session_mock
 
         client = StreamPipesClient(
@@ -112,6 +83,7 @@ class TestDataLakeSeries(TestCase):
                 host_address="localhost",
             )
         )
+
         result = client.dataLakeMeasureApi.get(identifier="test")
 
         http_session.assert_has_calls(
@@ -119,36 +91,59 @@ class TestDataLakeSeries(TestCase):
             any_order=True,
         )
 
-        result_pd = result.to_pandas()
+        return result.to_pandas()
+
+    @patch("streampipes.client.client.Session", autospec=True)
+    def test_to_pandas(self, http_session: MagicMock):
+        query_result = {
+            "total": 1,
+            "headers": self.headers,
+            "spQueryStatus": "OK",
+            "allDataSeries": [
+                self.data_series
+            ],
+        }
+
+        result_pd = self.get_result_as_panda(http_session, query_result)
 
         self.assertEqual(2, len(result_pd))
         self.assertListEqual(
-            [
-                "time",
-                "changeDetectedHigh",
-                "changeDetectedLow",
-                "cumSumHigh",
-                "cumSumLow",
-                "level",
-                "overflow",
-                "sensorId",
-                "underflow",
-            ],
+            self.headers_expected,
             list(result_pd.columns),
         )
         self.assertEqual(73.37740325927734, result_pd["level"][0])
 
     @patch("streampipes.client.client.Session", autospec=True)
-    def test_to_pandas_unsupported_series(self, http_session: MagicMock):
-        http_session_mock = MagicMock()
-        http_session_mock.get.return_value.text = json.dumps(self.series_missing)
-        http_session.return_value = http_session_mock
+    def test_group_by_to_pandas(self, http_session: MagicMock):
+        query_result = {
+            "total": 2,
+            "headers": self.headers,
+            "spQueryStatus": "OK",
+            "allDataSeries": [
+                self.data_series,
+                self.data_series
+            ],
+        }
 
-        client = StreamPipesClient(
-            client_config=StreamPipesClientConfig(
-                credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
-                host_address="localhost",
-            )
+        result_pd = self.get_result_as_panda(http_session, query_result)
+
+        self.assertEqual(4, len(result_pd))
+        self.assertListEqual(
+            self.headers_expected,
+            list(result_pd.columns),
         )
-        with self.assertRaises(StreamPipesUnsupportedDataLakeSeries):
-            client.dataLakeMeasureApi.get(identifier="test")
+        self.assertEqual(70.03279876708984, result_pd["level"][3])
+
+    @patch("streampipes.client.client.Session", autospec=True)
+    def test_different_headers_exception(self, http_session: MagicMock):
+        query_result = {
+            "total": 1,
+            "headers": ['one'],
+            "spQueryStatus": "OK",
+            "allDataSeries": [
+                self.data_series
+            ],
+        }
+
+        with self.assertRaises(StreamPipesUnsupportedDataSeries):
+            self.get_result_as_panda(http_session, query_result)