You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2023/02/21 21:14:38 UTC

[streampipes] 01/01: feature(#1254): add query parameters for get endpoint for measurements

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch feature/1254-adapt-data-lake-measure-endpoints-get-method-to-process-query-parameter-in-python
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit afb3695b90c4c1f92d0e59e3e86f9cbe1ec729f9
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Feb 21 22:12:36 2023 +0100

    feature(#1254): add query parameters for get endpoint for measurements
    
    Signed-off-by: bossenti <bo...@posteo.de>
---
 .../streampipes/client/client.py                   |   2 +-
 .../streampipes/endpoint/api/data_lake_measure.py  | 185 ++++++++++++++++++++-
 .../streampipes/endpoint/endpoint.py               |   2 +-
 .../tests/client/test_data_lake_series.py          |   2 +-
 .../tests/endpoint/__init__.py                     |   0
 .../tests/endpoint/test_data_lake_measure.py       | 121 ++++++++++++++
 6 files changed, 302 insertions(+), 10 deletions(-)

diff --git a/streampipes-client-python/streampipes/client/client.py b/streampipes-client-python/streampipes/client/client.py
index 2c2c82dce..471696c1d 100644
--- a/streampipes-client-python/streampipes/client/client.py
+++ b/streampipes-client-python/streampipes/client/client.py
@@ -56,7 +56,7 @@ class StreamPipesClient:
     --------
 
     >>> from streampipes.client import StreamPipesClient
-    >>> from streampipes.client.client_config import StreamPipesClientConfig
+    >>> from streampipes.client.config import StreamPipesClientConfig
     >>> from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
 
     >>> client_config = StreamPipesClientConfig(
diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
index b39ca5d2b..c7c51bee5 100644
--- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
+++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py
@@ -19,8 +19,10 @@
 Specific implementation of the StreamPipes API's data lake measure endpoints.
 This endpoint allows to consume data stored in StreamPipes' data lake
 """
-from typing import Tuple, Type
+from datetime import datetime
+from typing import Any, Dict, Optional, Tuple, Type, Literal
 
+from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, validator
 from streampipes.endpoint.endpoint import APIEndpoint
 from streampipes.model.container import DataLakeMeasures
 from streampipes.model.container.resource_container import ResourceContainer
@@ -31,8 +33,133 @@ __all__ = [
 ]
 
 
+class StreamPipesQueryValidationError(Exception):
+    """A custom exception to be raised when the validation of query parameter
+    causes an error.
+
+    Parameters
+    ----------
+    validation_error: ValidationError
+        The validation error thrown by Pydantic during parsing.
+    """
+
+
+class MeasurementGetQueryConfig(BaseModel):
+    """Config class describing the parameters of the GET endpoint for measurements.
+
+    This config class is used to validate the provided query parameters for the GET endpoint of measurements.
+    Additionally, it takes care of the conversion to a proper HTTP query string.
+    Thereby, parameter names are adapted to the naming of the StreamPipes API, for which Pydantic aliases are used.
+
+    Attributes
+    ----------
+    columns: Optional[str]
+        A comma separated list of column names (e.g., `time,value`)<br>
+        If provided, the returned data only consists of the given columns.
+    end_date: Optional[datetime]
+        Restricts queried data to be older than the specified time.
+    limit: Optional[int]
+        Amount of records returned at maximum (default: `1000`) <br>
+        This needs to be at least `1`
+    offset: Optional[int]
+        Offset to be applied to returned data <br>
+        This needs to be at least `0`
+    order: Optional[str]
+        Ordering of query results <br>
+        Allowed values: `ASC` and `DESC` (default: `ASC`)
+    page_no: Optional[int]
+        Page number used for paging operation <br>
+        This needs to be at least `1`
+    start_date: Optional[datetime]
+        Restricts queried data to be younger than the specified time
+
+    """
+
+    _regex_comma_separated_string = r"^[0-9a-zA-Z\_]+(,[0-9a-zA-Z\_]+)*$"
+
+    class Config:
+        """Pydantic Config class"""
+
+        extra = Extra.forbid
+        allow_population_by_field_name = True
+
+    columns: Optional[str] = Field(regex=_regex_comma_separated_string)
+    end_date: Optional[StrictInt] = Field(alias="endDate")
+    limit: Optional[int] = Field(ge=1, default=1000)
+    offset: Optional[int] = Field(ge=0)
+    order: Optional[Literal["ASC", "DESC"]]
+    page_no: Optional[int] = Field(alias="page", ge=1)
+    start_date: Optional[StrictInt] = Field(alias="startDate")
+
+    @validator("end_date", "start_date", pre=True)
+    @classmethod
+    def _convert_datetime(cls, dt: datetime) -> int:
+        """Pydantic validator to convert datetime object to unix timestamp.
+
+        The StreamPipes API expects datetime related parameters to be passed as unix timestamp.
+        For the sake of convenience we expect datetime objects to be passed for these values.
+        This requires us to convert the provided datetime objects in unix timestamp representation
+
+        Parameters
+        ----------
+        dt: datetime
+            The datetime value to be passed as query parameter
+
+
+        Raises
+        ------
+        StreamPipesQueryValidationError
+            In case `start_date` or `end_date` is not passed as a datetime object
+        ValueError
+            In case the transformation of the datetime object did not work
+
+        Returns
+        -------
+        unix_timestamp: int
+            unix timestamp of the given timestamp
+
+        """
+
+        if not isinstance(dt, datetime) or dt is None:
+            raise StreamPipesQueryValidationError(f"The passed value for either `start_date` or `end_date` "
+                                                  f"is not a datetime object: '{dt}'.")
+        try:
+            unix_timestamp = int(datetime.timestamp(dt) * 1000)
+            return unix_timestamp
+        except ValueError as ve:  # pragma: no cover
+            raise ValueError(f"Your datetime object is off, it could not be parsed"
+                             f"This should not occur, but unfortunately did.\n"
+                             f"Therefore, it would be great if you could report this problem as an issue at "
+                             f"github.com/apache/streampipes.\n"
+                             ) from ve
+
+    def build_query_string(self) -> str:
+        """Builds a HTTP query string for the config.
+
+        This method returns an HTTP query string for the invoking config.
+        It follows the following structure `?param1=value1&param2=value2...`.
+        This query string is not an entire URL, instead it needs to be appended to an API path.
+
+        Returns
+        -------
+        query_param_string: str
+            HTTP query params string (`?param1=value1&param2=value2...`)
+        """
+
+        # create dictionary representation of the config that meets the following expectations:
+        # - query parameter should comply to the parameter names of the StreamPipes API (`by_alias`)
+        # - query params should only be present if they are different from None (`exclude_none`)
+        query_param_dict = self.dict(by_alias=True, exclude_none=True)
+
+        # create query string that complies to HTTP syntax (?param1=value1&param2=value2&...)
+        query_param_string = f"?{'&'.join([f'{k}={v}' for k, v in query_param_dict.items()])}"
+
+        return query_param_string
+
+
 class DataLakeMeasureEndpoint(APIEndpoint):
     """Implementation of the DataLakeMeasure endpoint.
+
     This endpoint provides an interfact to all data stored in the StreamPipes data lake.
 
     Consequently, it allows uerying metadata about available data sets (see `all()` method).
@@ -50,7 +177,7 @@ class DataLakeMeasureEndpoint(APIEndpoint):
     --------
 
     >>> from streampipes.client import StreamPipesClient
-    >>> from streampipes.client.client_config import StreamPipesClientConfig
+    >>> from streampipes.client.config import StreamPipesClientConfig
     >>> from streampipes.client.credential_provider import StreamPipesApiKeyCredentials
 
     >>> client_config = StreamPipesClientConfig(
@@ -68,6 +195,36 @@ class DataLakeMeasureEndpoint(APIEndpoint):
     5
     """
 
+    @staticmethod
+    def _validate_query_params(query_params: Dict[str, Any]) -> MeasurementGetQueryConfig:
+        """Validates given query params.
+
+        Validates the given query parameters via the
+        [MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig].
+
+        Raises
+        ------
+        StreamPipesQueryValidationError
+            In case the query parameters are not provided correctly
+
+        Returns
+        -------
+        config: MeasurementGetQueryConfig
+            validated config that can be used to construct the query
+        """
+        try:
+            config = MeasurementGetQueryConfig.parse_obj(query_params)
+        except ValidationError as ve:
+            raise StreamPipesQueryValidationError(
+                f"\nOops, there seems to be a problem with your provided query options. "
+                f"Some of them are not provided as expected. Please see the detailed output below:\n\n"
+                f"Validation error log: {ve.json()}\n\n"
+                f"In case you assess your query configuration to be correct feel free to file us an issue via "
+                f"github.com/apache/streampipes.\n"
+                f"Please don't forget to include the following validation log from above.")
+
+        return config
+
     @property
     def _resource_cls(self) -> Type[DataLakeSeries]:
         """
@@ -102,20 +259,34 @@ class DataLakeMeasureEndpoint(APIEndpoint):
 
         return "api", "v4", "datalake", "measurements"
 
-    def get(self, identifier: str) -> DataLakeSeries:
+    def get(self, identifier: str, **kwargs: Optional[Dict[str, Any]]) -> DataLakeSeries:
         """Queries the specified data lake measure from the API.
 
+        By default, the maximum number of returned records is 1000.
+        This behaviour can be influenced by passing the parameter `limit` with a different value
+        (see [MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig]).
+
         Parameters
         ----------
         identifier: str
             The identifier of the data lake measure to be queried.
+        **kwargs: Dict[str, Any]
+            keyword arguments can be used to provide additional query parameters.
+            The available query parameters are defined by the
+            [MeasurementGetQueryConfig][streampipes.endpoint.api.data_lake_measure.MeasurementGetQueryConfig].
 
         Returns
         -------
-        The specified data lake measure as an instance of the corresponding model class (`model.DataLakeSeries`).
+        measurement: DataLakeSeries
+            the specified data lake measure
         """
 
-        response = self._make_request(
-            request_method=self._parent_client.request_session.get, url=f"{self.build_url()}/{identifier}"
-        )
+        # build base URL for resource
+        url = f"{self.build_url()}/{identifier}"
+
+        # extend base URL by query parameters
+        measurement_get_config = self._validate_query_params(query_params=kwargs)
+        url += measurement_get_config.build_query_string()
+
+        response = self._make_request(request_method=self._parent_client.request_session.get, url=url)
         return self._resource_cls.from_json(json_string=response.text)
diff --git a/streampipes-client-python/streampipes/endpoint/endpoint.py b/streampipes-client-python/streampipes/endpoint/endpoint.py
index b3ae4fe2d..cfc6b7381 100644
--- a/streampipes-client-python/streampipes/endpoint/endpoint.py
+++ b/streampipes-client-python/streampipes/endpoint/endpoint.py
@@ -193,7 +193,7 @@ class APIEndpoint(Endpoint):
         )
         return self._container_cls.from_json(json_string=response.text)
 
-    def get(self, identifier: str) -> Resource:
+    def get(self, identifier: str, **kwargs) -> Resource:
         """Queries the specified resource from the API endpoint.
 
         Parameters
diff --git a/streampipes-client-python/tests/client/test_data_lake_series.py b/streampipes-client-python/tests/client/test_data_lake_series.py
index 49b75a90c..482c0f75b 100644
--- a/streampipes-client-python/tests/client/test_data_lake_series.py
+++ b/streampipes-client-python/tests/client/test_data_lake_series.py
@@ -115,7 +115,7 @@ class TestDataLakeSeries(TestCase):
         result = client.dataLakeMeasureApi.get(identifier="test")
 
         http_session.assert_has_calls(
-            [call().get(url="https://localhost:80/streampipes-backend/api/v4/datalake/measurements/test")],
+            [call().get(url="https://localhost:80/streampipes-backend/api/v4/datalake/measurements/test?limit=1000")],
             any_order=True,
         )
 
diff --git a/streampipes-client-python/tests/endpoint/__init__.py b/streampipes-client-python/tests/endpoint/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/streampipes-client-python/tests/endpoint/test_data_lake_measure.py b/streampipes-client-python/tests/endpoint/test_data_lake_measure.py
new file mode 100644
index 000000000..d2b7bfb17
--- /dev/null
+++ b/streampipes-client-python/tests/endpoint/test_data_lake_measure.py
@@ -0,0 +1,121 @@
+from datetime import datetime
+from unittest import TestCase
+from streampipes.endpoint.api.data_lake_measure import DataLakeMeasureEndpoint
+
+
+class TestMeasurementGetQueryConfig(TestCase):
+
+    def test_default(self):
+        config_dict = {}
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+        result = measurement_config.build_query_string()
+
+        self.assertEqual("?limit=1000", result)
+
+    def test_additional_param_given(self):
+        config_dict = {
+            "columns": "time,value_25"
+        }
+
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+        result = measurement_config.build_query_string()
+
+        self.assertEqual("?columns=time,value_25&limit=1000", result)
+
+    def test_extra_param(self):
+        config_dict = {
+            "foo": "bar"
+        }
+
+        with self.assertRaises(RuntimeError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+
+    def test_alias_as_query_param(self):
+        config_dict = {
+            "page_no": 5
+        }
+
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+        result = measurement_config.build_query_string()
+
+        self.assertEqual("?limit=1000&page=5", result)
+
+    def test_datetime_validation(self):
+        now = datetime.utcnow()
+
+        config_dict = {
+            "start_date": now,
+            "end_date": now
+        }
+        measurement_config = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+        result = measurement_config.build_query_string()
+
+        expected_ts = int(datetime.timestamp(now) * 1000)
+        expected = f"?endDate={expected_ts}&limit=1000&startDate={expected_ts}"
+
+        self.assertEqual(expected, result)
+
+    def test_datetime_validation_no_datetime(self):
+        config_dict = {
+            "start_date": "test"
+        }
+
+        with self.assertRaises(RuntimeError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict)
+
+    def test_columns_validation(self):
+        config_dict_one_col = {
+            "columns": "col1"
+        }
+        config_dict_mul_col = {
+            "columns": "col1,col2,col3"
+        }
+        config_dict_semicolon = {
+            "columns": "col1;col2"
+        }
+        config_dict_whitespace_ending = {
+            "columns": "col1 "
+        }
+
+        self.assertEqual("?columns=col1&limit=1000", DataLakeMeasureEndpoint._validate_query_params(
+            query_params=config_dict_one_col).build_query_string())
+        self.assertEqual("?columns=col1,col2,col3&limit=1000", DataLakeMeasureEndpoint._validate_query_params(
+            query_params=config_dict_mul_col).build_query_string())
+        with self.assertRaises(RuntimeError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_semicolon)
+        with self.assertRaises(RuntimeError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_whitespace_ending)
+
+    def test_minium_parameter_values(self):
+        config_dict_happy_path = {
+            "limit": 15,
+            "page_no": 3
+        }
+
+        config_dict_limit_too_low = {
+            "limit": 0
+        }
+
+        config_dict_page_no_too_low = {
+            "page_no": -2
+        }
+
+        measurement_config_happy = DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_happy_path)
+        result_happy = measurement_config_happy.build_query_string()
+
+        self.assertEqual("?limit=15&page=3", result_happy)
+
+        with self.assertRaises(RuntimeError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_limit_too_low)
+
+        with self.assertRaises(RuntimeError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_dict_page_no_too_low)
+
+    def test_literal_validation(self):
+
+        config_invalid_order = {
+            "order": "UP"
+        }
+
+        with self.assertRaises(RuntimeError):
+            DataLakeMeasureEndpoint._validate_query_params(query_params=config_invalid_order)
\ No newline at end of file