You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/24 19:02:22 UTC
[streampipes] 01/04: experimental "get()"
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch STREAMPIPES-607
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit cc4f870a57c8c417e55f7d3ee497c3e475091fc9
Author: bossenti <bo...@posteo.de>
AuthorDate: Mon Nov 14 20:40:55 2022 +0100
experimental "get()"
---
.../endpoint/data_lake_measure.py | 27 +++++++++-
.../model/resource/DataLakeSeries.py | 58 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 2 deletions(-)
diff --git a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
index 168c14728..d854b60b4 100644
--- a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
+++ b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
@@ -23,13 +23,14 @@ from typing import Tuple, Type
from streampipes_client.endpoint.endpoint import APIEndpoint
from streampipes_client.model.container import DataLakeMeasures
+from streampipes_client.model.container.resource_container import ResourceContainer
+from streampipes_client.model.resource.DataLakeSeries import DataLakeSeries
+from streampipes_client.model.resource.resource import Resource
__all__ = [
"DataLakeMeasureEndpoint",
]
-from streampipes_client.model.container.resource_container import ResourceContainer
-
class DataLakeMeasureEndpoint(APIEndpoint):
"""Implementation of the DataLakeMeasure endpoint.
@@ -68,6 +69,10 @@ class DataLakeMeasureEndpoint(APIEndpoint):
5
"""
+ @property
+ def _resource_cls(self) -> Type[DataLakeSeries]:
+ return DataLakeSeries
+
@property
def _container_cls(self) -> Type[ResourceContainer]:
"""Defines the model container class the endpoint refers to.
@@ -90,3 +95,21 @@ class DataLakeMeasureEndpoint(APIEndpoint):
"""
return "api", "v4", "datalake", "measurements"
+
+ def get(self, identifier: str) -> Resource:
+ """Queries the specified resource from the API endpoint.
+
+ Parameters
+ ----------
+ identifier: str
+ The identifier of the resource to be queried.
+
+ Returns
+ -------
+ The specified resource as an instance of the corresponding model class (`model.Element`).
+ """
+
+ response = self._make_request(
+ request_method=self._parent_client.request_session.get, url=f"{self.build_url()}/{identifier}"
+ )
+ return self._resource_cls.from_json(json_string=response.text)
diff --git a/streampipes-client-python/streampipes_client/model/resource/DataLakeSeries.py b/streampipes-client-python/streampipes_client/model/resource/DataLakeSeries.py
new file mode 100644
index 000000000..a936edafc
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/DataLakeSeries.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+from typing import Any, Dict, List, Optional
+
+import pandas as pd
+from pydantic import StrictInt, StrictStr
+from streampipes_client.model.resource.resource import Resource
+
+
+class DataLakeSeries(Resource):
+ @classmethod
+ def from_json(cls, json_string: str) -> Resource:
+
+ # deserialize JSON string
+ parsed_json = json.loads(json_string)
+
+ if len(parsed_json["allDataSeries"]) != 1:
+ raise RuntimeError("Not supported")
+
+ data_series = parsed_json["allDataSeries"][0]
+
+ return cls.parse_obj(data_series)
+
+ def convert_to_pandas_representation(self) -> Dict:
+
+ result: Dict = dict()
+
+ for row in self.rows:
+ for idx, value in enumerate(row):
+ if (key := self.headers[idx]) in result.keys():
+ result[self.headers[idx]].append(value)
+ else:
+ result.update({key: [value]})
+
+ return result
+
+ total: StrictInt
+ headers: List[StrictStr]
+ rows: List[List[Any]]
+ tags: Optional[str]
+
+ def to_pandas(self) -> pd.DataFrame:
+ return pd.DataFrame(data=self.convert_to_pandas_representation())