You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/11/24 19:02:22 UTC

[streampipes] 01/04: experimental "get()"

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch STREAMPIPES-607
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit cc4f870a57c8c417e55f7d3ee497c3e475091fc9
Author: bossenti <bo...@posteo.de>
AuthorDate: Mon Nov 14 20:40:55 2022 +0100

    experimental "get()"
---
 .../endpoint/data_lake_measure.py                  | 27 +++++++++-
 .../model/resource/DataLakeSeries.py               | 58 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
index 168c14728..d854b60b4 100644
--- a/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
+++ b/streampipes-client-python/streampipes_client/endpoint/data_lake_measure.py
@@ -23,13 +23,14 @@ from typing import Tuple, Type
 
 from streampipes_client.endpoint.endpoint import APIEndpoint
 from streampipes_client.model.container import DataLakeMeasures
+from streampipes_client.model.container.resource_container import ResourceContainer
+from streampipes_client.model.resource.DataLakeSeries import DataLakeSeries
+from streampipes_client.model.resource.resource import Resource
 
 __all__ = [
     "DataLakeMeasureEndpoint",
 ]
 
-from streampipes_client.model.container.resource_container import ResourceContainer
-
 
 class DataLakeMeasureEndpoint(APIEndpoint):
     """Implementation of the DataLakeMeasure endpoint.
@@ -68,6 +69,10 @@ class DataLakeMeasureEndpoint(APIEndpoint):
     5
     """
 
+    @property
+    def _resource_cls(self) -> Type[DataLakeSeries]:
+        return DataLakeSeries
+
     @property
     def _container_cls(self) -> Type[ResourceContainer]:
         """Defines the model container class the endpoint refers to.
@@ -90,3 +95,21 @@ class DataLakeMeasureEndpoint(APIEndpoint):
         """
 
         return "api", "v4", "datalake", "measurements"
+
+    def get(self, identifier: str) -> Resource:
+        """Queries the specified resource from the API endpoint.
+
+        Parameters
+        ----------
+        identifier: str
+            The identifier of the resource to be queried.
+
+        Returns
+        -------
+        The specified resource as an instance of the corresponding model class (`model.Element`).
+        """
+
+        response = self._make_request(
+            request_method=self._parent_client.request_session.get, url=f"{self.build_url()}/{identifier}"
+        )
+        return self._resource_cls.from_json(json_string=response.text)
diff --git a/streampipes-client-python/streampipes_client/model/resource/DataLakeSeries.py b/streampipes-client-python/streampipes_client/model/resource/DataLakeSeries.py
new file mode 100644
index 000000000..a936edafc
--- /dev/null
+++ b/streampipes-client-python/streampipes_client/model/resource/DataLakeSeries.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+from typing import Any, Dict, List, Optional
+
+import pandas as pd
+from pydantic import StrictInt, StrictStr
+from streampipes_client.model.resource.resource import Resource
+
+
+class DataLakeSeries(Resource):
+    @classmethod
+    def from_json(cls, json_string: str) -> Resource:
+
+        # deserialize JSON string
+        parsed_json = json.loads(json_string)
+
+        if len(parsed_json["allDataSeries"]) != 1:
+            raise RuntimeError("Not supported")
+
+        data_series = parsed_json["allDataSeries"][0]
+
+        return cls.parse_obj(data_series)
+
+    def convert_to_pandas_representation(self) -> Dict:
+
+        result: Dict = dict()
+
+        for row in self.rows:
+            for idx, value in enumerate(row):
+                if (key := self.headers[idx]) in result.keys():
+                    result[self.headers[idx]].append(value)
+                else:
+                    result.update({key: [value]})
+
+        return result
+
+    total: StrictInt
+    headers: List[StrictStr]
+    rows: List[List[Any]]
+    tags: Optional[str]
+
+    def to_pandas(self) -> pd.DataFrame:
+        return pd.DataFrame(data=self.convert_to_pandas_representation())