You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/04/06 15:24:38 UTC
[streampipes] 02/02: [SP-1367] Use `QueryResult` as return type for datalakemeasure API
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1367
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 1749eb44d55dc7b2ab3b138838799cf1a8829b94
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Apr 6 17:24:24 2023 +0200
[SP-1367] Use `QueryResult` as return type for datalakemeasure API
---
...cting-data-from-the-streampipes-data-lake.ipynb | 4 +-
.../streampipes/endpoint/api/data_lake_measure.py | 1 -
.../streampipes/model/resource/data_lake_series.py | 13 +-----
.../streampipes/model/resource/exceptions.py | 16 +++++++
.../streampipes/model/resource/query_result.py | 53 ++++++++++++++--------
streampipes-client-python/streampipes/test.py | 17 -------
.../tests/client/test_data_lake_series.py | 48 ++++++++++++--------
7 files changed, 83 insertions(+), 69 deletions(-)
diff --git a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
index 6f1d7d81d..66a71f08b 100644
--- a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
+++ b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
@@ -370,8 +370,8 @@
},
{
"data": {
- "text/plain": " time mass_flow temperature\n0 2023-02-24T16:19:41.472Z 3.309556 44.448483\n1 2023-02-24T16:19:41.482Z 5.608580 40.322033\n2 2023-02-24T16:19:41.493Z 7.692881 49.239639\n3 2023-02-24T16:19:41.503Z 3.632898 49.933754\n4 2023-02-24T16:19:41.513Z 0.711260 50.106617\n.. ... ... ...\n995 2023-02-24T16:19:52.927Z 1.740114 46.558231\n996 2023-02-24T16:19:52.94Z [...]
- "text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>time</th>\n <th>mass_flow</th>\n <th>temperature</th>\n </tr>\n </thead>\n <tbody>\n < [...]
+ "text/plain": " timestamp mass_flow temperature\n0 2023-02-24T16:19:41.472Z 3.309556 44.448483\n1 2023-02-24T16:19:41.482Z 5.608580 40.322033\n2 2023-02-24T16:19:41.493Z 7.692881 49.239639\n3 2023-02-24T16:19:41.503Z 3.632898 49.933754\n4 2023-02-24T16:19:41.513Z 0.711260 50.106617\n.. ... ... ...\n995 2023-02-24T16:19:52.927Z 1.740114 46.558231\n996 2023-02-24T16:19:52. [...]
+ "text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>timestamp</th>\n <th>mass_flow</th>\n <th>temperature</th>\n </tr>\n </thead>\n <tbody>\n [...]
},
"execution_count": 14,
"metadata": {},
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 0fbbfced8..c7493683c 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -26,7 +26,6 @@ from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, valida
from streampipes.endpoint.endpoint import APIEndpoint
from streampipes.model.container import DataLakeMeasures
from streampipes.model.container.resource_container import ResourceContainer
-from streampipes.model.resource import DataSeries
__all__ = [
"DataLakeMeasureEndpoint",
diff --git a/streampipes-client-python/streampipes/model/resource/data_lake_series.py b/streampipes-client-python/streampipes/model/resource/data_lake_series.py
index b68e1763a..7bb4ffaaa 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_series.py
+++ b/streampipes-client-python/streampipes/model/resource/data_lake_series.py
@@ -22,6 +22,7 @@ from typing import Any, Dict, List, Optional, Union
import pandas as pd
from pydantic import StrictInt, StrictStr
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataLakeSeries
from streampipes.model.resource.resource import Resource
__all__ = [
@@ -29,18 +30,6 @@ __all__ = [
]
-class StreamPipesUnsupportedDataLakeSeries(Exception):
- """Exception to be raised when the returned data lake series
- cannot be parsed with the current implementation of the resource.
- """
-
- def __init__(self):
- super().__init__(
- "The Data Lake series returned by the API appears "
- "to have a structure that is not currently supported by the Python client."
- )
-
-
class DataSeries(Resource):
"""Implementation of a resource for data lake series.
This resource defines the data model used by its resource container(`model.container.DataLakeMeasures`).
diff --git a/streampipes-client-python/streampipes/model/resource/exceptions.py b/streampipes-client-python/streampipes/model/resource/exceptions.py
new file mode 100644
index 000000000..5181a3bea
--- /dev/null
+++ b/streampipes-client-python/streampipes/model/resource/exceptions.py
@@ -0,0 +1,16 @@
+from __future__ import annotations
+
+from typing import Optional
+
+
+class StreamPipesUnsupportedDataLakeSeries(Exception):
+ """Exception to be raised when the returned data lake series
+ cannot be parsed with the current implementation of the resource.
+ """
+
+ def __init__(self, reason: Optional[str] = None):
+ super().__init__(
+ "The Data Lake series returned by the API appears "
+ "to have a structure that is not currently supported by the Python client."
+ f"Reason: {reason}"
+ )
diff --git a/streampipes-client-python/streampipes/model/resource/query_result.py b/streampipes-client-python/streampipes/model/resource/query_result.py
index d7b91cc50..1020902aa 100644
--- a/streampipes-client-python/streampipes/model/resource/query_result.py
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -17,14 +17,13 @@
from __future__ import annotations
-import json
from itertools import chain
-from typing import Any, Dict, List, Optional, Union, Literal
+from typing import Any, Dict, List, Literal, Union
import pandas as pd
-from pydantic import StrictInt, StrictStr, Field
-
+from pydantic import Field, StrictInt, StrictStr
from streampipes.model.resource import DataSeries
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataLakeSeries
from streampipes.model.resource.resource import Resource
__all__ = [
@@ -33,6 +32,12 @@ __all__ = [
class QueryResult(Resource):
+ """Implementation of a resource for query result.
+ This resource defines the data model used by its resource container(`model.container.DataLakeMeasures`).
+ It inherits from Pydantic's BaseModel to get all its superpowers,
+ which are used to parse, validate the API response and to easily switch between
+ the Python representation (both serialized and deserialized) and Java representation (serialized only).
+ """
def convert_to_pandas_representation(self) -> Dict[str, Union[List[str], List[List[Any]]]]:
"""Returns the dictionary representation of a data lake series
@@ -42,35 +47,45 @@ class QueryResult(Resource):
Returns
-------
- Dictionary
+ dict
Dictionary with the keys `headers` and `rows`
+ Raises
+ ------
+ StreamPipesUnsupportedDataLakeSeries
+ If the query result returned by the StreamPipes API cannot be converted to the pandas representation
+
"""
- return self.dict(include={"headers", "rows"})
+ for series in self.all_data_series:
+ if self.headers != series.headers:
+ raise StreamPipesUnsupportedDataLakeSeries("Headers of series does not match query result headers")
+
+ if self.headers[0] == "time":
+ self.headers[0] = "timestamp"
+ else:
+ raise StreamPipesUnsupportedDataLakeSeries(f"Unsupported headers {self.headers}")
+
+ return {
+ "headers": self.headers,
+ "rows": list(chain.from_iterable([series.rows for series in self.all_data_series])),
+ }
total: StrictInt
headers: List[StrictStr]
all_data_series: List[DataSeries]
- query_status: Literal['OK', 'TOO_MUCH_DATA'] = Field(alias="spQueryStatus")
+ query_status: Literal["OK", "TOO_MUCH_DATA"] = Field(alias="spQueryStatus")
def to_pandas(self) -> pd.DataFrame:
"""Returns the data lake series in representation of a Pandas Dataframe.
Returns
-------
- pd.DataFrame
+ df: pd.DataFrame
+ Pandas df containing the query result
"""
- # Pseudocode:
- # pandas_representation = self.convert_to_pandas_representation()
- #
- # pd = pd.DataFrame(data=chain(*[item.rows for item in self.all_data_series]),
- # columns=pandas_representation["headers"])
- #
- # if tags not None:
- # pd.groupBy(tags)
-
- return pd
+ pandas_representation = self.convert_to_pandas_representation()
+ df = pd.DataFrame(data=pandas_representation["rows"], columns=pandas_representation["headers"])
- test ={})
\ No newline at end of file
+ return df
diff --git a/streampipes-client-python/streampipes/test.py b/streampipes-client-python/streampipes/test.py
deleted file mode 100644
index 97bceaa72..000000000
--- a/streampipes-client-python/streampipes/test.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from streampipes.client import StreamPipesClient
-from streampipes.client.config import StreamPipesClientConfig
-from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
-
-if __name__ == '__main__':
- config = StreamPipesClientConfig(
- credential_provider=StreamPipesApiKeyCredentials(
- username="admin@streampipes.apache.org",
- api_key="Mbias0Uqdytro5fMEMnXXBYM",
- ),
- host_address="localhost",
- https_disabled=True,
- port=8082
- )
-
- client = StreamPipesClient(client_config=config)
- print(client.dataLakeMeasureApi.get('test').to_pandas())
diff --git a/streampipes-client-python/tests/client/test_data_lake_series.py b/streampipes-client-python/tests/client/test_data_lake_series.py
index 694104773..21a3afc53 100644
--- a/streampipes-client-python/tests/client/test_data_lake_series.py
+++ b/streampipes-client-python/tests/client/test_data_lake_series.py
@@ -21,25 +21,25 @@ from unittest.mock import MagicMock, call, patch
from streampipes.client import StreamPipesClient
from streampipes.client.config import StreamPipesClientConfig
from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
-from streampipes.model.resource.data_lake_series import (
- StreamPipesUnsupportedDataLakeSeries,
-)
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataLakeSeries
class TestDataLakeSeries(TestCase):
def setUp(self) -> None:
+ self.base_headers = [
+ "changeDetectedHigh",
+ "changeDetectedLow",
+ "cumSumHigh",
+ "cumSumLow",
+ "level",
+ "overflow",
+ "sensorId",
+ "underflow",
+ ]
- self.headers = [
- "time",
- "changeDetectedHigh",
- "changeDetectedLow",
- "cumSumHigh",
- "cumSumLow",
- "level",
- "overflow",
- "sensorId",
- "underflow",
- ]
+ self.headers = ["time"] + self.base_headers
+
+ self.headers_expected = ["timestamp"] + self.base_headers
self.data_series = {
"total": 2,
@@ -93,10 +93,8 @@ class TestDataLakeSeries(TestCase):
return result.to_pandas()
-
@patch("streampipes.client.client.Session", autospec=True)
def test_to_pandas(self, http_session: MagicMock):
-
query_result = {
"total": 1,
"headers": self.headers,
@@ -110,7 +108,7 @@ class TestDataLakeSeries(TestCase):
self.assertEqual(2, len(result_pd))
self.assertListEqual(
- self.headers,
+ self.headers_expected,
list(result_pd.columns),
)
self.assertEqual(73.37740325927734, result_pd["level"][0])
@@ -131,7 +129,21 @@ class TestDataLakeSeries(TestCase):
self.assertEqual(4, len(result_pd))
self.assertListEqual(
- self.headers,
+ self.headers_expected,
list(result_pd.columns),
)
self.assertEqual(70.03279876708984, result_pd["level"][3])
+
+ @patch("streampipes.client.client.Session", autospec=True)
+ def test_different_headers_exception(self, http_session: MagicMock):
+ query_result = {
+ "total": 1,
+ "headers": ['one'],
+ "spQueryStatus": "OK",
+ "allDataSeries": [
+ self.data_series
+ ],
+ }
+
+ with self.assertRaises(StreamPipesUnsupportedDataLakeSeries):
+ self.get_result_as_panda(http_session, query_result)