You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/04/13 15:43:00 UTC
[streampipes] branch dev updated: Sp 1367 Use QueryResult as return type for data lake measure API (#1492)
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new c5ec75cbc Sp 1367 Use QueryResult as return type for data lake measure API (#1492)
c5ec75cbc is described below
commit c5ec75cbca24eff2e133c92c5ae8ce5a6b10e112
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Thu Apr 13 17:42:55 2023 +0200
Sp 1367 Use QueryResult as return type for data lake measure API (#1492)
---
...cting-data-from-the-streampipes-data-lake.ipynb | 4 +-
.../streampipes/endpoint/api/data_lake_measure.py | 11 +-
.../streampipes/model/resource/__init__.py | 4 +-
.../{data_lake_series.py => data_series.py} | 29 +---
.../model/resource/{__init__.py => exceptions.py} | 23 +--
.../streampipes/model/resource/query_result.py | 89 ++++++++++
.../tests/client/test_data_lake_series.py | 189 ++++++++++-----------
7 files changed, 213 insertions(+), 136 deletions(-)
diff --git a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
index 6f1d7d81d..66a71f08b 100644
--- a/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
+++ b/streampipes-client-python/docs/tutorials/2-extracting-data-from-the-streampipes-data-lake.ipynb
@@ -370,8 +370,8 @@
},
{
"data": {
- "text/plain": " time mass_flow temperature\n0 2023-02-24T16:19:41.472Z 3.309556 44.448483\n1 2023-02-24T16:19:41.482Z 5.608580 40.322033\n2 2023-02-24T16:19:41.493Z 7.692881 49.239639\n3 2023-02-24T16:19:41.503Z 3.632898 49.933754\n4 2023-02-24T16:19:41.513Z 0.711260 50.106617\n.. ... ... ...\n995 2023-02-24T16:19:52.927Z 1.740114 46.558231\n996 2023-02-24T16:19:52.94Z [...]
- "text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>time</th>\n <th>mass_flow</th>\n <th>temperature</th>\n </tr>\n </thead>\n <tbody>\n < [...]
+ "text/plain": " timestamp mass_flow temperature\n0 2023-02-24T16:19:41.472Z 3.309556 44.448483\n1 2023-02-24T16:19:41.482Z 5.608580 40.322033\n2 2023-02-24T16:19:41.493Z 7.692881 49.239639\n3 2023-02-24T16:19:41.503Z 3.632898 49.933754\n4 2023-02-24T16:19:41.513Z 0.711260 50.106617\n.. ... ... ...\n995 2023-02-24T16:19:52.927Z 1.740114 46.558231\n996 2023-02-24T16:19:52. [...]
+ "text/html": "<div>\n<style scoped>\n .dataframe tbody tr th:only-of-type {\n vertical-align: middle;\n }\n\n .dataframe tbody tr th {\n vertical-align: top;\n }\n\n .dataframe thead th {\n text-align: right;\n }\n</style>\n<table border=\"1\" class=\"dataframe\">\n <thead>\n <tr style=\"text-align: right;\">\n <th></th>\n <th>timestamp</th>\n <th>mass_flow</th>\n <th>temperature</th>\n </tr>\n </thead>\n <tbody>\n [...]
},
"execution_count": 14,
"metadata": {},
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index 3566dba8d..c7493683c 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -26,12 +26,13 @@ from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, valida
from streampipes.endpoint.endpoint import APIEndpoint
from streampipes.model.container import DataLakeMeasures
from streampipes.model.container.resource_container import ResourceContainer
-from streampipes.model.resource import DataLakeSeries
__all__ = [
"DataLakeMeasureEndpoint",
]
+from streampipes.model.resource.query_result import QueryResult
+
class StreamPipesQueryValidationError(Exception):
"""A custom exception to be raised when the validation of query parameter
@@ -275,7 +276,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
return config
@property
- def _resource_cls(self) -> Type[DataLakeSeries]:
+ def _resource_cls(self) -> Type[QueryResult]:
"""
Additional reference to resource class.
This endpoint deviates from the desired relationship
@@ -283,7 +284,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
the return type of the get endpoint.
Therefore, this is only a temporary implementation and will be removed soon.
"""
- return DataLakeSeries
+ return QueryResult
@property
def _container_cls(self) -> Type[ResourceContainer]:
@@ -298,7 +299,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
return "api", "v4", "datalake", "measurements"
- def get(self, identifier: str, **kwargs: Optional[Dict[str, Any]]) -> DataLakeSeries:
+ def get(self, identifier: str, **kwargs: Optional[Dict[str, Any]]) -> QueryResult:
"""Queries the specified data lake measure from the API.
By default, the maximum number of returned records is 1000.
@@ -332,4 +333,4 @@ class DataLakeMeasureEndpoint(APIEndpoint):
url += measurement_get_config.build_query_string()
response = self._make_request(request_method=self._parent_client.request_session.get, url=url)
- return self._resource_cls.from_json(json_string=response.text)
+ return self._resource_cls(**response.json())
diff --git a/streampipes-client-python/streampipes/model/resource/__init__.py b/streampipes-client-python/streampipes/model/resource/__init__.py
index c6f0cddba..09590c1da 100644
--- a/streampipes-client-python/streampipes/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes/model/resource/__init__.py
@@ -16,13 +16,13 @@
#
from .data_lake_measure import DataLakeMeasure
-from .data_lake_series import DataLakeSeries
+from .data_series import DataSeries
from .data_stream import DataStream
from .function_definition import FunctionDefinition
__all__ = [
"DataLakeMeasure",
- "DataLakeSeries",
+ "DataSeries",
"DataStream",
"FunctionDefinition",
]
diff --git a/streampipes-client-python/streampipes/model/resource/data_lake_series.py b/streampipes-client-python/streampipes/model/resource/data_series.py
similarity index 81%
rename from streampipes-client-python/streampipes/model/resource/data_lake_series.py
rename to streampipes-client-python/streampipes/model/resource/data_series.py
index 137792f3d..55cefbab7 100644
--- a/streampipes-client-python/streampipes/model/resource/data_lake_series.py
+++ b/streampipes-client-python/streampipes/model/resource/data_series.py
@@ -22,27 +22,16 @@ from typing import Any, Dict, List, Optional, Union
import pandas as pd
from pydantic import StrictInt, StrictStr
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataSeries
from streampipes.model.resource.resource import Resource
__all__ = [
- "DataLakeSeries",
+ "DataSeries",
]
-class StreamPipesUnsupportedDataLakeSeries(Exception):
- """Exception to be raised when the returned data lake series
- cannot be parsed with the current implementation of the resource.
- """
-
- def __init__(self):
- super().__init__(
- "The Data Lake series returned by the API appears "
- "to have a structure that is not currently supported by the Python client."
- )
-
-
-class DataLakeSeries(Resource):
- """Implementation of a resource for data lake series.
+class DataSeries(Resource):
+ """Implementation of a resource for data series.
This resource defines the data model used by its resource container(`model.container.DataLakeMeasures`).
It inherits from Pydantic's BaseModel to get all its superpowers,
which are used to parse, validate the API response and to easily switch between
@@ -56,8 +45,8 @@ class DataLakeSeries(Resource):
"""
@classmethod
- def from_json(cls, json_string: str) -> DataLakeSeries:
- """Creates an instance of `DataLakeSeries` from a given JSON string.
+ def from_json(cls, json_string: str) -> DataSeries:
+ """Creates an instance of `DataSeries` from a given JSON string.
This method is used by the resource container to parse the JSON response of
the StreamPipes API.
@@ -70,8 +59,8 @@ class DataLakeSeries(Resource):
Returns
-------
- DataLakeSeries
- Instance of `DataLakeSeries` that is created based on the given JSON string.
+ DataSeries
+ Instance of `DataSeries` that is created based on the given JSON string.
Raises
------
@@ -87,7 +76,7 @@ class DataLakeSeries(Resource):
# check if the provided JSON has only one data series entry
# otherwise raise the proper exception
if len(parsed_json["allDataSeries"]) != 1:
- raise StreamPipesUnsupportedDataLakeSeries()
+ raise StreamPipesUnsupportedDataSeries()
# get the data series
data_series = parsed_json["allDataSeries"][0]
diff --git a/streampipes-client-python/streampipes/model/resource/__init__.py b/streampipes-client-python/streampipes/model/resource/exceptions.py
similarity index 61%
copy from streampipes-client-python/streampipes/model/resource/__init__.py
copy to streampipes-client-python/streampipes/model/resource/exceptions.py
index c6f0cddba..88bb9e007 100644
--- a/streampipes-client-python/streampipes/model/resource/__init__.py
+++ b/streampipes-client-python/streampipes/model/resource/exceptions.py
@@ -15,14 +15,17 @@
# limitations under the License.
#
-from .data_lake_measure import DataLakeMeasure
-from .data_lake_series import DataLakeSeries
-from .data_stream import DataStream
-from .function_definition import FunctionDefinition
+from typing import Optional
-__all__ = [
- "DataLakeMeasure",
- "DataLakeSeries",
- "DataStream",
- "FunctionDefinition",
-]
+
+class StreamPipesUnsupportedDataSeries(Exception):
+ """Exception to be raised when the returned data lake series
+ cannot be parsed with the current implementation of the resource.
+ """
+
+ def __init__(self, reason: Optional[str] = None):
+ super().__init__(
+ "The Data Lake series returned by the API appears "
+ "to have a structure that is not currently supported by the Python client."
+ f"Reason: {reason}"
+ )
diff --git a/streampipes-client-python/streampipes/model/resource/query_result.py b/streampipes-client-python/streampipes/model/resource/query_result.py
new file mode 100644
index 000000000..03d6f3bc1
--- /dev/null
+++ b/streampipes-client-python/streampipes/model/resource/query_result.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from itertools import chain
+from typing import Any, Dict, List, Literal, Union
+
+import pandas as pd
+from pydantic import Field, StrictInt, StrictStr
+from streampipes.model.resource import DataSeries
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataSeries
+from streampipes.model.resource.resource import Resource
+
+__all__ = [
+ "QueryResult",
+]
+
+
+class QueryResult(Resource):
+ """Implementation of a resource for query result.
+ This resource defines the data model used by its resource container(`model.container.DataLakeMeasures`).
+ It inherits from Pydantic's BaseModel to get all its superpowers,
+ which are used to parse, validate the API response and to easily switch between
+ the Python representation (both serialized and deserialized) and Java representation (serialized only).
+ """
+
+ def convert_to_pandas_representation(self) -> Dict[str, Union[List[str], List[List[Any]]]]:
+ """Returns the dictionary representation of the query result
+ to be used when creating a pandas Dataframe.
+
+ It contains only the "header rows" (the column names) and "rows" that contain the actual data.
+
+ Returns
+ -------
+ dict
+ Dictionary with the keys `headers` and `rows`
+
+ Raises
+ ------
+ StreamPipesUnsupportedDataSeries
+ If the query result returned by the StreamPipes API cannot be converted to the pandas representation
+
+ """
+ for series in self.all_data_series:
+ if self.headers != series.headers:
+ raise StreamPipesUnsupportedDataSeries("Headers of series does not match query result headers")
+
+ if self.headers[0] == "time":
+ self.headers[0] = "timestamp"
+ else:
+ raise StreamPipesUnsupportedDataSeries(f"Unsupported headers {self.headers}")
+
+ return {
+ "headers": self.headers,
+ "rows": list(chain.from_iterable([series.rows for series in self.all_data_series])),
+ }
+
+ total: StrictInt
+ headers: List[StrictStr]
+ all_data_series: List[DataSeries]
+ query_status: Literal["OK", "TOO_MUCH_DATA"] = Field(alias="spQueryStatus")
+
+ def to_pandas(self) -> pd.DataFrame:
+ """Returns the query result as a pandas DataFrame.
+
+ Returns
+ -------
+ df: pd.DataFrame
+ Pandas df containing the query result
+ """
+
+ pandas_representation = self.convert_to_pandas_representation()
+
+ df = pd.DataFrame(data=pandas_representation["rows"], columns=pandas_representation["headers"])
+
+ return df
diff --git a/streampipes-client-python/tests/client/test_data_lake_series.py b/streampipes-client-python/tests/client/test_data_lake_series.py
index 482c0f75b..d634a1262 100644
--- a/streampipes-client-python/tests/client/test_data_lake_series.py
+++ b/streampipes-client-python/tests/client/test_data_lake_series.py
@@ -21,89 +21,60 @@ from unittest.mock import MagicMock, call, patch
from streampipes.client import StreamPipesClient
from streampipes.client.config import StreamPipesClientConfig
from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
-from streampipes.model.resource.data_lake_series import (
- StreamPipesUnsupportedDataLakeSeries,
-)
+from streampipes.model.resource.exceptions import StreamPipesUnsupportedDataSeries
class TestDataLakeSeries(TestCase):
def setUp(self) -> None:
- self.series_regular = {
- "total": 1,
- "headers": [
- "time",
- "changeDetectedHigh",
- "changeDetectedLow",
- "cumSumHigh",
- "cumSumLow",
- "level",
- "overflow",
- "sensorId",
- "underflow",
- ],
- "allDataSeries": [
- {
- "total": 2,
- "rows": [
- [
- "2022-11-05T14:47:50.838Z",
- False,
- False,
- "0.0",
- "0.0",
- 73.37740325927734,
- False,
- "level01",
- False,
- ],
- [
- "2022-11-05T14:47:54.906Z",
- False,
- False,
- "0.0",
- "-0.38673634857474815",
- 70.03279876708984,
- False,
- "level01",
- False,
- ],
- ],
- "tags": None,
- "headers": [
- "time",
- "changeDetectedHigh",
- "changeDetectedLow",
- "cumSumHigh",
- "cumSumLow",
- "level",
- "overflow",
- "sensorId",
- "underflow",
- ],
- }
- ],
- }
+ self.base_headers = [
+ "changeDetectedHigh",
+ "changeDetectedLow",
+ "cumSumHigh",
+ "cumSumLow",
+ "level",
+ "overflow",
+ "sensorId",
+ "underflow",
+ ]
- self.series_missing = {
- "total": 1,
- "headers": [
- "time",
- "changeDetectedHigh",
- "changeDetectedLow",
- "cumSumHigh",
- "cumSumLow",
- "level",
- "overflow",
- "sensorId",
- "underflow",
+ self.headers = ["time"] + self.base_headers
+
+ self.headers_expected = ["timestamp"] + self.base_headers
+
+ self.data_series = {
+ "total": 2,
+ "rows": [
+ [
+ "2022-11-05T14:47:50.838Z",
+ False,
+ False,
+ "0.0",
+ "0.0",
+ 73.37740325927734,
+ False,
+ "level01",
+ False,
+ ],
+ [
+ "2022-11-05T14:47:54.906Z",
+ False,
+ False,
+ "0.0",
+ "-0.38673634857474815",
+ 70.03279876708984,
+ False,
+ "level01",
+ False,
+ ],
],
- "allDataSeries": [],
+ "tags": None,
+ "headers": self.headers,
}
- @patch("streampipes.client.client.Session", autospec=True)
- def test_to_pandas(self, http_session: MagicMock):
+ @staticmethod
+ def get_result_as_panda(http_session: MagicMock, data: dict):
http_session_mock = MagicMock()
- http_session_mock.get.return_value.text = json.dumps(self.series_regular)
+ http_session_mock.get.return_value.json.return_value = data
http_session.return_value = http_session_mock
client = StreamPipesClient(
@@ -112,6 +83,7 @@ class TestDataLakeSeries(TestCase):
host_address="localhost",
)
)
+
result = client.dataLakeMeasureApi.get(identifier="test")
http_session.assert_has_calls(
@@ -119,36 +91,59 @@ class TestDataLakeSeries(TestCase):
any_order=True,
)
- result_pd = result.to_pandas()
+ return result.to_pandas()
+
+ @patch("streampipes.client.client.Session", autospec=True)
+ def test_to_pandas(self, http_session: MagicMock):
+ query_result = {
+ "total": 1,
+ "headers": self.headers,
+ "spQueryStatus": "OK",
+ "allDataSeries": [
+ self.data_series
+ ],
+ }
+
+ result_pd = self.get_result_as_panda(http_session, query_result)
self.assertEqual(2, len(result_pd))
self.assertListEqual(
- [
- "time",
- "changeDetectedHigh",
- "changeDetectedLow",
- "cumSumHigh",
- "cumSumLow",
- "level",
- "overflow",
- "sensorId",
- "underflow",
- ],
+ self.headers_expected,
list(result_pd.columns),
)
self.assertEqual(73.37740325927734, result_pd["level"][0])
@patch("streampipes.client.client.Session", autospec=True)
- def test_to_pandas_unsupported_series(self, http_session: MagicMock):
- http_session_mock = MagicMock()
- http_session_mock.get.return_value.text = json.dumps(self.series_missing)
- http_session.return_value = http_session_mock
+ def test_group_by_to_pandas(self, http_session: MagicMock):
+ query_result = {
+ "total": 2,
+ "headers": self.headers,
+ "spQueryStatus": "OK",
+ "allDataSeries": [
+ self.data_series,
+ self.data_series
+ ],
+ }
- client = StreamPipesClient(
- client_config=StreamPipesClientConfig(
- credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"),
- host_address="localhost",
- )
+ result_pd = self.get_result_as_panda(http_session, query_result)
+
+ self.assertEqual(4, len(result_pd))
+ self.assertListEqual(
+ self.headers_expected,
+ list(result_pd.columns),
)
- with self.assertRaises(StreamPipesUnsupportedDataLakeSeries):
- client.dataLakeMeasureApi.get(identifier="test")
+ self.assertEqual(70.03279876708984, result_pd["level"][3])
+
+ @patch("streampipes.client.client.Session", autospec=True)
+ def test_different_headers_exception(self, http_session: MagicMock):
+ query_result = {
+ "total": 1,
+ "headers": ['one'],
+ "spQueryStatus": "OK",
+ "allDataSeries": [
+ self.data_series
+ ],
+ }
+
+ with self.assertRaises(StreamPipesUnsupportedDataSeries):
+ self.get_result_as_panda(http_session, query_result)