You are viewing a plain-text version of this content. (The hyperlink to its canonical version was not preserved in this plain-text copy.)
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/04/04 09:09:35 UTC

[iotdb] 02/03: fix fetch_timeseries

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch mlnode/test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c67e2a616f6e1b67c76b2f60120642dea4bf1397
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Apr 4 17:08:46 2023 +0800

    fix fetch_timeseries
---
 mlnode/iotdb/mlnode/client.py                     | 29 +++++++++++++++++++----
 mlnode/iotdb/mlnode/data_access/offline/source.py | 22 +++--------------
 2 files changed, 28 insertions(+), 23 deletions(-)

diff --git a/mlnode/iotdb/mlnode/client.py b/mlnode/iotdb/mlnode/client.py
index 1560b0507a..bf46846fe1 100644
--- a/mlnode/iotdb/mlnode/client.py
+++ b/mlnode/iotdb/mlnode/client.py
@@ -16,7 +16,8 @@
 # under the License.
 #
 import time
-
+import pandas as pd
+from iotdb.mlnode import serde
 from thrift.protocol import TBinaryProtocol, TCompactProtocol
 from thrift.Thrift import TException
 from thrift.transport import TSocket, TTransport
@@ -126,7 +127,7 @@ class DataNodeClient(object):
                          query_expressions: list,
                          query_filter: str = None,
                          fetch_size: int = DEFAULT_FETCH_SIZE,
-                         timeout: int = DEFAULT_TIMEOUT) -> TFetchTimeseriesResp:
+                         timeout: int = DEFAULT_TIMEOUT) -> [int, bool, pd.DataFrame]:
         req = TFetchTimeseriesReq(
             queryExpressions=query_expressions,
             queryFilter=query_filter,
@@ -136,10 +137,30 @@ class DataNodeClient(object):
         try:
             resp = self.__client.fetchTimeseries(req)
             verify_success(resp.status, "An error occurs when calling fetch_timeseries()")
-            return resp
-        except TTransport.TException as e:
+
+            if len(resp.tsDataset) == 0:
+                raise RuntimeError(f'No data fetched with query filter: {query_filter}')
+
+            data = serde.convert_to_df(resp.columnNameList,
+                                       resp.columnTypeList,
+                                       resp.columnNameIndexMap,
+                                       resp.tsDataset)
+            if data.empty:
+                raise RuntimeError(
+                    f'Fetched empty data with query expressions: {query_expressions} and query filter: {query_filter}')
+            return resp.queryId, resp.hasMoreData, data
+        except Exception as e:
+            logger.warn(
+                f'Fail to fetch data with query expressions: {query_expressions} and query filter: {query_filter}')
             raise e
 
+    def fetch_window_batch(self,
+                           query_expressions: list,
+                           query_filter: str = None,
+                           fetch_size: int = DEFAULT_FETCH_SIZE,
+                           timeout: int = DEFAULT_TIMEOUT) -> [int, bool, list[pd.DataFrame]]:
+        pass
+
     def record_model_metrics(self,
                              model_id: str,
                              trial_id: str,
diff --git a/mlnode/iotdb/mlnode/data_access/offline/source.py b/mlnode/iotdb/mlnode/data_access/offline/source.py
index 0422bb373d..05ef96d16a 100644
--- a/mlnode/iotdb/mlnode/data_access/offline/source.py
+++ b/mlnode/iotdb/mlnode/data_access/offline/source.py
@@ -72,25 +72,9 @@ class ThriftDataSource(DataSource):
         except Exception:
             raise RuntimeError('Fail to establish connection with DataNode')
 
-        try:
-            res = data_client.fetch_timeseries(
-                query_expressions=self.query_expressions,
-                query_filter=self.query_filter,
-            )
-        except Exception:
-            raise RuntimeError(f'Fail to fetch data with query expressions: {self.query_expressions}'
-                               f' and query filter: {self.query_filter}')
-
-        if len(res.tsDataset) == 0:
-            raise RuntimeError(f'No data fetched with query filter: {self.query_filter}')
-
-        raw_data = serde.convert_to_df(res.columnNameList,
-                                       res.columnTypeList,
-                                       res.columnNameIndexMap,
-                                       res.tsDataset)
-        if raw_data.empty:
-            raise RuntimeError(f'Fetched empty data with query expressions: '
-                               f'{self.query_expressions} and query filter: {self.query_filter}')
+        query_id, has_more_data, raw_data = data_client.fetch_timeseries(self.query_expressions, self.query_filter)
+        # TODO: consider has_more_data
+
         cols_data = raw_data.columns[1:]
         self.data = raw_data[cols_data].values
         self.timestamp = pd.to_datetime(raw_data[raw_data.columns[0]].values, unit='ms', utc=True) \