You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/29 16:28:45 UTC

[iotdb] branch master updated: [IOTDB-5678] Introduce machine learning algorithm libraries on MLNode (#9338)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bfcab297f [IOTDB-5678] Introduce machine learning algorithm libraries on MLNode (#9338)
1bfcab297f is described below

commit 1bfcab297f90562258b8cc2bfe7929556c04cd7c
Author: Yong Liu <li...@gmail.com>
AuthorDate: Thu Mar 30 00:28:38 2023 +0800

    [IOTDB-5678] Introduce machine learning algorithm libraries on MLNode (#9338)
---
 .../mlnode/{constant.py => algorithm/__init__.py}  |   6 -
 mlnode/iotdb/mlnode/algorithm/metric.py            |  69 +++++++++++
 .../{constant.py => algorithm/models/__init__.py}  |   6 -
 .../models/forecast/__init__.py}                   |   6 -
 .../mlnode/algorithm/models/forecast/dlinear.py    | 138 +++++++++++++++++++++
 .../mlnode/algorithm/models/forecast/nbeats.py     | 138 +++++++++++++++++++++
 mlnode/iotdb/mlnode/client.py                      |  14 +--
 mlnode/iotdb/mlnode/constant.py                    |   9 ++
 mlnode/iotdb/mlnode/exception.py                   |   1 +
 mlnode/iotdb/mlnode/handler.py                     |  17 +--
 .../iotdb/mlnode/{model_storage.py => storage.py}  |   0
 mlnode/iotdb/mlnode/util.py                        |  15 ++-
 mlnode/test/test_model_storage.py                  |   2 +-
 13 files changed, 375 insertions(+), 46 deletions(-)

diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/algorithm/__init__.py
similarity index 81%
copy from mlnode/iotdb/mlnode/constant.py
copy to mlnode/iotdb/mlnode/algorithm/__init__.py
index 8a38aa95d8..2a1e720805 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/algorithm/__init__.py
@@ -15,9 +15,3 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
-MLNODE_CONF_DIRECTORY_NAME = "conf"
-MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
-MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
-
-MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
diff --git a/mlnode/iotdb/mlnode/algorithm/metric.py b/mlnode/iotdb/mlnode/algorithm/metric.py
new file mode 100644
index 0000000000..e623642191
--- /dev/null
+++ b/mlnode/iotdb/mlnode/algorithm/metric.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from abc import abstractmethod
+
+import numpy as np
+
+# Names of the metric classes defined in this module.
+all_metrics = ['RSE', 'CORR', 'MAE', 'MSE', 'RMSE', 'MAPE', 'MSPE']
+
+
+class Metric(object):
+    """ Base class for evaluation metrics; instances are callable.
+
+    Subclasses override ``calculate``; calling an instance forwards to it.
+    """
+
+    def __call__(self, pred, ground_truth):
+        """ Return ``self.calculate(pred, ground_truth)`` """
+        return self.calculate(pred, ground_truth)
+
+    @abstractmethod
+    def calculate(self, pred, ground_truth):
+        """
+        Compute the metric value; must be overridden by subclasses
+
+        Args:
+            pred: predicted values
+            ground_truth: reference values
+
+        Raises:
+            NotImplementedError: always, in this base implementation
+        """
+        # abstractmethod is only enforced when the metaclass derives from ABCMeta,
+        # which is not the case here, so fail loudly instead of returning None
+        raise NotImplementedError(
+            '{} must implement calculate()'.format(type(self).__name__))
+
+
+class RSE(Metric):
+    """ Norm of the prediction error divided by the norm of the ground truth's deviation from its mean """
+
+    def calculate(self, pred, ground_truth):
+        """ Return sqrt(sum((truth - pred)^2)) / sqrt(sum((truth - mean(truth))^2)) """
+        error_norm = np.sqrt(np.sum((ground_truth - pred) ** 2))
+        spread_norm = np.sqrt(np.sum((ground_truth - ground_truth.mean()) ** 2))
+        return error_norm / spread_norm
+
+
+class CORR(Metric):
+    """ Pearson correlation between prediction and ground truth, taken along axis 0 """
+
+    def calculate(self, pred, ground_truth):
+        """
+        Correlate ``pred`` with ``ground_truth`` along axis 0
+
+        Args:
+            pred: predicted values (numpy array)
+            ground_truth: reference values, same shape as ``pred``
+
+        Returns:
+            the per-series correlation coefficients averaged over the last remaining axis
+        """
+        truth_dev = ground_truth - ground_truth.mean(0)
+        pred_dev = pred - pred.mean(0)
+        covariance = (truth_dev * pred_dev).sum(0)
+        # the normaliser is the product of the two sums of squares, not the
+        # sum of the element-wise product of squares
+        scale = np.sqrt((truth_dev ** 2).sum(0) * (pred_dev ** 2).sum(0))
+        return (covariance / scale).mean(-1)
+
+
+class MAE(Metric):
+    """ Mean absolute error """
+
+    def calculate(self, pred, ground_truth):
+        """ Return the mean of |pred - ground_truth| over all elements """
+        absolute_error = np.abs(pred - ground_truth)
+        return np.mean(absolute_error)
+
+
+class MSE(Metric):
+    """ Mean squared error """
+
+    def calculate(self, pred, ground_truth):
+        """ Return the mean of (pred - ground_truth)^2 over all elements """
+        error = pred - ground_truth
+        return np.mean(error ** 2)
+
+
+class RMSE(Metric):
+    """ Root mean squared error """
+
+    def calculate(self, pred, ground_truth):
+        """ Return the square root of the MSE metric of the same inputs """
+        mean_squared_error = MSE()(pred, ground_truth)
+        return np.sqrt(mean_squared_error)
+
+
+class MAPE(Metric):
+    """ Mean absolute percentage error (as a ratio, not multiplied by 100) """
+
+    def calculate(self, pred, ground_truth):
+        """ Return the mean of |(pred - ground_truth) / ground_truth| over all elements """
+        relative_error = (pred - ground_truth) / ground_truth
+        return np.mean(np.abs(relative_error))
+
+
+class MSPE(Metric):
+    """ Mean squared percentage error (as a ratio, not multiplied by 100) """
+
+    def calculate(self, pred, ground_truth):
+        """ Return the mean of ((pred - ground_truth) / ground_truth)^2 over all elements """
+        relative_error = (pred - ground_truth) / ground_truth
+        return np.mean(np.square(relative_error))
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/algorithm/models/__init__.py
similarity index 81%
copy from mlnode/iotdb/mlnode/constant.py
copy to mlnode/iotdb/mlnode/algorithm/models/__init__.py
index 8a38aa95d8..2a1e720805 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/algorithm/models/__init__.py
@@ -15,9 +15,3 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
-MLNODE_CONF_DIRECTORY_NAME = "conf"
-MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
-MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
-
-MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/algorithm/models/forecast/__init__.py
similarity index 81%
copy from mlnode/iotdb/mlnode/constant.py
copy to mlnode/iotdb/mlnode/algorithm/models/forecast/__init__.py
index 8a38aa95d8..2a1e720805 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/algorithm/models/forecast/__init__.py
@@ -15,9 +15,3 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
-MLNODE_CONF_DIRECTORY_NAME = "conf"
-MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
-MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
-
-MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
diff --git a/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py b/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py
new file mode 100644
index 0000000000..58fb12bf29
--- /dev/null
+++ b/mlnode/iotdb/mlnode/algorithm/models/forecast/dlinear.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import argparse
+import math
+
+import torch
+import torch.nn as nn
+
+
+class MovingAverageBlock(nn.Module):
+    """ Moving average over the time axis, used to expose the trend of a time series """
+
+    def __init__(self, kernel_size, stride):
+        """
+        Args:
+            kernel_size: window length of the average pooling
+            stride: stride of the average pooling
+        """
+        super().__init__()
+        self.kernel_size = kernel_size
+        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)
+
+    def forward(self, x):
+        """ Average ``x`` along dim 1 after replicating its first and last time steps as padding """
+        # kernel_size - 1 padding steps in total; the front receives the extra one when that is odd
+        back_pad = (self.kernel_size - 1) // 2
+        front_pad = self.kernel_size - 1 - back_pad
+        head = x[:, :1, :].repeat(1, front_pad, 1)
+        tail = x[:, -1:, :].repeat(1, back_pad, 1)
+        padded = torch.cat([head, x, tail], dim=1)
+        # AvgPool1d pools over the last axis, so move time there and back again
+        return self.avg(padded.permute(0, 2, 1)).permute(0, 2, 1)
+
+
+class SeriesDecompositionBlock(nn.Module):
+    """ Splits a series into its moving average and the remainder """
+
+    def __init__(self, kernel_size):
+        """
+        Args:
+            kernel_size: window length of the moving average (applied with stride 1)
+        """
+        super().__init__()
+        self.moving_avg = MovingAverageBlock(kernel_size, stride=1)
+
+    def forward(self, x):
+        """ Return ``(x - moving_average(x), moving_average(x))`` """
+        trend = self.moving_avg(x)
+        return x - trend, trend
+
+
+class DLinear(nn.Module):
+    """ Decomposition Linear Model: one linear layer per component, shared by all channels """
+
+    def __init__(
+            self,
+            kernel_size=25,
+            input_len=96,
+            pred_len=96,
+            input_vars=1
+    ):
+        """
+        Args:
+            kernel_size: moving-average window used by the series decomposition
+            input_len: number of input time steps
+            pred_len: number of predicted time steps
+            input_vars: number of channels (stored as ``self.channels``)
+        """
+        super().__init__()
+        self.input_len = input_len
+        self.pred_len = pred_len
+        self.kernel_size = kernel_size
+        self.channels = input_vars
+
+        # split the input into a moving-average part and a remainder, then map each over time
+        self.decomposition = SeriesDecompositionBlock(kernel_size)
+        self.linear_seasonal = nn.Linear(self.input_len, self.pred_len)
+        self.linear_trend = nn.Linear(self.input_len, self.pred_len)
+
+    def forward(self, x, *args):
+        """ Map x: [Batch, Input length, Channel] to [Batch, Output length, Channel] """
+        seasonal, trend = self.decomposition(x)
+        # the linear layers act on the last axis, so the time axis is moved there first
+        seasonal_pred = self.linear_seasonal(seasonal.permute(0, 2, 1))
+        trend_pred = self.linear_trend(trend.permute(0, 2, 1))
+        return (seasonal_pred + trend_pred).permute(0, 2, 1)
+
+
+class DLinearIndividual(nn.Module):
+    """ Decomposition Linear Model with a separate pair of linear layers per channel """
+
+    def __init__(
+            self,
+            kernel_size=25,
+            input_len=96,
+            pred_len=96,
+            input_vars=1
+    ):
+        """
+        Args:
+            kernel_size: moving-average window used by the series decomposition
+            input_len: number of input time steps
+            pred_len: number of predicted time steps
+            input_vars: number of channels; one seasonal and one trend layer is created for each
+        """
+        super().__init__()
+        self.input_len = input_len
+        self.pred_len = pred_len
+        self.kernel_size = kernel_size
+        self.channels = input_vars
+
+        self.decomposition = SeriesDecompositionBlock(kernel_size)
+        self.Linear_Seasonal = nn.ModuleList(
+            [nn.Linear(self.input_len, self.pred_len) for _ in range(self.channels)]
+        )
+        self.Linear_Trend = nn.ModuleList(
+            [nn.Linear(self.input_len, self.pred_len) for _ in range(self.channels)]
+        )
+
+    def forward(self, x, *args):
+        """ Map x: [Batch, Input length, Channel] to [Batch, Output length, Channel] """
+        seasonal, trend = self.decomposition(x)
+        seasonal, trend = seasonal.permute(0, 2, 1), trend.permute(0, 2, 1)
+
+        # output buffers share dtype and device with the corresponding input
+        seasonal_pred = seasonal.new_zeros([seasonal.size(0), seasonal.size(1), self.pred_len])
+        trend_pred = trend.new_zeros([trend.size(0), trend.size(1), self.pred_len])
+        # both layer lists hold self.channels entries, so they can be walked together
+        for i, (season_layer, trend_layer) in enumerate(zip(self.Linear_Seasonal, self.Linear_Trend)):
+            seasonal_pred[:, i, :] = season_layer(seasonal[:, i, :])
+            trend_pred[:, i, :] = trend_layer(trend[:, i, :])
+
+        return (seasonal_pred + trend_pred).permute(0, 2, 1)
+
+
+def dlinear(model_config: argparse.Namespace) -> DLinear:
+    """ Factory for DLinear built from ``model_config``; not implemented yet and currently returns None """
+    # TODO (@lcy)
+    pass
+
+
+def dlinear_individual(model_config: argparse.Namespace) -> DLinearIndividual:
+    """ Factory for DLinearIndividual built from ``model_config``; not implemented yet and currently returns None """
+    # TODO (@lcy)
+    pass
diff --git a/mlnode/iotdb/mlnode/algorithm/models/forecast/nbeats.py b/mlnode/iotdb/mlnode/algorithm/models/forecast/nbeats.py
new file mode 100644
index 0000000000..0744cd4460
--- /dev/null
+++ b/mlnode/iotdb/mlnode/algorithm/models/forecast/nbeats.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+from typing import Tuple
+
+import torch
+import torch.nn as nn
+
+
+class GenericBasis(nn.Module):
+    """ Generic basis function: slices theta into a backcast part and a forecast part """
+
+    def __init__(self, backcast_size: int, forecast_size: int):
+        """
+        Args:
+            backcast_size: number of leading theta columns returned as backcast
+            forecast_size: number of trailing theta columns returned as forecast
+        """
+        super().__init__()
+        self.backcast_size = backcast_size
+        self.forecast_size = forecast_size
+
+    def forward(self, theta: torch.Tensor):
+        """ Return ``(backcast, forecast)`` taken from the front and the back of theta's dim 1 """
+        backcast = theta[:, :self.backcast_size]
+        forecast = theta[:, -self.forecast_size:]
+        return backcast, forecast
+
+
+# Maps a block type name to its basis function class; NBeats looks up its block_type here.
+block_dict = {
+    'generic': GenericBasis,
+
+    # TODO(@lcy) support more block types
+    # 'trend': TrendBasis,
+    # 'seasonality': SeasonalityBasis,
+}
+
+
+class NBeatsBlock(nn.Module):
+    """ N-BEATS block: a ReLU MLP whose output parameterises a basis function """
+
+    def __init__(self,
+                 input_size,
+                 theta_size: int,
+                 basis_function: nn.Module,
+                 layers: int,
+                 layer_size: int):
+        """
+        N-BEATS block
+
+        Args:
+            input_size: input sample size
+            theta_size: number of parameters for the basis function
+            basis_function: callable turning the parameters into (backcast, forecast)
+            layers: number of fully connected layers
+            layer_size: width of each fully connected layer
+        """
+        super().__init__()
+        # first layer adapts the input width, the remaining layers - 1 keep the hidden width
+        stack = [nn.Linear(in_features=input_size, out_features=layer_size)]
+        stack.extend(nn.Linear(in_features=layer_size, out_features=layer_size) for _ in range(layers - 1))
+        self.layers = nn.ModuleList(stack)
+        self.basis_parameters = nn.Linear(in_features=layer_size, out_features=theta_size)
+        self.basis_function = basis_function
+
+    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
+        """ Run x through the ReLU stack and hand the projected parameters to the basis function """
+        hidden = x
+        for layer in self.layers:
+            hidden = torch.relu(layer(hidden))
+        theta = self.basis_parameters(hidden)
+        return self.basis_function(theta)
+
+
+class NBeatsUnivariate(nn.Module):
+    """ N-Beats Model (univariate): a chain of blocks with residual backcasts and summed forecasts """
+
+    def __init__(self, blocks: nn.ModuleList):
+        """
+        Args:
+            blocks: modules that each return a ``(backcast, forecast)`` pair
+        """
+        super().__init__()
+        self.blocks = blocks
+
+    def forward(self, x):
+        """ Return the sum of all block forecasts, or None when there are no blocks """
+        residuals, forecast = x, None
+        for block in self.blocks:
+            backcast, block_forecast = block(residuals)
+            # every block only sees what the previous blocks failed to explain
+            residuals = residuals - backcast
+            if forecast is not None:
+                forecast += block_forecast
+            else:
+                forecast = block_forecast
+        return forecast
+
+
+class NBeats(nn.Module):
+    """ Neural Basis Expansion Analysis Time Series, applied to each channel with one shared model """
+
+    def __init__(
+            self,
+            block_type='generic',
+            d_model=128,
+            inner_layers=4,
+            outer_layers=4,
+            input_len=96,
+            pred_len=96,
+            input_vars=1,
+    ):
+        """
+        Args:
+            block_type: key into ``block_dict`` selecting the basis function class
+            d_model: width of the fully connected layers inside each block
+            inner_layers: number of fully connected layers per block
+            outer_layers: number of stacked blocks
+            input_len: number of input time steps (backcast size)
+            pred_len: number of predicted time steps (forecast size)
+            input_vars: number of channels processed in ``forward``
+        """
+        super().__init__()
+        self.enc_in = input_vars
+        self.block = block_dict[block_type]
+        # theta carries the backcast and the forecast parameters side by side
+        stacked_blocks = [
+            NBeatsBlock(input_size=input_len,
+                        theta_size=input_len + pred_len,
+                        basis_function=self.block(backcast_size=input_len, forecast_size=pred_len),
+                        layers=inner_layers,
+                        layer_size=d_model)
+            for _ in range(outer_layers)
+        ]
+        self.model = NBeatsUnivariate(torch.nn.ModuleList(stacked_blocks))
+
+    def forward(self, x, *args):
+        """ Map x: [Batch, Input length, Channel] to [Batch, Output length, Channel] """
+        # the same univariate model is run on every channel in turn
+        per_channel = [self.model(x[:, :, i]) for i in range(self.enc_in)]
+        return torch.stack(per_channel, dim=-1)
+
+
+def nbeats(model_config: argparse.Namespace) -> NBeats:
+    """ Factory for NBeats built from ``model_config``; not implemented yet and currently returns None """
+    # TODO (@lcy)
+    pass
diff --git a/mlnode/iotdb/mlnode/client.py b/mlnode/iotdb/mlnode/client.py
index 244b6975c9..aa1536e130 100644
--- a/mlnode/iotdb/mlnode/client.py
+++ b/mlnode/iotdb/mlnode/client.py
@@ -22,7 +22,9 @@ from thrift.Thrift import TException
 from thrift.transport import TSocket, TTransport
 
 from iotdb.mlnode.config import config
+from iotdb.mlnode.constant import TSStatusCode
 from iotdb.mlnode.log import logger
+from iotdb.mlnode.util import verify_success
 from iotdb.thrift.common.ttypes import TEndPoint, TSStatus
 from iotdb.thrift.confignode import IConfigNodeRPCService
 from iotdb.thrift.confignode.ttypes import TUpdateModelInfoReq
@@ -33,16 +35,6 @@ from iotdb.thrift.datanode.ttypes import (TFetchTimeseriesReq,
 from iotdb.thrift.mlnode import IMLNodeRPCService
 from iotdb.thrift.mlnode.ttypes import TCreateTrainingTaskReq, TDeleteModelReq
 
-# status code
-SUCCESS_STATUS = 200
-REDIRECTION_RECOMMEND = 400
-
-
-def verify_success(status: TSStatus, err_msg: str) -> None:
-    if status.code != SUCCESS_STATUS:
-        logger.warn(err_msg + ", error status is ", status)
-        raise RuntimeError(str(status.code) + ": " + status.message)
-
 
 class ClientManager(object):
     def __init__(self):
@@ -242,7 +234,7 @@ class ConfigNodeClient(object):
         pass
 
     def __update_config_node_leader(self, status: TSStatus) -> bool:
-        if status.code == REDIRECTION_RECOMMEND:
+        if status.code == TSStatusCode.REDIRECTION_RECOMMEND:
             if status.redirectNode is not None:
                 self.__config_leader = status.redirectNode
             else:
diff --git a/mlnode/iotdb/mlnode/constant.py b/mlnode/iotdb/mlnode/constant.py
index 8a38aa95d8..810d7c261e 100644
--- a/mlnode/iotdb/mlnode/constant.py
+++ b/mlnode/iotdb/mlnode/constant.py
@@ -15,9 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from enum import Enum
 
 MLNODE_CONF_DIRECTORY_NAME = "conf"
 MLNODE_CONF_FILE_NAME = "iotdb-mlnode.toml"
 MLNODE_LOG_CONF_FILE_NAME = "logging_config.ini"
 
 MLNODE_MODEL_STORAGE_DIRECTORY_NAME = "models"
+
+
+class TSStatusCode(int, Enum):
+    """
+    Status codes carried in the ``code`` field of a TSStatus
+
+    The ``int`` mix-in makes members compare equal to their integer value,
+    so ``status.code == TSStatusCode.SUCCESS_STATUS`` works when ``status.code`` is a plain int.
+    """
+    SUCCESS_STATUS = 200
+    REDIRECTION_RECOMMEND = 400
+
+    def get_status_code(self) -> int:
+        """ Return the integer value of this status code """
+        return self.value
diff --git a/mlnode/iotdb/mlnode/exception.py b/mlnode/iotdb/mlnode/exception.py
index 6307909a9a..3907a67d58 100644
--- a/mlnode/iotdb/mlnode/exception.py
+++ b/mlnode/iotdb/mlnode/exception.py
@@ -16,6 +16,7 @@
 # under the License.
 #
 
+
 class _BaseError(Exception):
     """Base class for exceptions in this module."""
     pass
diff --git a/mlnode/iotdb/mlnode/handler.py b/mlnode/iotdb/mlnode/handler.py
index 8a36353d47..d1f21ff517 100644
--- a/mlnode/iotdb/mlnode/handler.py
+++ b/mlnode/iotdb/mlnode/handler.py
@@ -15,28 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-from enum import Enum
 
-from iotdb.thrift.common.ttypes import TSStatus
+from iotdb.mlnode.constant import TSStatusCode
+from iotdb.mlnode.util import get_status
 from iotdb.thrift.mlnode import IMLNodeRPCService
 from iotdb.thrift.mlnode.ttypes import (TCreateTrainingTaskReq,
                                         TDeleteModelReq, TForecastReq,
                                         TForecastResp)
 
 
-class TSStatusCode(Enum):
-    SUCCESS_STATUS = 200
-
-    def get_status_code(self) -> int:
-        return self.value
-
-
-def get_status(status_code: TSStatusCode, message: str) -> TSStatus:
-    status = TSStatus(status_code.get_status_code())
-    status.message = message
-    return status
-
-
 class MLNodeRPCServiceHandler(IMLNodeRPCService.Iface):
     def __init__(self):
         pass
diff --git a/mlnode/iotdb/mlnode/model_storage.py b/mlnode/iotdb/mlnode/storage.py
similarity index 100%
rename from mlnode/iotdb/mlnode/model_storage.py
rename to mlnode/iotdb/mlnode/storage.py
diff --git a/mlnode/iotdb/mlnode/util.py b/mlnode/iotdb/mlnode/util.py
index 8932479c4a..c15e84da11 100644
--- a/mlnode/iotdb/mlnode/util.py
+++ b/mlnode/iotdb/mlnode/util.py
@@ -15,9 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from iotdb.mlnode.constant import TSStatusCode
 from iotdb.mlnode.exception import BadNodeUrlError
 from iotdb.mlnode.log import logger
-from iotdb.thrift.common.ttypes import TEndPoint
+from iotdb.thrift.common.ttypes import TEndPoint, TSStatus
 
 
 def parse_endpoint_url(endpoint_url: str) -> TEndPoint:
@@ -45,3 +46,15 @@ def parse_endpoint_url(endpoint_url: str) -> TEndPoint:
     except ValueError as e:
         logger.warning("Illegal endpoint url format: {} ({})".format(endpoint_url, e))
         raise BadNodeUrlError(endpoint_url)
+
+
+def get_status(status_code: TSStatusCode, message: str) -> TSStatus:
+    """
+    Build a TSStatus from a status code and a message
+
+    Args:
+        status_code: enum member whose integer code is passed to the TSStatus constructor
+        message: text stored in the ``message`` field of the result
+
+    Returns:
+        the newly created TSStatus
+    """
+    result = TSStatus(status_code.get_status_code())
+    result.message = message
+    return result
+
+
+def verify_success(status: TSStatus, err_msg: str) -> None:
+    """
+    Raise if ``status`` does not carry the success code
+
+    Args:
+        status: status object whose ``code`` and ``message`` are inspected
+        err_msg: context text written to the log when the check fails
+
+    Raises:
+        RuntimeError: if ``status.code`` differs from the SUCCESS_STATUS code
+    """
+    # compare against the enum's integer value: a plain Enum member never equals an int,
+    # which would make every status look like a failure
+    if status.code != TSStatusCode.SUCCESS_STATUS.get_status_code():
+        # each logged value needs its own placeholder, otherwise logging fails to format the record
+        logger.warning("%s, error status is %s", err_msg, status)
+        # format() also copes with a missing (None) message, unlike string concatenation
+        raise RuntimeError("{}: {}".format(status.code, status.message))
diff --git a/mlnode/test/test_model_storage.py b/mlnode/test/test_model_storage.py
index 99857db37e..3750c49c2c 100644
--- a/mlnode/test/test_model_storage.py
+++ b/mlnode/test/test_model_storage.py
@@ -23,7 +23,7 @@ import time
 import torch.nn as nn
 
 from iotdb.mlnode.config import config
-from iotdb.mlnode.model_storage import model_storage
+from iotdb.mlnode.storage import model_storage
 
 
 class TestModel(nn.Module):