You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/29 16:28:45 UTC
[iotdb] branch master updated: [IOTDB-5678] Introduce machine learning algorithm libraries on MLNode (#9338)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1bfcab297f [IOTDB-5678] Introduce machine learning algorithm libraries on MLNode (#9338)
1bfcab297f is described below
commit 1bfcab297f90562258b8cc2bfe7929556c04cd7c
Author: Yong Liu <li...@gmail.com>
AuthorDate: Thu Mar 30 00:28:38 2023 +0800
[IOTDB-5678] Introduce machine learning algorithm libraries on MLNode (#9338)
---
.../mlnode/{constant.py => algorithm/__init__.py} | 6 -
mlnode/iotdb/mlnode/algorithm/metric.py | 69 +++++++++++
.../{constant.py => algorithm/models/__init__.py} | 6 -
.../models/forecast/__init__.py} | 6 -
.../mlnode/algorithm/models/forecast/dlinear.py | 138 +++++++++++++++++++++
.../mlnode/algorithm/models/forecast/nbeats.py | 138 +++++++++++++++++++++
mlnode/iotdb/mlnode/client.py | 14 +--
mlnode/iotdb/mlnode/constant.py | 9 ++
mlnode/iotdb/mlnode/exception.py | 1 +
mlnode/iotdb/mlnode/handler.py | 17 +--
.../iotdb/mlnode/{model_storage.py => storage.py} | 0
mlnode/iotdb/mlnode/util.py | 15 ++-
mlnode/test/test_model_storage.py | 2 +-
13 files changed, 375 insertions(+), 46 deletions(-)
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/algorithm/__init__.py
similarity index 81%
copy from mlnode/iotdb/mlnode/constant.py
copy to mlnode/iotdb/mlnode/algorithm/__init__.py
index 8a38aa95d8..2a1e720805 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/algorithm/__init__.py
@@ -15,9 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
-
-MLNODE_CONF_DIRECTORY_NAME = "conf"
-MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
-MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
-
-MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
diff --git a/mlnode/iotdb/mlnode/algorithm/metric.py b/mlnode/iotdb/mlnode/algorithm/metric.py
new file mode 100644
index 0000000000..e623642191
--- /dev/null
+++ b/mlnode/iotdb/mlnode/algorithm/metric.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from abc import abstractmethod
+
+import numpy as np
+
+# Names of the metric classes defined in this module.
+all_metrics = ['RSE', 'CORR', 'MAE', 'MSE', 'RMSE', 'MAPE', 'MSPE']
+
+
+class Metric(object):
+    """Base class for evaluation metrics; instances are callable.
+
+    Calling an instance forwards to ``calculate``, which every concrete
+    metric must override.
+    """
+
+    def __call__(self, pred, ground_truth):
+        """Return ``self.calculate(pred, ground_truth)``."""
+        return self.calculate(pred, ground_truth)
+
+    @abstractmethod
+    def calculate(self, pred, ground_truth):
+        """Compute the metric for ``pred`` against ``ground_truth``.
+
+        Raises:
+            NotImplementedError: always, in this base implementation.
+        """
+        # ``abstractmethod`` is only enforced for classes built with ABCMeta,
+        # which this class is not, so fail loudly instead of returning None.
+        raise NotImplementedError(
+            "{} must implement calculate()".format(type(self).__name__))
+
+
+class RSE(Metric):
+    """Root relative squared error.
+
+    L2 norm of the residual divided by the L2 norm of the ground truth's
+    deviation from its overall mean.
+    """
+
+    def calculate(self, pred, ground_truth):
+        residual_norm = np.sqrt(np.sum((ground_truth - pred) ** 2))
+        spread_norm = np.sqrt(np.sum((ground_truth - ground_truth.mean()) ** 2))
+        return residual_norm / spread_norm
+
+
+class CORR(Metric):
+    """Pearson correlation between prediction and ground truth.
+
+    The correlation is computed along axis 0 and the resulting coefficients
+    are averaged over the last axis.
+    """
+
+    def calculate(self, pred, ground_truth):
+        truth_dev = ground_truth - ground_truth.mean(0)
+        pred_dev = pred - pred.mean(0)
+        u = (truth_dev * pred_dev).sum(0)
+        # The normaliser is the product of the two sums of squared deviations;
+        # summing the product of the squares does not yield a correlation.
+        d = np.sqrt((truth_dev ** 2).sum(0) * (pred_dev ** 2).sum(0))
+        return (u / d).mean(-1)
+
+
+class MAE(Metric):
+    """Mean absolute error over all elements."""
+
+    def calculate(self, pred, ground_truth):
+        abs_error = np.abs(pred - ground_truth)
+        return np.mean(abs_error)
+
+
+class MSE(Metric):
+    """Mean squared error over all elements."""
+
+    def calculate(self, pred, ground_truth):
+        error = pred - ground_truth
+        return np.mean(error ** 2)
+
+
+class RMSE(Metric):
+    """Root mean squared error: the square root of ``MSE``."""
+
+    def calculate(self, pred, ground_truth):
+        mean_squared = MSE()(pred, ground_truth)
+        return np.sqrt(mean_squared)
+
+
+class MAPE(Metric):
+    """Mean absolute percentage error, returned as a fraction (not x100).
+
+    Zero entries in ``ground_truth`` are not guarded against.
+    """
+
+    def calculate(self, pred, ground_truth):
+        relative_error = (pred - ground_truth) / ground_truth
+        return np.mean(np.abs(relative_error))
+
+
+class MSPE(Metric):
+    """Mean squared percentage error, returned as a fraction (not x100).
+
+    Zero entries in ``ground_truth`` are not guarded against.
+    """
+
+    def calculate(self, pred, ground_truth):
+        relative_error = (pred - ground_truth) / ground_truth
+        return np.mean(np.square(relative_error))
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/algorithm/models/__init__.py
similarity index 81%
copy from mlnode/iotdb/mlnode/constant.py
copy to mlnode/iotdb/mlnode/algorithm/models/__init__.py
index 8a38aa95d8..2a1e720805 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/algorithm/models/__init__.py
@@ -15,9 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
-
-MLNODE_CONF_DIRECTORY_NAME = "conf"
-MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
-MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
-
-MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/algorithm/models/forecast/__init__.py
similarity index 81%
copy from mlnode/iotdb/mlnode/constant.py
copy to mlnode/iotdb/mlnode/algorithm/models/forecast/__init__.py
index 8a38aa95d8..2a1e720805 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/algorithm/models/forecast/__init__.py
@@ -15,9 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
-
-MLNODE_CONF_DIRECTORY_NAME = "conf"
-MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
-MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
-
-MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
diff --git a/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py b/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py
new file mode 100644
index 0000000000..58fb12bf29
--- /dev/null
+++ b/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import argparse
+import math
+
+import torch
+import torch.nn as nn
+
+
+class MovingAverageBlock(nn.Module):
+    """Moving-average smoothing used to highlight the trend of a time series.
+
+    The input is padded along dim 1 by repeating its first and last steps
+    (``kernel_size - 1`` extra steps in total) and then average-pooled along
+    that dimension.
+    """
+
+    def __init__(self, kernel_size, stride):
+        super().__init__()
+        self.kernel_size = kernel_size
+        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)
+
+    def forward(self, x):
+        # Split the kernel_size - 1 padding steps between both ends; the
+        # front gets the extra step when kernel_size is even.
+        tail_steps = math.floor((self.kernel_size - 1) // 2)
+        head_steps = self.kernel_size - 1 - tail_steps
+        head = x[:, 0:1, :].repeat(1, head_steps, 1)
+        tail = x[:, -1:, :].repeat(1, tail_steps, 1)
+        padded = torch.cat([head, x, tail], dim=1)
+        # AvgPool1d pools over the last dimension, so dims 1 and 2 are
+        # swapped around the pooling call.
+        pooled = self.avg(padded.permute(0, 2, 1))
+        return pooled.permute(0, 2, 1)
+
+
+class SeriesDecompositionBlock(nn.Module):
+    """Split a series into a moving-average trend and the remaining residual."""
+
+    def __init__(self, kernel_size):
+        super().__init__()
+        self.moving_avg = MovingAverageBlock(kernel_size, stride=1)
+
+    def forward(self, x):
+        """Return ``(residual, trend)`` where ``residual = x - trend``."""
+        trend = self.moving_avg(x)
+        return x - trend, trend
+
+
+class DLinear(nn.Module):
+    """Decomposition Linear Model with one pair of linear layers for all channels.
+
+    Args:
+        kernel_size: window of the moving average used for decomposition
+        input_len: number of input steps (in_features of the linear layers)
+        pred_len: number of predicted steps (out_features of the linear layers)
+        input_vars: number of input variables, stored as ``self.channels``
+    """
+
+    def __init__(
+            self,
+            kernel_size=25,
+            input_len=96,
+            pred_len=96,
+            input_vars=1
+    ):
+        super().__init__()
+        self.input_len, self.pred_len = input_len, pred_len
+        self.kernel_size = kernel_size
+        self.channels = input_vars
+
+        self.decomposition = SeriesDecompositionBlock(kernel_size)
+        self.linear_seasonal = nn.Linear(input_len, pred_len)
+        self.linear_trend = nn.Linear(input_len, pred_len)
+
+    def forward(self, x, *args):
+        # x: [Batch, Input length, Channel]
+        seasonal, trend = self.decomposition(x)
+        # The linear layers act on the last dim, so move time there first.
+        seasonal_pred = self.linear_seasonal(seasonal.permute(0, 2, 1))
+        trend_pred = self.linear_trend(trend.permute(0, 2, 1))
+        # back to [Batch, Output length, Channel]
+        return (seasonal_pred + trend_pred).permute(0, 2, 1)
+
+
+class DLinearIndividual(nn.Module):
+    """Decomposition Linear Model with a separate pair of linear layers per channel.
+
+    Args:
+        kernel_size: window of the moving average used for decomposition
+        input_len: number of input steps (in_features of the linear layers)
+        pred_len: number of predicted steps (out_features of the linear layers)
+        input_vars: number of channels, i.e. number of per-channel layer pairs
+    """
+
+    def __init__(
+            self,
+            kernel_size=25,
+            input_len=96,
+            pred_len=96,
+            input_vars=1
+    ):
+        super(DLinearIndividual, self).__init__()
+        self.input_len = input_len
+        self.pred_len = pred_len
+        self.kernel_size = kernel_size
+        self.channels = input_vars
+
+        self.decomposition = SeriesDecompositionBlock(kernel_size)
+        self.Linear_Seasonal = nn.ModuleList(
+            [nn.Linear(self.input_len, self.pred_len) for _ in range(self.channels)]
+        )
+        self.Linear_Trend = nn.ModuleList(
+            [nn.Linear(self.input_len, self.pred_len) for _ in range(self.channels)]
+        )
+
+    def forward(self, x, *args):
+        # x: [Batch, Input length, Channel]
+        seasonal_init, trend_init = self.decomposition(x)
+        seasonal_init, trend_init = seasonal_init.permute(0, 2, 1), trend_init.permute(0, 2, 1)
+
+        # Create the output buffers directly on the input's device instead of
+        # allocating on the default device and copying with ``.to(...)``.
+        seasonal_output = torch.zeros(
+            [seasonal_init.size(0), seasonal_init.size(1), self.pred_len],
+            dtype=seasonal_init.dtype, device=seasonal_init.device)
+        trend_output = torch.zeros(
+            [trend_init.size(0), trend_init.size(1), self.pred_len],
+            dtype=trend_init.dtype, device=trend_init.device)
+        # Channel i is projected by its own seasonal and trend layers.
+        for i, linear_season_layer in enumerate(self.Linear_Seasonal):
+            seasonal_output[:, i, :] = linear_season_layer(seasonal_init[:, i, :])
+        for i, linear_trend_layer in enumerate(self.Linear_Trend):
+            trend_output[:, i, :] = linear_trend_layer(trend_init[:, i, :])
+
+        x = seasonal_output + trend_output
+        return x.permute(0, 2, 1)  # to [Batch, Output length, Channel]
+
+
+def dlinear(model_config: argparse.Namespace) -> DLinear:
+ """Placeholder factory for ``DLinear``; not implemented yet, so it currently returns None."""
+ # TODO (@lcy)
+ pass
+
+
+def dlinear_individual(model_config: argparse.Namespace) -> DLinearIndividual:
+ """Placeholder factory for ``DLinearIndividual``; not implemented yet, so it currently returns None."""
+ # TODO (@lcy)
+ pass
diff --git a/mlnode/iotdb/mlnode/algorithm/models/forecast/nbeats.py b/mlnode/iotdb/mlnode/algorithm/models/forecast/nbeats.py
new file mode 100644
index 0000000000..0744cd4460
--- /dev/null
+++ b/mlnode/iotdb/mlnode/algorithm/models/forecast/nbeats.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+from typing import Tuple
+
+import torch
+import torch.nn as nn
+
+
+class GenericBasis(nn.Module):
+    """Generic basis: slices theta into its backcast and forecast parts.
+
+    Args:
+        backcast_size: number of leading theta columns returned as backcast
+        forecast_size: number of trailing theta columns returned as forecast
+    """
+
+    def __init__(self, backcast_size: int, forecast_size: int):
+        super().__init__()
+        self.backcast_size = backcast_size
+        self.forecast_size = forecast_size
+
+    def forward(self, theta: torch.Tensor):
+        """Return ``(backcast, forecast)`` taken from the columns of ``theta``."""
+        backcast = theta[:, :self.backcast_size]
+        forecast = theta[:, -self.forecast_size:]
+        return backcast, forecast
+
+
+# Maps a block type name to the basis-function class used to build the blocks.
+block_dict = {
+ 'generic': GenericBasis,
+
+ # TODO(@lcy) support more block type
+ # 'trend': TrendBasis,
+ # 'seasonality': SeasonalityBasis,
+}
+
+
+class NBeatsBlock(nn.Module):
+    """N-BEATS block: a ReLU MLP whose output parameterises a basis function."""
+
+    def __init__(self,
+                 input_size,
+                 theta_size: int,
+                 basis_function: nn.Module,
+                 layers: int,
+                 layer_size: int):
+        """
+        Build the fully connected stack and the projection to theta.
+
+        Args:
+            input_size: input sample size
+            theta_size: number of parameters for the basis function
+            basis_function: module mapping theta to (backcast, forecast)
+            layers: number of hidden layers (at least one is always created)
+            layer_size: width of every hidden layer
+        """
+        super().__init__()
+        hidden = [nn.Linear(in_features=input_size, out_features=layer_size)]
+        hidden.extend(
+            nn.Linear(in_features=layer_size, out_features=layer_size)
+            for _ in range(layers - 1)
+        )
+        self.layers = nn.ModuleList(hidden)
+        self.basis_parameters = nn.Linear(in_features=layer_size, out_features=theta_size)
+        self.basis_function = basis_function
+
+    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
+        activations = x
+        for fully_connected in self.layers:
+            activations = torch.relu(fully_connected(activations))
+        theta = self.basis_parameters(activations)
+        return self.basis_function(theta)
+
+
+class NBeatsUnivariate(nn.Module):
+    """N-Beats Model (univariate): a residual stack of blocks.
+
+    Each block receives the current residual and returns a backcast and a
+    forecast; the backcast is subtracted from the residual and the forecasts
+    of all blocks are summed.
+    """
+
+    def __init__(self, blocks: nn.ModuleList):
+        super().__init__()
+        self.blocks = blocks
+
+    def forward(self, x):
+        """Return the summed forecast of all blocks, or None without blocks."""
+        residuals = x
+        forecast = None
+        for block in self.blocks:
+            backcast, block_forecast = block(residuals)
+            residuals = residuals - backcast
+            if forecast is None:
+                forecast = block_forecast
+            else:
+                # Accumulate out-of-place: an in-place add would overwrite
+                # the tensor returned by the first block.
+                forecast = forecast + block_forecast
+        return forecast
+
+
+class NBeats(nn.Module):
+    """Neural Basis Expansion Analysis Time Series.
+
+    One univariate N-BEATS stack is built and applied to every input channel.
+
+    Args:
+        block_type: key into ``block_dict`` selecting the basis function
+        d_model: width of the hidden layers inside each block
+        inner_layers: number of hidden layers inside each block
+        outer_layers: number of blocks in the stack
+        input_len: number of input steps (backcast size)
+        pred_len: number of predicted steps (forecast size)
+        input_vars: number of channels iterated over in ``forward``
+    """
+
+    def __init__(
+            self,
+            block_type='generic',
+            d_model=128,
+            inner_layers=4,
+            outer_layers=4,
+            input_len=96,
+            pred_len=96,
+            input_vars=1,
+    ):
+        super().__init__()
+        self.enc_in = input_vars
+        self.block = block_dict[block_type]
+        stack = [
+            NBeatsBlock(input_size=input_len,
+                        theta_size=input_len + pred_len,
+                        basis_function=self.block(backcast_size=input_len, forecast_size=pred_len),
+                        layers=inner_layers,
+                        layer_size=d_model)
+            for _ in range(outer_layers)
+        ]
+        self.model = NBeatsUnivariate(torch.nn.ModuleList(stack))
+
+    def forward(self, x, *args):
+        # x: [Batch, Input length, Channel]
+        per_channel = [self.model(x[:, :, channel]) for channel in range(self.enc_in)]
+        # to [Batch, Output length, Channel]
+        return torch.stack(per_channel, dim=-1)
+
+
+def nbeats(model_config: argparse.Namespace) -> NBeats:
+ """Placeholder factory for ``NBeats``; not implemented yet, so it currently returns None."""
+ # TODO (@lcy)
+ pass
diff --git a/mlnode/iotdb/mlnode/client.py b/mlnode/iotdb/mlnode/client.py
index 244b6975c9..aa1536e130 100644
--- a/mlnode/iotdb/mlnode/client.py
+++ b/mlnode/iotdb/mlnode/client.py
@@ -22,7 +22,9 @@ from thrift.Thrift import TException
from thrift.transport import TSocket, TTransport
from iotdb.mlnode.config import config
+from iotdb.mlnode.constant import TSStatusCode
from iotdb.mlnode.log import logger
+from iotdb.mlnode.util import verify_success
from iotdb.thrift.common.ttypes import TEndPoint, TSStatus
from iotdb.thrift.confignode import IConfigNodeRPCService
from iotdb.thrift.confignode.ttypes import TUpdateModelInfoReq
@@ -33,16 +35,6 @@ from iotdb.thrift.datanode.ttypes import (TFetchTimeseriesReq,
from iotdb.thrift.mlnode import IMLNodeRPCService
from iotdb.thrift.mlnode.ttypes import TCreateTrainingTaskReq, TDeleteModelReq
-# status code
-SUCCESS_STATUS = 200
-REDIRECTION_RECOMMEND = 400
-
-
-def verify_success(status: TSStatus, err_msg: str) -> None:
- if status.code != SUCCESS_STATUS:
- logger.warn(err_msg + ", error status is ", status)
- raise RuntimeError(str(status.code) + ": " + status.message)
-
class ClientManager(object):
def __init__(self):
@@ -242,7 +234,7 @@ class ConfigNodeClient(object):
pass
def __update_config_node_leader(self, status: TSStatus) -> bool:
- if status.code == REDIRECTION_RECOMMEND:
+ if status.code == TSStatusCode.REDIRECTION_RECOMMEND:
if status.redirectNode is not None:
self.__config_leader = status.redirectNode
else:
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/constant.py
index 8a38aa95d8..810d7c261e 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/constant.py
@@ -15,9 +15,18 @@
# specific language governing permissions and limitations
# under the License.
#
+from enum import Enum
MLNODE_CONF_DIRECTORY_NAME = "conf"
MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
+
+
+class TSStatusCode(int, Enum):
+    """Numeric status codes carried in ``TSStatus.code``.
+
+    Members are also ``int`` instances, so comparing a member with the plain
+    integer code of a status (``status.code == TSStatusCode.X``) works; with
+    a bare ``Enum`` such a comparison is always False.
+    """
+    SUCCESS_STATUS = 200
+    REDIRECTION_RECOMMEND = 400
+
+    def get_status_code(self) -> int:
+        """Return the integer value of this status code."""
+        return self.value
diff --git a/mlnode/iotdb/mlnode/exception.py b/mlnode/iotdb/mlnode/exception.py
index 6307909a9a..3907a67d58 100644
--- a/mlnode/iotdb/mlnode/exception.py
+++ b/mlnode/iotdb/mlnode/exception.py
@@ -16,6 +16,7 @@
# under the License.
#
+
class _BaseError(Exception):
"""Base class for exceptions in this module."""
pass
diff --git a/mlnode/iotdb/mlnode/handler.py b/mlnode/iotdb/mlnode/handler.py
index 8a36353d47..d1f21ff517 100644
--- a/mlnode/iotdb/mlnode/handler.py
+++ b/mlnode/iotdb/mlnode/handler.py
@@ -15,28 +15,15 @@
# specific language governing permissions and limitations
# under the License.
#
-from enum import Enum
-from iotdb.thrift.common.ttypes import TSStatus
+from iotdb.mlnode.constant import TSStatusCode
+from iotdb.mlnode.util import get_status
from iotdb.thrift.mlnode import IMLNodeRPCService
from iotdb.thrift.mlnode.ttypes import (TCreateTrainingTaskReq,
TDeleteModelReq, TForecastReq,
TForecastResp)
-class TSStatusCode(Enum):
- SUCCESS_STATUS = 200
-
- def get_status_code(self) -> int:
- return self.value
-
-
-def get_status(status_code: TSStatusCode, message: str) -> TSStatus:
- status = TSStatus(status_code.get_status_code())
- status.message = message
- return status
-
-
class MLNodeRPCServiceHandler(IMLNodeRPCService.Iface):
def __init__(self):
pass
diff --git a/mlnode/iotdb/mlnode/model_storage.py b/mlnode/iotdb/mlnode/storage.py
similarity index 100%
rename from mlnode/iotdb/mlnode/model_storage.py
rename to mlnode/iotdb/mlnode/storage.py
diff --git a/mlnode/iotdb/mlnode/util.py b/mlnode/iotdb/mlnode/util.py
index 8932479c4a..c15e84da11 100644
--- a/mlnode/iotdb/mlnode/util.py
+++ b/mlnode/iotdb/mlnode/util.py
@@ -15,9 +15,10 @@
# specific language governing permissions and limitations
# under the License.
#
+from iotdb.mlnode.constant import TSStatusCode
from iotdb.mlnode.exception import BadNodeUrlError
from iotdb.mlnode.log import logger
-from iotdb.thrift.common.ttypes import TEndPoint
+from iotdb.thrift.common.ttypes import TEndPoint, TSStatus
def parse_endpoint_url(endpoint_url: str) -> TEndPoint:
@@ -45,3 +46,15 @@ def parse_endpoint_url(endpoint_url: str) -> TEndPoint:
except ValueError as e:
logger.warning("Illegal endpoint url format: {} ({})".format(endpoint_url, e))
raise BadNodeUrlError(endpoint_url)
+
+
+def get_status(status_code: TSStatusCode, message: str) -> TSStatus:
+    """Build a ``TSStatus`` holding the given status code and message."""
+    result = TSStatus(status_code.get_status_code())
+    result.message = message
+    return result
+
+
+def verify_success(status: TSStatus, err_msg: str) -> None:
+    """Raise if ``status`` does not carry the success code.
+
+    Args:
+        status: status to check
+        err_msg: context logged as a warning when the check fails
+
+    Raises:
+        RuntimeError: if ``status.code`` is not the success code
+    """
+    # Compare against the integer value: ``status.code`` is a plain int and
+    # never equals the enum member itself.
+    if status.code != TSStatusCode.SUCCESS_STATUS.get_status_code():
+        logger.warning("%s, error status is %s", err_msg, status)
+        # format() also copes with a status whose message is None.
+        raise RuntimeError("{}: {}".format(status.code, status.message))
diff --git a/mlnode/test/test_model_storage.py b/mlnode/test/test_model_storage.py
index 99857db37e..3750c49c2c 100644
--- a/mlnode/test/test_model_storage.py
+++ b/mlnode/test/test_model_storage.py
@@ -23,7 +23,7 @@ import time
import torch.nn as nn
from iotdb.mlnode.config import config
-from iotdb.mlnode.model_storage import model_storage
+from iotdb.mlnode.storage import model_storage
class TestModel(nn.Module):