You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/04/06 15:24:38 UTC

[streampipes] 02/02: [SP-1367] Use `QueryResult` as return type for datalakemeasure API

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1367
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 1749eb44d55dc7b2ab3b138838799cf1a8829b94
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Apr 6 17:24:24 2023 +0200

    [SP-1367] Use `QueryResult` as return type for datalakemeasure API
---
 ...cting-data-from-the-streampipes-data-lake.ipynb |  4 +-
 .../streampipes/endpoint/api/data_lake_measure.py  |  1 -
 .../streampipes/model/resource/data_lake_series.py | 13 +-----
 .../streampipes/model/resource/exceptions.py       | 16 +++++++
 .../streampipes/model/resource/query_result.py     | 53 ++++++++++++++--------
 streampipes-client-python/streampipes/test.py      | 17 -------
 .../tests/client/test_data_lake_series.py          | 48 ++++++++++++--------
 7 files changed, 83 insertions(+), 69 deletions(-)

diff --git a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
index 6f1d7d81d..66a71f08b 100644
--- a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
+++ b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
@@ -370,8 +370,8 @@
     },
     {
      "data": {
-      "text/plain": "                         time  mass_flow  temperature\n0    2023-02-24T16:19:41.472Z   3.309556    44.448483\n1    2023-02-24T16:19:41.482Z   5.608580    40.322033\n2    2023-02-24T16:19:41.493Z   7.692881    49.239639\n3    2023-02-24T16:19:41.503Z   3.632898    49.933754\n4    2023-02-24T16:19:41.513Z   0.711260    50.106617\n..                        ...        ...          ...\n995  2023-02-24T16:19:52.927Z   1.740114    46.558231\n996   2023-02-24T16:19:52.94Z   [...]
-      "text/html": "<div>\n<style scoped>\n    .dataframe tbody tr th:only-of-type {\n        vertical-align: middle;\n    }\n\n    .dataframe tbody tr th {\n        vertical-align: top;\n    }\n\n    .dataframe thead th {\n        text-align: right;\n    }\n</style>\n<table border=\"1\" class=\"dataframe\">\n  <thead>\n    <tr style=\"text-align: right;\">\n      <th></th>\n      <th>time</th>\n      <th>mass_flow</th>\n      <th>temperature</th>\n    </tr>\n  </thead>\n  <tbody>\n    < [...]
+      "text/plain": "                         timestamp  mass_flow  temperature\n0    2023-02-24T16:19:41.472Z   3.309556    44.448483\n1    2023-02-24T16:19:41.482Z   5.608580    40.322033\n2    2023-02-24T16:19:41.493Z   7.692881    49.239639\n3    2023-02-24T16:19:41.503Z   3.632898    49.933754\n4    2023-02-24T16:19:41.513Z   0.711260    50.106617\n..                        ...        ...          ...\n995  2023-02-24T16:19:52.927Z   1.740114    46.558231\n996   2023-02-24T16:19:52. [...]
+      "text/html": "<div>\n<style scoped>\n    .dataframe tbody tr th:only-of-type {\n        vertical-align: middle;\n    }\n\n    .dataframe tbody tr th {\n        vertical-align: top;\n    }\n\n    .dataframe thead th {\n        text-align: right;\n    }\n</style>\n<table border=\"1\" class=\"dataframe\">\n  <thead>\n    <tr style=\"text-align: right;\">\n      <th></th>\n      <th>timestamp</th>\n      <th>mass_flow</th>\n      <th>temperature</th>\n    </tr>\n  </thead>\n  <tbody>\n [...]
      },
      "execution_count": 14,
      "metadata": {},
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 0fbbfced8..c7493683c 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -26,7 +26,6 @@ from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, valida
 from streampipes.endpoint.endpoint import APIEndpoint
 from streampipes.model.container import DataLakeMeasures
 from streampipes.model.container.resource_container import ResourceContainer
-from streampipes.model.resource import DataSeries
 
 __all__ = [
     "DataLakeMeasureEndpoint",
diff --git a/streampipes-client-python/streampipes/model/resource/data_lake_series.py b/streampipes-client-python/streampipes/model/resource/data_lake_series.py
index b68e1763a..7bb4ffaaa 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_series.py
+++ b/streampipes-client-python/streampipes/model/resource/data_lake_series.py
@@ -22,6 +22,7 @@ from typing import Any, Dict, List, Optional, Union
 
 import pandas as pd
 from pydantic import StrictInt, StrictStr
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataLakeSeries
 from streampipes.model.resource.resource import Resource
 
 __all__ = [
@@ -29,18 +30,6 @@ __all__ = [
 ]
 
 
-class StreamPipesUnsupportedDataLakeSeries(Exception):
-    """Exception to be raised when the returned data lake series
-    cannot be parsed with the current implementation of the resource.
-    """
-
-    def __init__(self):
-        super().__init__(
-            "The Data Lake series returned by the API appears "
-            "to have a structure that is not currently supported by the Python client."
-        )
-
-
 class DataSeries(Resource):
     """Implementation of a resource for data lake series.
     This resource defines the data model used by its resource container(`model.container.DataLakeMeasures`).
diff --git a/streampipes-client-python/streampipes/model/resource/exceptions.py b/streampipes-client-python/streampipes/model/resource/exceptions.py
new file mode 100644
index 000000000..5181a3bea
--- /dev/null
+++ b/streampipes-client-python/streampipes/model/resource/exceptions.py
@@ -0,0 +1,16 @@
+from __future__ import annotations
+
+from typing import Optional
+
+
+class StreamPipesUnsupportedDataLakeSeries(Exception):
+    """Raised when data lake data returned by the API cannot be parsed; an optional `reason` is appended."""
+
+    def __init__(self, reason: Optional[str] = None):
+        message = (
+            "The Data Lake series returned by the API appears "
+            "to have a structure that is not currently supported by the Python client."
+        )
+        if reason is not None:  # avoid "...client.Reason: None" (missing space, literal None)
+            message += f" Reason: {reason}"
+        super().__init__(message)
diff --git a/streampipes-client-python/streampipes/model/resource/query_result.py b/streampipes-client-python/streampipes/model/resource/query_result.py
index d7b91cc50..1020902aa 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -17,14 +17,13 @@
 
 from __future__ import annotations
 
-import json
 from itertools import chain
-from typing import Any, Dict, List, Optional, Union, Literal
+from typing import Any, Dict, List, Literal, Union
 
 import pandas as pd
-from pydantic import StrictInt, StrictStr, Field
-
+from pydantic import Field, StrictInt, StrictStr
 from streampipes.model.resource import DataSeries
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataLakeSeries
 from streampipes.model.resource.resource import Resource
 
 __all__ = [
@@ -33,6 +32,12 @@ __all__ = [
 
 
 class QueryResult(Resource):
+    """Implementation of a resource for query result.
+    This resource defines the data model used by its resource container(`model.container.DataLakeMeasures`).
+    It inherits from Pydantic's BaseModel to get all its superpowers,
+    which are used to parse, validate the API response and to easily switch between
+    the Python representation (both serialized and deserialized) and Java representation (serialized only).
+    """
 
     def convert_to_pandas_representation(self) -> Dict[str, Union[List[str], List[List[Any]]]]:
         """Returns the dictionary representation of a data lake series
@@ -42,35 +47,45 @@ class QueryResult(Resource):
 
         Returns
         -------
-        Dictionary
+        dict
             Dictionary with the keys `headers` and `rows`
 
+        Raises
+        ------
+        StreamPipesUnsupportedDataLakeSeries
+            If the query result returned by the StreamPipes API cannot be converted to the pandas representation
+
         """
-        return self.dict(include={"headers", "rows"})
+        for series in self.all_data_series:
+            if self.headers != series.headers:
+                raise StreamPipesUnsupportedDataLakeSeries("Headers of series does not match query result headers")
+
+        if self.headers[0] == "time":
+            self.headers[0] = "timestamp"
+        else:
+            raise StreamPipesUnsupportedDataLakeSeries(f"Unsupported headers {self.headers}")
+
+        return {
+            "headers": self.headers,
+            "rows": list(chain.from_iterable([series.rows for series in self.all_data_series])),
+        }
 
     total: StrictInt
     headers: List[StrictStr]
     all_data_series: List[DataSeries]
-    query_status: Literal['OK', 'TOO_MUCH_DATA'] = Field(alias="spQueryStatus")
+    query_status: Literal["OK", "TOO_MUCH_DATA"] = Field(alias="spQueryStatus")
 
     def to_pandas(self) -> pd.DataFrame:
         """Returns the data lake series in representation of a Pandas Dataframe.
 
         Returns
         -------
-        pd.DataFrame
+        df: pd.DataFrame
+            Pandas df containing the query result
         """
 
-        # Pseudocode:
-        # pandas_representation = self.convert_to_pandas_representation()
-        #
-        # pd = pd.DataFrame(data=chain(*[item.rows for item in self.all_data_series]),
-        #              columns=pandas_representation["headers"])
-        #
-        # if tags not None:
-        #     pd.groupBy(tags)
-
-        return pd
+        pandas_representation = self.convert_to_pandas_representation()
 
+        df = pd.DataFrame(data=pandas_representation["rows"], columns=pandas_representation["headers"])
 
-    test ={})
\ No newline at end of file
+        return df
diff --git a/streampipes-client-python/streampipes/test.py b/streampipes-client-python/streampipes/test.py
deleted file mode 100644
index 97bceaa72..000000000
--- a/streampipes-client-python/streampipes/test.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from streampipes.client import StreamPipesClient
-from streampipes.client.config import StreamPipesClientConfig
-from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
-
-if __name__ == '__main__':
-    config = StreamPipesClientConfig(
-        credential_provider=StreamPipesApiKeyCredentials(
-            username="admin@streampipes.apache.org",
-            api_key="Mbias0Uqdytro5fMEMnXXBYM",
-        ),
-        host_address="localhost",
-        https_disabled=True,
-        port=8082
-    )
-
-    client = StreamPipesClient(client_config=config)
-    print(client.dataLakeMeasureApi.get('test').to_pandas())
diff --git a/streampipes-client-python/tests/client/test_data_lake_series.py b/streampipes-client-python/tests/client/test_data_lake_series.py
index 694104773..21a3afc53 100644
--- a/streampipes-client-python/tests/client/test_data_lake_series.py
+++ b/streampipes-client-python/tests/client/test_data_lake_series.py
@@ -21,25 +21,25 @@ from unittest.mock import MagicMock, call, patch
 from streampipes.client import StreamPipesClient
 from streampipes.client.config import StreamPipesClientConfig
 from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
-from streampipes.model.resource.data_lake_series import (
-    StreamPipesUnsupportedDataLakeSeries,
-)
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataLakeSeries
 
 
 class TestDataLakeSeries(TestCase):
     def setUp(self) -> None:
+        self.base_headers = [
+            "changeDetectedHigh",
+            "changeDetectedLow",
+            "cumSumHigh",
+            "cumSumLow",
+            "level",
+            "overflow",
+            "sensorId",
+            "underflow",
+        ]
 
-        self.headers = [
-                "time",
-                "changeDetectedHigh",
-                "changeDetectedLow",
-                "cumSumHigh",
-                "cumSumLow",
-                "level",
-                "overflow",
-                "sensorId",
-                "underflow",
-            ]
+        self.headers = ["time"] + self.base_headers
+
+        self.headers_expected = ["timestamp"] + self.base_headers
 
         self.data_series = {
             "total": 2,
@@ -93,10 +93,8 @@ class TestDataLakeSeries(TestCase):
 
         return result.to_pandas()
 
-
     @patch("streampipes.client.client.Session", autospec=True)
     def test_to_pandas(self, http_session: MagicMock):
-
         query_result = {
             "total": 1,
             "headers": self.headers,
@@ -110,7 +108,7 @@ class TestDataLakeSeries(TestCase):
 
         self.assertEqual(2, len(result_pd))
         self.assertListEqual(
-            self.headers,
+            self.headers_expected,
             list(result_pd.columns),
         )
         self.assertEqual(73.37740325927734, result_pd["level"][0])
@@ -131,7 +129,21 @@ class TestDataLakeSeries(TestCase):
 
         self.assertEqual(4, len(result_pd))
         self.assertListEqual(
-            self.headers,
+            self.headers_expected,
             list(result_pd.columns),
         )
         self.assertEqual(70.03279876708984, result_pd["level"][3])
+
+    @patch("streampipes.client.client.Session", autospec=True)
+    def test_different_headers_exception(self, http_session: MagicMock):
+        query_result = {
+            "total": 1,
+            "headers": ['one'],
+            "spQueryStatus": "OK",
+            "allDataSeries": [
+                self.data_series
+            ],
+        }
+
+        with self.assertRaises(StreamPipesUnsupportedDataLakeSeries):
+            self.get_result_as_panda(http_session, query_result)