You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/04/03 13:21:24 UTC
[iotdb] branch mlnode/test updated (3072ae715a -> 833d0619ed)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a change to branch mlnode/test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
from 3072ae715a Merge remote-tracking branch 'liuyong/mlnode/process' into mlnode/test
new bf171eff72 modify iotdb server
new 833d0619ed make mlnode available
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
mlnode/iotdb/mlnode/algorithm/enums.py | 3 ++
mlnode/iotdb/mlnode/algorithm/factory.py | 1 +
.../mlnode/algorithm/models/forecast/dlinear.py | 3 +-
mlnode/iotdb/mlnode/client.py | 32 ++++++------
mlnode/iotdb/mlnode/config.py | 10 ++--
mlnode/iotdb/mlnode/constant.py | 6 ---
mlnode/iotdb/mlnode/data_access/enums.py | 3 ++
mlnode/iotdb/mlnode/data_access/offline/source.py | 4 +-
mlnode/iotdb/mlnode/handler.py | 32 +++++-------
mlnode/iotdb/mlnode/parser.py | 7 ++-
mlnode/iotdb/mlnode/process/manager.py | 36 +++++++------
mlnode/iotdb/mlnode/process/task.py | 4 +-
mlnode/iotdb/mlnode/process/task_factory.py | 2 +-
mlnode/iotdb/mlnode/process/trial.py | 59 ++++++++++++----------
mlnode/iotdb/mlnode/service.py | 11 ++--
mlnode/iotdb/mlnode/storage.py | 11 ++--
mlnode/iotdb/mlnode/util.py | 2 +-
mlnode/pyproject.toml | 1 +
mlnode/requirements.txt | 2 +-
.../iotdb/commons/model/ModelHyperparameter.java | 10 ++++
.../iotdb/commons/model/ModelInformation.java | 9 +++-
.../iotdb/commons/model/TrailInformation.java | 7 ++-
.../db/mpp/common/header/ColumnHeaderConstant.java | 8 +--
.../db/mpp/common/header/DatasetHeaderFactory.java | 4 +-
.../config/metadata/model/ShowModelsTask.java | 51 +++++++++++--------
.../config/metadata/model/ShowTrailsTask.java | 29 +++++++----
.../ConcatExpressionWithSuffixPathsVisitor.java | 3 +-
.../db/mpp/plan/parser/StatementGenerator.java | 49 +++++++++---------
28 files changed, 227 insertions(+), 172 deletions(-)
[iotdb] 02/02: make mlnode available
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch mlnode/test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 833d0619ed19711132042578ae0ea6123bec77c4
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Apr 3 16:11:31 2023 +0800
make mlnode available
---
mlnode/iotdb/mlnode/algorithm/enums.py | 3 ++
mlnode/iotdb/mlnode/algorithm/factory.py | 1 +
.../mlnode/algorithm/models/forecast/dlinear.py | 3 +-
mlnode/iotdb/mlnode/client.py | 32 ++++++------
mlnode/iotdb/mlnode/config.py | 10 ++--
mlnode/iotdb/mlnode/constant.py | 6 ---
mlnode/iotdb/mlnode/data_access/enums.py | 3 ++
mlnode/iotdb/mlnode/data_access/offline/source.py | 4 +-
mlnode/iotdb/mlnode/handler.py | 32 +++++-------
mlnode/iotdb/mlnode/parser.py | 7 ++-
mlnode/iotdb/mlnode/process/manager.py | 36 +++++++------
mlnode/iotdb/mlnode/process/task.py | 4 +-
mlnode/iotdb/mlnode/process/task_factory.py | 2 +-
mlnode/iotdb/mlnode/process/trial.py | 59 ++++++++++++----------
mlnode/iotdb/mlnode/service.py | 11 ++--
mlnode/iotdb/mlnode/storage.py | 11 ++--
mlnode/iotdb/mlnode/util.py | 2 +-
mlnode/pyproject.toml | 1 +
mlnode/requirements.txt | 2 +-
19 files changed, 122 insertions(+), 107 deletions(-)
diff --git a/mlnode/iotdb/mlnode/algorithm/enums.py b/mlnode/iotdb/mlnode/algorithm/enums.py
index 4b05aa4bf8..2def3751cd 100644
--- a/mlnode/iotdb/mlnode/algorithm/enums.py
+++ b/mlnode/iotdb/mlnode/algorithm/enums.py
@@ -27,3 +27,6 @@ class ForecastTaskType(Enum):
def __eq__(self, other: str) -> bool:
return self.value == other
+
+ def __hash__(self) -> int:
+ return hash(self.value)
diff --git a/mlnode/iotdb/mlnode/algorithm/factory.py b/mlnode/iotdb/mlnode/algorithm/factory.py
index 92cb01a883..26eab10860 100644
--- a/mlnode/iotdb/mlnode/algorithm/factory.py
+++ b/mlnode/iotdb/mlnode/algorithm/factory.py
@@ -19,6 +19,7 @@ import torch.nn as nn
from iotdb.mlnode.algorithm.enums import ForecastTaskType
from iotdb.mlnode.algorithm.models.forecast import support_forecasting_models
+from iotdb.mlnode.algorithm.models.forecast.dlinear import dlinear
from iotdb.mlnode.exception import BadConfigValueError
diff --git a/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py b/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py
index fa9ee04e56..966ea20347 100644
--- a/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py
+++ b/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py
@@ -21,6 +21,7 @@ import math
import torch
import torch.nn as nn
+from iotdb.mlnode.algorithm.enums import ForecastTaskType
from iotdb.mlnode.exception import BadConfigValueError
@@ -65,7 +66,7 @@ class DLinear(nn.Module):
pred_len=96,
input_vars=1,
output_vars=1,
- forecast_type='m', # TODO, support others
+ forecast_task_type=ForecastTaskType.ENDOGENOUS, # TODO, support others
):
super(DLinear, self).__init__()
self.input_len = input_len
diff --git a/mlnode/iotdb/mlnode/client.py b/mlnode/iotdb/mlnode/client.py
index 6c1c549ea1..724d517316 100644
--- a/mlnode/iotdb/mlnode/client.py
+++ b/mlnode/iotdb/mlnode/client.py
@@ -21,7 +21,7 @@ from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.Thrift import TException
from thrift.transport import TSocket, TTransport
-from iotdb.mlnode.config import config
+from iotdb.mlnode.config import descriptor
from iotdb.mlnode.constant import TSStatusCode
from iotdb.mlnode.log import logger
from iotdb.mlnode.util import verify_success
@@ -29,7 +29,7 @@ from iotdb.thrift.common.ttypes import TEndPoint, TrainingState, TSStatus
from iotdb.thrift.confignode import IConfigNodeRPCService
from iotdb.thrift.confignode.ttypes import (TUpdateModelInfoReq,
TUpdateModelStateReq)
-from iotdb.thrift.datanode import IDataNodeRPCService
+from iotdb.thrift.datanode import IMLNodeInternalRPCService
from iotdb.thrift.datanode.ttypes import (TFetchTimeseriesReq,
TFetchTimeseriesResp,
TRecordModelMetricsReq)
@@ -39,8 +39,8 @@ from iotdb.thrift.mlnode.ttypes import TCreateTrainingTaskReq, TDeleteModelReq
class ClientManager(object):
def __init__(self):
- self.__data_node_endpoint = config.get_mn_target_data_node()
- self.__config_node_endpoint = config.get_mn_target_config_node()
+ self.__data_node_endpoint = descriptor.get_config().get_mn_target_data_node()
+ self.__config_node_endpoint = descriptor.get_config().get_mn_target_config_node()
def borrow_data_node_client(self):
return DataNodeClient(host=self.__data_node_endpoint.ip,
@@ -120,18 +120,14 @@ class DataNodeClient(object):
raise e
protocol = TBinaryProtocol.TBinaryProtocol(transport)
- self.__client = IDataNodeRPCService.Client(protocol)
+ self.__client = IMLNodeInternalRPCService.Client(protocol)
def fetch_timeseries(self,
- session_id: int,
- statement_id: int,
query_expressions: list = [],
query_filter: str = None,
fetch_size: int = DEFAULT_FETCH_SIZE,
timeout: int = DEFAULT_TIMEOUT) -> TFetchTimeseriesResp:
req = TFetchTimeseriesReq(
- sessionId=session_id,
- statementId=statement_id,
queryExpressions=query_expressions,
queryFilter=query_filter,
fetchSize=fetch_size,
@@ -147,8 +143,8 @@ class DataNodeClient(object):
def record_model_metrics(self,
model_id: str,
trial_id: str,
- metrics: list = [],
- values: list = []) -> None:
+ metrics: list,
+ values: list) -> None:
req = TRecordModelMetricsReq(
modelId=model_id,
trialId=trial_id,
@@ -186,6 +182,7 @@ class ConfigNodeClient(object):
if self.__config_leader is not None:
try:
self.__connect(self.__config_leader)
+ return
except TException:
logger.warn("The current node {} may have been down, try next node", self.__config_leader)
self.__config_leader = None
@@ -200,6 +197,7 @@ class ConfigNodeClient(object):
try_endpoint = self.__config_nodes[self.__cursor]
try:
self.__connect(try_endpoint)
+ return
except TException:
logger.warn("The current node {} may have been down, try next node", try_endpoint)
@@ -217,7 +215,7 @@ class ConfigNodeClient(object):
except TTransport.TTransportException as e:
logger.exception("TTransportException!", exc_info=e)
- protocol = TCompactProtocol.TBinaryProtocol(transport)
+ protocol = TBinaryProtocol.TBinaryProtocol(transport)
self.__client = IConfigNodeRPCService.Client(protocol)
def __wait_and_reconnect(self) -> None:
@@ -246,12 +244,12 @@ class ConfigNodeClient(object):
def update_model_state(self,
model_id: str,
- trial_id: str,
- training_state: TrainingState) -> None:
+ training_state: TrainingState,
+ best_trail_id: str = None) -> None:
req = TUpdateModelStateReq(
modelId=model_id,
- trialId=trial_id,
- trainingState=training_state
+ state=training_state,
+ bestTrailId=best_trail_id
)
for i in range(0, self.__RETRY_NUM):
try:
@@ -275,7 +273,7 @@ class ConfigNodeClient(object):
model_info = {}
req = TUpdateModelInfoReq(
modelId=model_id,
- trialId=trial_id,
+ trailId=trial_id,
modelInfo={k: str(v) for k, v in model_info.items()},
)
diff --git a/mlnode/iotdb/mlnode/config.py b/mlnode/iotdb/mlnode/config.py
index e59338209a..109452eab5 100644
--- a/mlnode/iotdb/mlnode/config.py
+++ b/mlnode/iotdb/mlnode/config.py
@@ -44,7 +44,7 @@ class MLNodeConfig(object):
self.__mn_target_config_node: TEndPoint = TEndPoint("127.0.0.1", 10710)
# Target DataNode to be connected by MLNode
- self.__mn_target_data_node: TEndPoint = TEndPoint("127.0.0.1", 10730)
+ self.__mn_target_data_node: TEndPoint = TEndPoint("127.0.0.1", 10780)
def get_mn_rpc_address(self) -> str:
return self.__mn_rpc_address
@@ -86,9 +86,8 @@ class MLNodeConfig(object):
class MLNodeDescriptor(object):
def __init__(self):
self.__config = MLNodeConfig()
- self.__load_config_from_file()
- def __load_config_from_file(self) -> None:
+ def load_config_from_file(self) -> None:
conf_file = os.path.join(os.getcwd(), MLNODE_CONF_DIRECTORY_NAME, MLNODE_CONF_FILE_NAME)
if not os.path.exists(conf_file):
logger.info("Cannot find MLNode config file '{}', use default configuration.".format(conf_file))
@@ -113,7 +112,7 @@ class MLNodeDescriptor(object):
self.__config.set_mn_model_storage_dir(file_configs.mn_model_storage_dir)
if file_configs.mn_model_storage_cache_size is not None:
- self.__config.set_mn_model_storage_cachesize(file_configs.mn_model_storage_cache_size)
+ self.__config.set_mn_model_storage_cache_size(file_configs.mn_model_storage_cache_size)
if file_configs.mn_target_config_node is not None:
self.__config.set_mn_target_config_node(file_configs.mn_target_config_node)
@@ -129,4 +128,5 @@ class MLNodeDescriptor(object):
return self.__config
-config = MLNodeDescriptor().get_config()
+# initialize a singleton
+descriptor = MLNodeDescriptor()
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/constant.py
index e0be2a7b63..3bffa06526 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/constant.py
@@ -31,9 +31,3 @@ class TSStatusCode(Enum):
def get_status_code(self) -> int:
return self.value
-
-
-class ModelState(Enum):
- RUNNING = 'running'
- FINISHED = 'finished'
- FAILED = 'failed'
diff --git a/mlnode/iotdb/mlnode/data_access/enums.py b/mlnode/iotdb/mlnode/data_access/enums.py
index d21a9f69c4..e7f5417b3d 100644
--- a/mlnode/iotdb/mlnode/data_access/enums.py
+++ b/mlnode/iotdb/mlnode/data_access/enums.py
@@ -27,3 +27,6 @@ class DatasetType(Enum):
def __eq__(self, other: str) -> bool:
return self.value == other
+
+ def __hash__(self) -> int:
+ return hash(self.value)
diff --git a/mlnode/iotdb/mlnode/data_access/offline/source.py b/mlnode/iotdb/mlnode/data_access/offline/source.py
index a63371ec7a..0422bb373d 100644
--- a/mlnode/iotdb/mlnode/data_access/offline/source.py
+++ b/mlnode/iotdb/mlnode/data_access/offline/source.py
@@ -74,8 +74,8 @@ class ThriftDataSource(DataSource):
try:
res = data_client.fetch_timeseries(
- queryExpressions=self.query_expressions,
- queryFilter=self.query_filter,
+ query_expressions=self.query_expressions,
+ query_filter=self.query_filter,
)
except Exception:
raise RuntimeError(f'Fail to fetch data with query expressions: {self.query_expressions}'
diff --git a/mlnode/iotdb/mlnode/handler.py b/mlnode/iotdb/mlnode/handler.py
index e43f26c226..1a6e3eb90a 100644
--- a/mlnode/iotdb/mlnode/handler.py
+++ b/mlnode/iotdb/mlnode/handler.py
@@ -19,7 +19,6 @@
from iotdb.mlnode.algorithm.factory import create_forecast_model
from iotdb.mlnode.constant import TSStatusCode
from iotdb.mlnode.data_access.factory import create_forecast_dataset
-from iotdb.mlnode.log import logger
from iotdb.mlnode.parser import parse_training_request
from iotdb.mlnode.process.manager import TaskManager
from iotdb.mlnode.util import get_status
@@ -37,29 +36,26 @@ class MLNodeRPCServiceHandler(IMLNodeRPCService.Iface):
return get_status(TSStatusCode.SUCCESS_STATUS, "")
def createTrainingTask(self, req: TCreateTrainingTaskReq):
- # parse request stage (check required config and config type)
- data_config, model_config, task_config = parse_training_request(req)
-
- # create model stage (check model config legitimacy)
+ task = None
try:
+ # parse request, check required config and config type
+ data_config, model_config, task_config = parse_training_request(req)
+
+ # create model & check model config legitimacy
model, model_config = create_forecast_model(**model_config)
- except Exception as e: # Create model failed
- return get_status(TSStatusCode.FAIL_STATUS, str(e))
- logger.info('model config: ' + str(model_config))
- # create data stage (check data config legitimacy)
- try:
+ # create dataset & check data config legitimacy
dataset, data_config = create_forecast_dataset(**data_config)
- except Exception as e: # Create data failed
- return get_status(TSStatusCode.FAIL_STATUS, str(e))
- logger.info('data config: ' + str(data_config))
-
- # create task stage (check task config legitimacy)
- # submit task stage (check resource and decide pending/start)
- self.__task_manager.submit_training_task(task_config, model_config, model, dataset)
+ # create task & check task config legitimacy
+ task = self.__task_manager.create_training_task(dataset, model, model_config, task_config)
- return get_status(TSStatusCode.SUCCESS_STATUS, 'Successfully create training task')
+ return get_status(TSStatusCode.SUCCESS_STATUS, 'Successfully create training task')
+ except Exception as e:
+ return get_status(TSStatusCode.FAIL_STATUS, str(e))
+ finally:
+ # submit task stage & check resource and decide pending/start
+ self.__task_manager.submit_training_task(task)
def forecast(self, req: TForecastReq):
status = get_status(TSStatusCode.SUCCESS_STATUS, "")
diff --git a/mlnode/iotdb/mlnode/parser.py b/mlnode/iotdb/mlnode/parser.py
index 236032b9a0..c052cd5050 100644
--- a/mlnode/iotdb/mlnode/parser.py
+++ b/mlnode/iotdb/mlnode/parser.py
@@ -91,8 +91,9 @@ class _ConfigParser(argparse.ArgumentParser):
- output_vars: number of output variables
"""
_data_config_parser = _ConfigParser()
-_data_config_parser.add_argument('--source_type', type=str, required=True)
-_data_config_parser.add_argument('--dataset_type', type=DatasetType, required=True)
+_data_config_parser.add_argument('--source_type', type=str, default="thrift")
+_data_config_parser.add_argument('--dataset_type', type=DatasetType, default=DatasetType.WINDOW,
+ choices=list(DatasetType))
_data_config_parser.add_argument('--filename', type=str, default='')
_data_config_parser.add_argument('--query_expressions', type=str, nargs='*', default=[])
_data_config_parser.add_argument('--query_filter', type=str, default='')
@@ -183,6 +184,8 @@ def parse_training_request(req: TCreateTrainingTaskReq):
task_config: configurations related to task
"""
config = req.modelConfigs
+ config.update(model_name=config['model_type'])
+ config.update(task_class=config['model_task'])
config.update(model_id=req.modelId)
config.update(tuning=req.isAuto)
config.update(query_expressions=req.queryExpressions)
diff --git a/mlnode/iotdb/mlnode/process/manager.py b/mlnode/iotdb/mlnode/process/manager.py
index bfb035f27b..0af0353973 100644
--- a/mlnode/iotdb/mlnode/process/manager.py
+++ b/mlnode/iotdb/mlnode/process/manager.py
@@ -18,7 +18,11 @@
import multiprocessing as mp
+from torch import nn
+from torch.utils.data import Dataset
+
from iotdb.mlnode.log import logger
+from iotdb.mlnode.process.task import ForecastingTrainingTask
from iotdb.mlnode.process.task_factory import create_task
@@ -33,22 +37,22 @@ class TaskManager(object):
self.__pid_info = self.__shared_resource_manager.dict()
self.__training_process_pool = mp.Pool(pool_num)
- def submit_training_task(self, task_configs, model_configs, model, dataset):
- assert 'model_id' in task_configs.keys(), 'Task config should contain model_id'
+ def create_training_task(self,
+ dataset: Dataset,
+ model: nn.Module,
+ model_configs: dict,
+ task_configs: dict) -> ForecastingTrainingTask:
model_id = task_configs['model_id']
self.__pid_info[model_id] = self.__shared_resource_manager.dict()
- try:
- task = create_task(
- task_configs,
- model_configs,
- model,
- dataset,
- self.__pid_info
- )
- except Exception as e:
- logger.exception(e)
- return e, False
+ return create_task(
+ task_configs,
+ model_configs,
+ model,
+ dataset,
+ self.__pid_info
+ )
- logger.info(f'Task: ({model_id}) - Training process submitted successfully')
- self.__training_process_pool.apply_async(task, args=())
- return model_id, True
+ def submit_training_task(self, task: ForecastingTrainingTask) -> None:
+ if task is not None:
+ self.__training_process_pool.apply_async(task, args=())
+ logger.info(f'Task: ({task.model_id}) - Training process submitted successfully')
diff --git a/mlnode/iotdb/mlnode/process/task.py b/mlnode/iotdb/mlnode/process/task.py
index 7fac9cb1c5..85d5b5d2cf 100644
--- a/mlnode/iotdb/mlnode/process/task.py
+++ b/mlnode/iotdb/mlnode/process/task.py
@@ -75,7 +75,7 @@ class _BasicTask(object):
class ForecastingTrainingTask(_BasicTask):
def __init__(self, task_configs, model_configs, model, dataset, task_trial_map):
super(ForecastingTrainingTask, self).__init__(task_configs, model_configs, model, dataset, task_trial_map)
- model_id = self.task_configs['model_id']
+ self.model_id = self.task_configs['model_id']
self.tuning = self.task_configs["tuning"]
if self.tuning: # TODO implement tuning task
@@ -83,7 +83,7 @@ class ForecastingTrainingTask(_BasicTask):
else:
self.task_configs['trial_id'] = 'tid_0' # TODO: set a default trial id
self.trial = ForecastingTrainingTrial(self.task_configs, self.model, self.model_configs, self.dataset)
- self.task_trial_map[model_id]['tid_0'] = os.getpid()
+ self.task_trial_map[self.model_id]['tid_0'] = os.getpid()
def __call__(self):
try:
diff --git a/mlnode/iotdb/mlnode/process/task_factory.py b/mlnode/iotdb/mlnode/process/task_factory.py
index 7b9966a8f3..083b84eba2 100644
--- a/mlnode/iotdb/mlnode/process/task_factory.py
+++ b/mlnode/iotdb/mlnode/process/task_factory.py
@@ -20,7 +20,7 @@
from iotdb.mlnode.process.task import ForecastingTrainingTask
support_task_types = {
- 'forecast_training_task': ForecastingTrainingTask
+ 'forecast': ForecastingTrainingTask
}
diff --git a/mlnode/iotdb/mlnode/process/trial.py b/mlnode/iotdb/mlnode/process/trial.py
index f8671b4657..9852e3ffb4 100644
--- a/mlnode/iotdb/mlnode/process/trial.py
+++ b/mlnode/iotdb/mlnode/process/trial.py
@@ -23,11 +23,11 @@ import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
-from iotdb.mlnode.algorithm.metric import all_metrics
+from iotdb.mlnode.algorithm.metric import MAE, MSE, all_metrics
from iotdb.mlnode.client import client_manager
from iotdb.mlnode.log import logger
from iotdb.mlnode.storage import model_storage
-from iotdb.mlnode.constant import ModelState
+from iotdb.thrift.common.ttypes import TrainingState
def _parse_trial_config(**kwargs):
@@ -188,8 +188,8 @@ class ForecastingTrainingTrial(BasicTrial):
val_loss.append(loss.item())
for name in self.metric_names:
- value = eval(name)(outputs.detach().cpu().numpy(),
- batch_y.detach().cpu().numpy())
+ metric = eval(name)()
+ value = metric(outputs.detach().cpu().numpy(), batch_y.detach().cpu().numpy())
metrics_dict[name].append(value)
for name, value_list in metrics_dict.items():
@@ -207,25 +207,32 @@ class ForecastingTrainingTrial(BasicTrial):
return val_loss, metrics_dict
def start(self) -> float:
- self.confignode_client.update_model_state(self.model_id, self.trial_id, ModelState.RUNNING)
- best_loss = np.inf
- best_metrics_dict = None
- for epoch in range(self.epochs):
- self._train(epoch)
- val_loss, metrics_dict = self._validate(epoch)
- if val_loss < best_loss:
- best_loss = val_loss
- best_metrics_dict = metrics_dict
- model_storage.save_model(self.model,
- self.model_configs,
- model_id=self.model_id,
- trial_id=self.trial_id)
-
- logger.info(f'Trial: ({self.model_id}_{self.trial_id}) - Finished with best model saved successfully')
-
- self.confignode_client.update_model_state(self.model_id, self.trial_id, ModelState.RUNNING)
- model_info = {}
- model_info.update(best_metrics_dict)
- model_info.update(self.trial_configs)
- self.confignode_client.update_model_info(self.model_id, self.trial_id, model_info)
- return best_loss
+ try:
+ self.confignode_client.update_model_state(self.model_id, TrainingState.RUNNING)
+ best_loss = np.inf
+ best_metrics_dict = None
+ model_path = None
+ for epoch in range(self.epochs):
+ self._train(epoch)
+ val_loss, metrics_dict = self._validate(epoch)
+ if val_loss < best_loss:
+ best_loss = val_loss
+ best_metrics_dict = metrics_dict
+ model_path = model_storage.save_model(self.model,
+ self.model_configs,
+ model_id=self.model_id,
+ trial_id=self.trial_id)
+
+ logger.info(f'Trial: ({self.model_id}_{self.trial_id}) - Finished with best model saved successfully')
+
+ model_info = {}
+ model_info.update(best_metrics_dict)
+ model_info.update(self.trial_configs)
+ model_info['model_path'] = model_path
+ self.confignode_client.update_model_info(self.model_id, self.trial_id, model_info)
+ self.confignode_client.update_model_state(self.model_id, TrainingState.FINISHED, self.trial_id)
+ return best_loss
+ except Exception as e:
+ logger.warn(e)
+ self.confignode_client.update_model_state(self.model_id, TrainingState.FAILED)
+ raise e
diff --git a/mlnode/iotdb/mlnode/service.py b/mlnode/iotdb/mlnode/service.py
index a2c05ea5c3..ae0727cc5a 100644
--- a/mlnode/iotdb/mlnode/service.py
+++ b/mlnode/iotdb/mlnode/service.py
@@ -19,10 +19,10 @@ import threading
import time
from thrift.protocol import TCompactProtocol
-from thrift.server import TServer
+from thrift.server import TProcessPoolServer
from thrift.transport import TSocket, TTransport
-from iotdb.mlnode.config import config
+from iotdb.mlnode.config import descriptor
from iotdb.mlnode.handler import MLNodeRPCServiceHandler
from iotdb.mlnode.log import logger
from iotdb.thrift.mlnode import IMLNodeRPCService
@@ -32,11 +32,13 @@ class RPCService(threading.Thread):
def __init__(self):
super().__init__()
processor = IMLNodeRPCService.Processor(handler=MLNodeRPCServiceHandler())
- transport = TSocket.TServerSocket(host=config.get_mn_rpc_address(), port=config.get_mn_rpc_port())
+ transport = TSocket.TServerSocket(host=descriptor.get_config().get_mn_rpc_address(),
+ port=descriptor.get_config().get_mn_rpc_port())
transport_factory = TTransport.TFramedTransportFactory()
protocol_factory = TCompactProtocol.TCompactProtocolFactory()
- self.__pool_server = TServer.TThreadPoolServer(processor, transport, transport_factory, protocol_factory)
+ self.__pool_server = TProcessPoolServer.TProcessPoolServer(processor, transport, transport_factory,
+ protocol_factory)
def run(self) -> None:
logger.info("The RPC service thread begin to run...")
@@ -45,6 +47,7 @@ class RPCService(threading.Thread):
class MLNode(object):
def __init__(self):
+ descriptor.load_config_from_file()
self.__rpc_service = RPCService()
def start(self) -> None:
diff --git a/mlnode/iotdb/mlnode/storage.py b/mlnode/iotdb/mlnode/storage.py
index ee745689b1..78a0be43bf 100644
--- a/mlnode/iotdb/mlnode/storage.py
+++ b/mlnode/iotdb/mlnode/storage.py
@@ -24,35 +24,36 @@ import torch
import torch.nn as nn
from pylru import lrucache
-from iotdb.mlnode.config import config
+from iotdb.mlnode.config import descriptor
from iotdb.mlnode.exception import ModelNotExistError
class ModelStorage(object):
def __init__(self):
- self.__model_dir = os.path.join(os.getcwd(), config.get_mn_model_storage_dir())
+ self.__model_dir = os.path.join('.', descriptor.get_config().get_mn_model_storage_dir())
if not os.path.exists(self.__model_dir):
os.mkdir(self.__model_dir)
- self.__model_cache = lrucache(config.get_mn_model_storage_cache_size())
+ self.__model_cache = lrucache(descriptor.get_config().get_mn_model_storage_cache_size())
def save_model(self,
model: nn.Module,
model_config: dict,
model_id: str,
- trial_id: str) -> None:
+ trial_id: str) -> str:
"""
Note: model config for time series should contain 'input_len' and 'input_vars'
"""
model_dir_path = os.path.join(self.__model_dir, f'{model_id}')
if not os.path.exists(model_dir_path):
- os.mkdir(model_dir_path)
+ os.makedirs(model_dir_path)
model_file_path = os.path.join(model_dir_path, f'{trial_id}.pt')
sample_input = [torch.randn(1, model_config['input_len'], model_config['input_vars'])]
torch.jit.save(torch.jit.trace(model, sample_input),
model_file_path,
_extra_files={'model_config': json.dumps(model_config)})
+ return os.path.abspath(model_file_path)
def load_model(self, model_id: str, trial_id: str) -> (torch.jit.ScriptModule, dict):
"""
diff --git a/mlnode/iotdb/mlnode/util.py b/mlnode/iotdb/mlnode/util.py
index e451d2b25a..5d3a2d670e 100644
--- a/mlnode/iotdb/mlnode/util.py
+++ b/mlnode/iotdb/mlnode/util.py
@@ -52,6 +52,6 @@ def get_status(status_code: TSStatusCode, message: str) -> TSStatus:
def verify_success(status: TSStatus, err_msg: str) -> None:
- if status.code != TSStatusCode.SUCCESS_STATUS:
+ if status.code != TSStatusCode.SUCCESS_STATUS.get_status_code():
logger.warn(err_msg + ", error status is ", status)
raise RuntimeError(str(status.code) + ": " + status.message)
diff --git a/mlnode/pyproject.toml b/mlnode/pyproject.toml
index 3944e2910d..56290f8d4e 100644
--- a/mlnode/pyproject.toml
+++ b/mlnode/pyproject.toml
@@ -49,6 +49,7 @@ packages = [
python = "^3.7"
thrift = "^0.13.0"
dynaconf = "^3.1.11"
+pylru = "^1.2.1"
[tool.poetry.scripts]
mlnode = "iotdb.mlnode.script:main"
\ No newline at end of file
diff --git a/mlnode/requirements.txt b/mlnode/requirements.txt
index edd85701ab..c49c8a0189 100644
--- a/mlnode/requirements.txt
+++ b/mlnode/requirements.txt
@@ -20,7 +20,7 @@ pandas>=1.3.5
numpy>=1.21.4
apache-iotdb
poetry
-torch
+torch~=2.0.0
pylru
thrift~=0.13.0
[iotdb] 01/02: modify iotdb server
Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch mlnode/test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bf171eff727a59895ed439d08ea45b65657dc154
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Apr 3 16:05:01 2023 +0800
modify iotdb server
---
.../iotdb/commons/model/ModelHyperparameter.java | 10 +++++
.../iotdb/commons/model/ModelInformation.java | 9 +++-
.../iotdb/commons/model/TrailInformation.java | 7 ++-
.../db/mpp/common/header/ColumnHeaderConstant.java | 8 ++--
.../db/mpp/common/header/DatasetHeaderFactory.java | 4 +-
.../config/metadata/model/ShowModelsTask.java | 51 +++++++++++++---------
.../config/metadata/model/ShowTrailsTask.java | 29 ++++++++----
.../ConcatExpressionWithSuffixPathsVisitor.java | 3 +-
.../db/mpp/plan/parser/StatementGenerator.java | 49 ++++++++++-----------
9 files changed, 105 insertions(+), 65 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelHyperparameter.java b/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelHyperparameter.java
index 151a6b7c59..ed5a3cbb3c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelHyperparameter.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelHyperparameter.java
@@ -26,6 +26,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
public class ModelHyperparameter {
@@ -53,6 +55,14 @@ public class ModelHyperparameter {
return stringBuilder.toString();
}
+ public List<String> toStringList() {
+ List<String> resultList = new ArrayList<>();
+ for (Map.Entry<String, String> keyValuePair : keyValueMap.entrySet()) {
+ resultList.add(keyValuePair.getKey() + "=" + keyValuePair.getValue());
+ }
+ return resultList;
+ }
+
public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(keyValueMap, stream);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelInformation.java b/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelInformation.java
index a8cff6968d..522f609e51 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelInformation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/model/ModelInformation.java
@@ -270,10 +270,17 @@ public class ModelInformation {
if (bestTrailId != null) {
TrailInformation bestTrail = trailMap.get(bestTrailId);
- ReadWriteIOUtils.write(bestTrail.getModelHyperparameter().toString(), stream);
ReadWriteIOUtils.write(bestTrail.getModelPath(), stream);
+
+ List<String> modelHyperparameterList = bestTrail.getModelHyperparameter().toStringList();
+ ReadWriteIOUtils.write(modelHyperparameterList.size(), stream);
+ for (String hyperparameter : modelHyperparameterList) {
+ ReadWriteIOUtils.write(hyperparameter, stream);
+ }
} else {
ReadWriteIOUtils.write("UNKNOWN", stream);
+
+ ReadWriteIOUtils.write(1, stream);
ReadWriteIOUtils.write("UNKNOWN", stream);
}
return ByteBuffer.wrap(buffer.getBuf(), 0, buffer.size());
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/model/TrailInformation.java b/node-commons/src/main/java/org/apache/iotdb/commons/model/TrailInformation.java
index 8551534d41..56de20671e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/model/TrailInformation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/model/TrailInformation.java
@@ -27,6 +27,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
public class TrailInformation {
@@ -68,8 +69,12 @@ public class TrailInformation {
PublicBAOS buffer = new PublicBAOS();
DataOutputStream stream = new DataOutputStream(buffer);
ReadWriteIOUtils.write(trailId, stream);
- ReadWriteIOUtils.write(modelHyperparameter.toString(), stream);
ReadWriteIOUtils.write(modelPath, stream);
+ List<String> modelHyperparameterList = modelHyperparameter.toStringList();
+ ReadWriteIOUtils.write(modelHyperparameterList.size(), stream);
+ for (String hyperparameter : modelHyperparameterList) {
+ ReadWriteIOUtils.write(hyperparameter, stream);
+ }
return ByteBuffer.wrap(buffer.getBuf(), 0, buffer.size());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 9093cf3f27..ded93fc3df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -412,12 +412,12 @@ public class ColumnHeaderConstant {
new ColumnHeader(MODEL_TYPE, TSDataType.TEXT),
new ColumnHeader(QUERY_BODY, TSDataType.TEXT),
new ColumnHeader(STATE, TSDataType.TEXT),
- new ColumnHeader(HYPERPARAMETER, TSDataType.TEXT),
- new ColumnHeader(MODEL_PATH, TSDataType.TEXT));
+ new ColumnHeader(MODEL_PATH, TSDataType.TEXT),
+ new ColumnHeader(HYPERPARAMETER, TSDataType.TEXT));
public static final List<ColumnHeader> showTrailsColumnHeaders =
ImmutableList.of(
new ColumnHeader(TRAIL_ID, TSDataType.TEXT),
- new ColumnHeader(HYPERPARAMETER, TSDataType.TEXT),
- new ColumnHeader(MODEL_PATH, TSDataType.TEXT));
+ new ColumnHeader(MODEL_PATH, TSDataType.TEXT),
+ new ColumnHeader(HYPERPARAMETER, TSDataType.TEXT));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index e5670729f2..c7b07431d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -170,10 +170,10 @@ public class DatasetHeaderFactory {
}
public static DatasetHeader getShowModelsHeader() {
- return new DatasetHeader(ColumnHeaderConstant.showModelsColumnHeaders, false);
+ return new DatasetHeader(ColumnHeaderConstant.showModelsColumnHeaders, true);
}
public static DatasetHeader getShowTrailsHeader() {
- return new DatasetHeader(ColumnHeaderConstant.showTrailsColumnHeaders, false);
+ return new DatasetHeader(ColumnHeaderConstant.showTrailsColumnHeaders, true);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
index 7fd719e0fd..750e0dd5ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowModelsTask.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -57,29 +58,37 @@ public class ShowModelsTask implements IConfigTask {
.collect(Collectors.toList());
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
for (ByteBuffer modelInfo : modelInfoList) {
+ String modelId = ReadWriteIOUtils.readString(modelInfo);
+ String modelTask = ReadWriteIOUtils.readString(modelInfo);
+ String modelType = ReadWriteIOUtils.readString(modelInfo);
+ String queryBody = ReadWriteIOUtils.readString(modelInfo);
+ String trainingState = ReadWriteIOUtils.readString(modelInfo);
+ String modelPath = ReadWriteIOUtils.readString(modelInfo);
+
+ int listSize = ReadWriteIOUtils.readInt(modelInfo);
+ List<String> modelHyperparameter = new ArrayList<>();
+ for (int i = 0; i < listSize; i++) {
+ modelHyperparameter.add(ReadWriteIOUtils.readString(modelInfo));
+ }
+
builder.getTimeColumnBuilder().writeLong(0L);
- builder
- .getColumnBuilder(0)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(modelInfo)));
- builder
- .getColumnBuilder(1)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(modelInfo)));
- builder
- .getColumnBuilder(2)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(modelInfo)));
- builder
- .getColumnBuilder(3)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(modelInfo)));
- builder
- .getColumnBuilder(4)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(modelInfo)));
- builder
- .getColumnBuilder(5)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(modelInfo)));
- builder
- .getColumnBuilder(6)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(modelInfo)));
+ builder.getColumnBuilder(0).writeBinary(Binary.valueOf(modelId));
+ builder.getColumnBuilder(1).writeBinary(Binary.valueOf(modelTask));
+ builder.getColumnBuilder(2).writeBinary(Binary.valueOf(modelType));
+ builder.getColumnBuilder(3).writeBinary(Binary.valueOf(queryBody));
+ builder.getColumnBuilder(4).writeBinary(Binary.valueOf(trainingState));
+ builder.getColumnBuilder(5).writeBinary(Binary.valueOf(modelPath));
+ builder.getColumnBuilder(6).writeBinary(Binary.valueOf(modelHyperparameter.get(0)));
builder.declarePosition();
+
+ for (int i = 1; i < listSize; i++) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+ for (int columnIndex = 0; columnIndex <= 5; columnIndex++) {
+ builder.getColumnBuilder(columnIndex).writeBinary(Binary.valueOf(""));
+ }
+ builder.getColumnBuilder(6).writeBinary(Binary.valueOf(modelHyperparameter.get(i)));
+ builder.declarePosition();
+ }
}
DatasetHeader datasetHeader = DatasetHeaderFactory.getShowModelsHeader();
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
index a428c27794..ddb9a5e385 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/model/ShowTrailsTask.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -61,17 +62,27 @@ public class ShowTrailsTask implements IConfigTask {
.collect(Collectors.toList());
TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
for (ByteBuffer trailInfo : trailInfoList) {
+ String trailId = ReadWriteIOUtils.readString(trailInfo);
+ String modelPath = ReadWriteIOUtils.readString(trailInfo);
+ int listSize = ReadWriteIOUtils.readInt(trailInfo);
+ List<String> modelHyperparameter = new ArrayList<>();
+ for (int i = 0; i < listSize; i++) {
+ modelHyperparameter.add(ReadWriteIOUtils.readString(trailInfo));
+ }
+
builder.getTimeColumnBuilder().writeLong(0L);
- builder
- .getColumnBuilder(0)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(trailInfo)));
- builder
- .getColumnBuilder(1)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(trailInfo)));
- builder
- .getColumnBuilder(2)
- .writeBinary(Binary.valueOf(ReadWriteIOUtils.readString(trailInfo)));
+ builder.getColumnBuilder(0).writeBinary(Binary.valueOf(trailId));
+ builder.getColumnBuilder(1).writeBinary(Binary.valueOf(modelPath));
+ builder.getColumnBuilder(2).writeBinary(Binary.valueOf(modelHyperparameter.get(0)));
builder.declarePosition();
+
+ for (int i = 1; i < listSize; i++) {
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder.getColumnBuilder(0).writeBinary(Binary.valueOf(""));
+ builder.getColumnBuilder(1).writeBinary(Binary.valueOf(""));
+ builder.getColumnBuilder(2).writeBinary(Binary.valueOf(modelHyperparameter.get(i)));
+ builder.declarePosition();
+ }
}
DatasetHeader datasetHeader = DatasetHeaderFactory.getShowTrailsHeader();
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ConcatExpressionWithSuffixPathsVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ConcatExpressionWithSuffixPathsVisitor.java
index 4afbe910ee..a8e565e7da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ConcatExpressionWithSuffixPathsVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/ConcatExpressionWithSuffixPathsVisitor.java
@@ -60,8 +60,7 @@ public class ConcatExpressionWithSuffixPathsVisitor
}
List<List<Expression>> childExpressionsList = new ArrayList<>();
cartesianProduct(extendedExpressions, childExpressionsList, 0, new ArrayList<>());
- return reconstructFunctionExpressions(
- (FunctionExpression) functionExpression, childExpressionsList);
+ return reconstructFunctionExpressions(functionExpression, childExpressionsList);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index e3951749d4..422617162d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -109,7 +109,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
/** Convert SQL and RPC requests to {@link Statement}. */
public class StatementGenerator {
@@ -843,30 +842,30 @@ public class StatementGenerator {
new ResultColumn(
new TimeSeriesOperand(new PartialPath("", false)), ResultColumn.ColumnType.RAW));
- WhereCondition whereCondition = new WhereCondition();
- String queryFilter = fetchTimeseriesReq.getQueryFilter();
- String[] times = queryFilter.split(",");
- int predictNum = 0;
- LessThanExpression rightPredicate = null;
- GreaterEqualExpression leftPredicate = null;
- if (!Objects.equals(times[0], "-1")) {
- leftPredicate =
- new GreaterEqualExpression(
- new TimestampOperand(), new ConstantOperand(TSDataType.INT64, times[0]));
- predictNum += 1;
- }
- if (!Objects.equals(times[1], "-1")) {
- rightPredicate =
- new LessThanExpression(
- new TimestampOperand(), new ConstantOperand(TSDataType.INT64, times[1]));
- predictNum += 2;
- }
- whereCondition.setPredicate(
- predictNum == 3
- ? new LogicAndExpression(leftPredicate, rightPredicate)
- : (predictNum == 1 ? leftPredicate : rightPredicate));
-
- queryStatement.setWhereCondition(whereCondition);
+ // WhereCondition whereCondition = new WhereCondition();
+ // String queryFilter = fetchTimeseriesReq.getQueryFilter();
+ // String[] times = queryFilter.split(",");
+ // int predictNum = 0;
+ // LessThanExpression rightPredicate = null;
+ // GreaterEqualExpression leftPredicate = null;
+ // if (!Objects.equals(times[0], "-1")) {
+ // leftPredicate =
+ // new GreaterEqualExpression(
+ // new TimestampOperand(), new ConstantOperand(TSDataType.INT64, times[0]));
+ // predictNum += 1;
+ // }
+ // if (!Objects.equals(times[1], "-1")) {
+ // rightPredicate =
+ // new LessThanExpression(
+ // new TimestampOperand(), new ConstantOperand(TSDataType.INT64, times[1]));
+ // predictNum += 2;
+ // }
+ // whereCondition.setPredicate(
+ // predictNum == 3
+ // ? new LogicAndExpression(leftPredicate, rightPredicate)
+ // : (predictNum == 1 ? leftPredicate : rightPredicate));
+ //
+ // queryStatement.setWhereCondition(whereCondition);
queryStatement.setFromComponent(fromComponent);
queryStatement.setSelectComponent(selectComponent);
return queryStatement;