You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/02/21 21:14:38 UTC
[streampipes] 01/01: feature(#1254): add query parameters for get endpoint for measurements
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch feature/1254-adapt-data-lake-measure-endpoints-get-method-to-process-query-parameter-in-python
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit afb3695b90c4c1f92d0e59e3e86f9cbe1ec729f9
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Feb 21 22:12:36 2023 +0100
feature(#1254): add query parameters for get endpoint for measurements
Signed-off-by: bossenti <bo...@posteo.de>
---
.../streampipes/client/client.py | 2 +-
.../streampipes/endpoint/api/data_lake_measure.py | 185 ++++++++++++++++++++-
.../streampipes/endpoint/endpoint.py | 2 +-
.../tests/client/test_data_lake_series.py | 2 +-
.../tests/endpoint/__init__.py | 0
.../tests/endpoint/test_data_lake_measure.py | 121 ++++++++++++++
6 files changed, 302 insertions(+), 10 deletions(-)
diff --git a/streampipes-client-python/streampipes/client/client.py b/streampipes-client-python/streampipes/client/client.py
index 2c2c82dce..471696c1d 100644
--- a/streampipes-client-python/streampipes/client/client.py
+++ b/streampipes-client-python/streampipes/client/client.py
@@ -56,7 +56,7 @@ class StreamPipesClient:
--------
>>> from streampipes.client import StreamPipesClient
- >>> from streampipes.client.client_config import StreamPipesClientConfig
+ >>> from streampipes.client.config import StreamPipesClientConfig
>>> from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
>>> client_config = StreamPipesClientConfig(
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index b39ca5d2b..c7c51bee5 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -19,8 +19,10 @@
Specific implementation of the StreamPipes API's data lake measure endpoints.
This endpoint allows to consume data stored in StreamPipes' data lake
"""
-from typing import Tuple, Type
+from datetime import datetime
+from typing import Any, Dict, Optional, Tuple, Type, Literal
+from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, validator
from streampipes.endpoint.endpoint import APIEndpoint
from streampipes.model.container import DataLakeMeasures
from streampipes.model.container.resource_container import ResourceContainer
@@ -31,8 +33,133 @@ __all__ = [
]
+class StreamPipesQueryValidationError(Exception):
+ """A custom exception to be raised when the validation of query parameters
+ causes an error.
+
+ The class defines no constructor of its own: it is raised with a
+ human-readable message string as its only argument. That message embeds
+ the details of the underlying problem, e.g., the log of the validation
+ error thrown by Pydantic during parsing.
+ """
+
+
+class MeasurementGetQueryConfig(BaseModel):
+    """Config class describing the parameters of the GET endpoint for measurements.
+
+    This config class is used to validate the provided query parameters for the GET endpoint of measurements.
+    Additionally, it takes care of the conversion to a proper HTTP query string.
+    Thereby, parameter names are adapted to the naming of the StreamPipes API, for which Pydantic aliases are used.
+
+    Attributes
+    ----------
+    columns: Optional[str]
+        A comma separated list of column names (e.g., `time,value`)<br>
+        If provided, the returned data only consists of the given columns.
+    end_date: Optional[datetime]
+        Restricts queried data to be older than the specified time (end of the queried time range).
+    limit: Optional[int]
+        Amount of records returned at maximum (default: `1000`) <br>
+        This needs to be at least `1`
+    offset: Optional[int]
+        Offset to be applied to returned data <br>
+        This needs to be at least `0`
+    order: Optional[str]
+        Ordering of query results <br>
+        Allowed values: `ASC` and `DESC` (default: `ASC`)
+    page_no: Optional[int]
+        Page number used for paging operation <br>
+        This needs to be at least `1`
+    start_date: Optional[datetime]
+        Restricts queried data to be younger than the specified time (start of the queried time range).
+
+    """
+
+    _regex_comma_separated_string = r"^[0-9a-zA-Z\_]+(,[0-9a-zA-Z\_]+)*$"
+
+    class Config:
+        """Pydantic Config class"""
+
+        extra = Extra.forbid
+        allow_population_by_field_name = True
+
+    columns: Optional[str] = Field(regex=_regex_comma_separated_string)
+    end_date: Optional[StrictInt] = Field(alias="endDate")
+    limit: Optional[int] = Field(ge=1, default=1000)
+    offset: Optional[int] = Field(ge=0)
+    order: Optional[Literal["ASC", "DESC"]]
+    page_no: Optional[int] = Field(alias="page", ge=1)
+    start_date: Optional[StrictInt] = Field(alias="startDate")
+
+    @validator("end_date", "start_date", pre=True)
+    @classmethod
+    def _convert_datetime(cls, dt: Any) -> int:
+        """Pydantic validator to convert a datetime object to a unix timestamp.
+
+        The StreamPipes API expects datetime related parameters to be passed as unix timestamp.
+        For the sake of convenience we expect datetime objects to be passed for these values.
+        This requires us to convert the provided datetime objects into unix timestamps (in milliseconds).
+
+        Parameters
+        ----------
+        dt: datetime
+            The datetime value to be passed as query parameter (its type is checked at runtime)
+
+        Raises
+        ------
+        StreamPipesQueryValidationError
+            In case `start_date` or `end_date` is not passed as a datetime object
+        ValueError
+            In case the datetime object cannot be converted, e.g., because it is
+            outside the range of values supported by the platform
+
+        Returns
+        -------
+        unix_timestamp: int
+            unix timestamp (in milliseconds) of the given datetime object
+
+        """
+        # a single isinstance check suffices: `None` is rejected by it as well
+        if not isinstance(dt, datetime):
+            raise StreamPipesQueryValidationError(f"The passed value for either `start_date` or `end_date` "
+                                                  f"is not a datetime object: '{dt}'.")
+        try:
+            return int(dt.timestamp() * 1000)
+        except (ValueError, OverflowError, OSError) as err:  # pragma: no cover
+            raise ValueError(
+                "Your datetime object is off, it could not be parsed. "
+                "This should not occur, but unfortunately did.\n"
+                "Therefore, it would be great if you could report this problem as an issue at "
+                "github.com/apache/streampipes.\n"
+            ) from err
+
+    def build_query_string(self) -> str:
+        """Builds a HTTP query string for the config.
+
+        This method returns an HTTP query string for the invoking config.
+        It follows the following structure `?param1=value1&param2=value2...`.
+        This query string is not an entire URL, instead it needs to be appended to an API path.
+
+        Returns
+        -------
+        query_param_string: str
+            HTTP query params string (`?param1=value1&param2=value2...`)
+        """
+
+        # create dictionary representation of the config that meets the following expectations:
+        # - query parameter should comply to the parameter names of the StreamPipes API (`by_alias`)
+        # - query params should only be present if they are different from None (`exclude_none`)
+        query_param_dict = self.dict(by_alias=True, exclude_none=True)
+
+        # create query string that complies to HTTP syntax (?param1=value1&param2=value2&...)
+        query_param_string = "?" + "&".join(f"{k}={v}" for k, v in query_param_dict.items())
+
+        return query_param_string
+
+
class DataLakeMeasureEndpoint(APIEndpoint):
"""Implementation of the DataLakeMeasure endpoint.
+
This endpoint provides an interfact to all data stored in the StreamPipes data lake.
Consequently, it allows uerying metadata about available data sets (see `all()` method).
@@ -50,7 +177,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
--------
>>> from streampipes.client import StreamPipesClient
- >>> from streampipes.client.client_config import StreamPipesClientConfig
+ >>> from streampipes.client.config import StreamPipesClientConfig
>>> from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
>>> client_config = StreamPipesClientConfig(
@@ -68,6 +195,36 @@ class DataLakeMeasureEndpoint(APIEndpoint):
5
"""
+    @staticmethod
+    def _validate_query_params(query_params: Dict[str, Any]) -> MeasurementGetQueryConfig:
+        """Validates given query params.
+
+        Validates the given query parameters (`query_params`, keyed by field name or API alias) via the
+        [MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig].
+
+        Raises
+        ------
+        StreamPipesQueryValidationError
+            In case the query parameters are not provided correctly
+
+        Returns
+        -------
+        config: MeasurementGetQueryConfig
+            validated config that can be used to construct the query
+        """
+        try:
+            config = MeasurementGetQueryConfig.parse_obj(query_params)
+        except ValidationError as ve:
+            # chain the Pydantic error explicitly so that it stays available as `__cause__`
+            raise StreamPipesQueryValidationError(
+                "\nOops, there seems to be a problem with your provided query options. "
+                "Some of them are not provided as expected. Please see the detailed output below:\n\n"
+                f"Validation error log: {ve.json()}\n\n"
+                "In case you assess your query configuration to be correct feel free to file us an issue via "
+                "github.com/apache/streampipes.\n"
+                "Please don't forget to include the following validation log from above.") from ve
+        return config
+
@property
def _resource_cls(self) -> Type[DataLakeSeries]:
"""
@@ -102,20 +259,34 @@ class DataLakeMeasureEndpoint(APIEndpoint):
return "api", "v4", "datalake", "measurements"
- def get(self, identifier: str) -> DataLakeSeries:
+ def get(self, identifier: str, **kwargs: Any) -> DataLakeSeries:
"""Queries the specified data lake measure from the API.
+ By default, the maximum number of returned records is 1000.
+ This behaviour can be influenced by passing the parameter `limit` with a different value
+ (see [MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig]).
+
Parameters
----------
identifier: str
The identifier of the data lake measure to be queried.
+ **kwargs: Any
+ keyword arguments can be used to provide additional query parameters.
+ The available query parameters are defined by the
+ [MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig].
Returns
-------
- The specified data lake measure as an instance of the corresponding model class (`model.DataLakeSeries`).
+ measurement: DataLakeSeries
+ the specified data lake measure
"""
- response = self._make_request(
- request_method=self._parent_client.request_session.get, url=f"{self.build_url()}/{identifier}"
- )
+ # build base URL for resource
+ url = f"{self.build_url()}/{identifier}"
+
+ # extend base URL by query parameters
+ measurement_get_config = self._validate_query_params(query_params=kwargs)
+ url += measurement_get_config.build_query_string()
+
+ response = self._make_request(request_method=self._parent_client.request_session.get, url=url)
return self._resource_cls.from_json(json_string=response.text)
diff --git a/streampipes-client-python/streampipes/endpoint/endpoint.py b/streampipes-client-python/streampipes/endpoint/endpoint.py
index b3ae4fe2d..cfc6b7381 100644
--- a/streampipes-client-python/streampipes/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes/endpoint/endpoint.py
@@ -193,7 +193,7 @@ class APIEndpoint(Endpoint):
)
return self._container_cls.from_json(json_string=response.text)
- def get(self, identifier: str) -> Resource:
+ def get(self, identifier: str, **kwargs) -> Resource:
"""Queries the specified resource from the API endpoint.
Parameters
diff --git a/streampipes-client-python/tests/client/test_data_lake_series.py b/streampipes-client-python/tests/client/test_data_lake_series.py
index 49b75a90c..482c0f75b 100644
--- a/streampipes-client-python/tests/client/test_data_lake_series.py
+++ b/streampipes-client-python/tests/client/test_data_lake_series.py
@@ -115,7 +115,7 @@ class TestDataLakeSeries(TestCase):
result = client.dataLakeMeasureApi.get(identifier="test")
http_session.assert_has_calls(
- [call().get(url="https://localhost:80/streampipes-backend/api/v4/datalake/measurements/test")],
+ [call().get(url="https://localhost:80/streampipes-backend/api/v4/datalake/measurements/test?limit=1000")],
any_order=True,
)
diff --git a/streampipes-client-python/tests/endpoint/__init__.py b/streampipes-client-python/tests/endpoint/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/streampipes-client-python/tests/endpoint/test_data_lake_measure.py b/streampipes-client-python/tests/endpoint/test_data_lake_measure.py
new file mode 100644
index 000000000..d2b7bfb17
--- /dev/null
+++ b/streampipes-client-python/tests/endpoint/test_data_lake_measure.py
@@ -0,0 +1,121 @@
+from datetime import datetime
+from unittest import TestCase
+from streampipes.endpoint.api.data_lake_measure import DataLakeMeasureEndpoint, StreamPipesQueryValidationError
+
+
+class TestMeasurementGetQueryConfig(TestCase):
+    """Tests query parameter validation and query string creation for the measurement GET endpoint."""
+
+    def test_default(self):
+        """Without any parameter only the default limit is part of the query string."""
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params={})
+        result = measurement_config.build_query_string()
+        self.assertEqual("?limit=1000", result)
+
+    def test_additional_param_given(self):
+        """An explicitly given parameter is added next to the default limit."""
+        config_dict = {"columns": "time,value_25"}
+
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+        result = measurement_config.build_query_string()
+
+        # the expected string lists `columns` before the default `limit`
+        self.assertEqual("?columns=time,value_25&limit=1000", result)
+
+    def test_extra_param(self):
+        """Parameters unknown to the config are rejected."""
+        config_dict = {"foo": "bar"}
+
+        # validation problems are reported as `StreamPipesQueryValidationError`
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+
+    def test_alias_as_query_param(self):
+        """A parameter passed by its field name shows up under its API alias."""
+        config_dict = {"page_no": 5}
+
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+        result = measurement_config.build_query_string()
+
+        # `page_no` is sent as `page`
+        self.assertEqual("?limit=1000&page=5", result)
+
+    def test_datetime_validation(self):
+        """Datetime objects are converted to unix timestamps in milliseconds."""
+        now = datetime.now()
+        config_dict = {
+            "start_date": now,
+            "end_date": now,
+        }
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+        result = measurement_config.build_query_string()
+
+        expected_ts = int(datetime.timestamp(now) * 1000)
+        expected = f"?endDate={expected_ts}&limit=1000&startDate={expected_ts}"
+
+        self.assertEqual(expected, result)
+
+    def test_datetime_validation_no_datetime(self):
+        """Values other than datetime objects are rejected for date parameters."""
+        config_dict = {"start_date": "test"}
+
+        # a plain string must not be accepted in place of a datetime object
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+
+    def test_columns_validation(self):
+        """Column names have to be given as a comma separated string without whitespace."""
+        config_dict_one_col = {"columns": "col1"}
+        config_dict_mul_col = {"columns": "col1,col2,col3"}
+        config_dict_semicolon = {"columns": "col1;col2"}
+        config_dict_whitespace_ending = {"columns": "col1 "}
+
+        self.assertEqual(
+            "?columns=col1&limit=1000",
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_one_col).build_query_string(),
+        )
+        self.assertEqual(
+            "?columns=col1,col2,col3&limit=1000",
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_mul_col).build_query_string(),
+        )
+
+        # a semicolon is not a valid separator
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_semicolon)
+        # trailing whitespace is not stripped and therefore rejected
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_whitespace_ending)
+
+    def test_minium_parameter_values(self):
+        """Values below the allowed minimum are rejected."""
+        config_dict_happy_path = {
+            "limit": 15,
+            "page_no": 3,
+        }
+        config_dict_limit_too_low = {"limit": 0}
+        config_dict_page_no_too_low = {"page_no": -2}
+        config_dict_offset_too_low = {"offset": -1}
+
+        measurement_config_happy = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_happy_path)
+        result_happy = measurement_config_happy.build_query_string()
+
+        self.assertEqual("?limit=15&page=3", result_happy)
+
+        # each of the following configs violates the lower bound of its parameter
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_limit_too_low)
+
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_page_no_too_low)
+
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_offset_too_low)
+
+    def test_literal_validation(self):
+        """Values other than `ASC` and `DESC` are rejected for `order`."""
+        config_invalid_order = {
+            "order": "UP",
+        }
+
+        with self.assertRaises(StreamPipesQueryValidationError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_invalid_order)
\ No newline at end of file