You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/06/03 11:34:39 UTC
[systemds] 01/04: [SYSTEMDS-2828] Python Multi Return Continuation
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit bfc58f15aec8d65ab01e8488678c6eb9838f19d1
Author: TMaddox <gi...@mailexpress.cc>
AuthorDate: Fri May 7 22:14:03 2021 +0200
[SYSTEMDS-2828] Python Multi Return Continuation
This commit adds support for Python multi return continuation.
It gives the ability to continue processing variables that are returned
from a function call that returns more than one variable.
---
.../python/systemds/context/systemds_context.py | 16 +++-
src/main/python/systemds/operator/__init__.py | 3 +-
src/main/python/systemds/operator/nodes/list.py | 88 +++++++++++++++++
.../python/systemds/operator/operation_node.py | 33 +------
src/main/python/systemds/script_building/dag.py | 41 ++++++--
src/main/python/systemds/script_building/script.py | 36 ++++---
src/main/python/tests/algorithms/test_pca.py | 5 +-
src/main/python/tests/list/test_operations.py | 106 +++++++++++++++++++++
8 files changed, 275 insertions(+), 53 deletions(-)
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index f7f8199..80db697 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -38,7 +38,7 @@ import numpy as np
import pandas as pd
from py4j.java_gateway import GatewayParameters, JavaGateway
from py4j.protocol import Py4JNetworkError
-from systemds.operator import Frame, Matrix, OperationNode, Scalar, Source
+from systemds.operator import Frame, Matrix, OperationNode, Scalar, Source, List
from systemds.script_building import OutputType
from systemds.utils.consts import VALID_INPUT_TYPES
from systemds.utils.helpers import get_module_dir
@@ -458,3 +458,17 @@ class SystemDSContext(object):
:param print_imported_methods: boolean specifying if the imported methods should be printed.
"""
return Source(self, path, name, print_imported_methods)
+
+ def list(self, *args: Sequence[VALID_INPUT_TYPES], **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'List':
+ if len(kwargs) != 0 and len(args) != 0:
+ raise Exception("Accepts either args or kwargs")
+ elif len(kwargs) != 0:
+ out = []
+ for key, arg in kwargs.items():
+ out.append((key, OutputType.from_type(arg)))
+ return List(self, 'list', named_input_nodes=kwargs, outputs=out)
+ elif len(args) != 0:
+ out = []
+ for idx, arg in enumerate(args):
+ out.append((f"_{idx}", OutputType.from_type(arg)))
+ return List(self, 'list', unnamed_input_nodes=args, outputs=out)
diff --git a/src/main/python/systemds/operator/__init__.py b/src/main/python/systemds/operator/__init__.py
index fcecc9e..cda9ba2 100644
--- a/src/main/python/systemds/operator/__init__.py
+++ b/src/main/python/systemds/operator/__init__.py
@@ -24,6 +24,7 @@ from systemds.operator.nodes.scalar import Scalar
from systemds.operator.nodes.matrix import Matrix
from systemds.operator.nodes.frame import Frame
from systemds.operator.nodes.source import Source
+from systemds.operator.nodes.list import List
from systemds.operator import algorithm
-__all__ = [OperationNode, algorithm, Scalar, Matrix, Frame, Source]
+__all__ = [OperationNode, algorithm, Scalar, List, Matrix, Frame, Source]
diff --git a/src/main/python/systemds/operator/nodes/list.py b/src/main/python/systemds/operator/nodes/list.py
new file mode 100644
index 0000000..64e37eb
--- /dev/null
+++ b/src/main/python/systemds/operator/nodes/list.py
@@ -0,0 +1,88 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+__all__ = ["List"]
+
+from typing import Dict, Sequence, Tuple, Union, Iterable, List
+
+import numpy as np
+from py4j.java_gateway import JavaObject
+
+from systemds.operator import OperationNode, Matrix
+from systemds.script_building.dag import OutputType
+from systemds.utils.consts import VALID_INPUT_TYPES
+from systemds.utils.converters import numpy_to_matrix_block
+from systemds.utils.helpers import create_params_string
+
+
+class List(OperationNode):
+
+ def __init__(self, sds_context: 'SystemDSContext', operation: str,
+ unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None,
+ named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+ outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)]):
+
+ is_python_local_data = False
+ self._outputs = outputs
+ self._named_output_nodes = {}
+ for idx, output in enumerate(outputs):
+ if output[1] == OutputType.MATRIX:
+ self.named_output_nodes[output[0]] = Matrix(sds_context, operation='list', named_input_nodes={f"_{idx}": self})
+ # TODO add output types
+
+ super().__init__(sds_context, operation, unnamed_input_nodes,
+ named_input_nodes, OutputType.LIST, is_python_local_data)
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return self.named_output_nodes[self._outputs[key][0]]
+ return self.named_output_nodes[key]
+
+ def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
+ assert self.is_python_local_data, 'Can only pass data to prepared script if it is python local!'
+ if self._is_numpy():
+ prepared_script.setMatrix(var_name, numpy_to_matrix_block(
+ sds, self._np_array), True) # True for reuse
+
+ def __parse_output_result_list(self, result_variables):
+ result_var = []
+ named_output_nodes_types_list = [type(named_output_node).__name__ for named_output_node in list(self.named_output_nodes.values())]
+ for idx, v in enumerate(self._script.out_var_name):
+ if named_output_nodes_types_list[idx] == "Matrix":
+ result_var.append(self.__parse_output_result_matrix(result_variables, v))
+ elif named_output_nodes_types_list[idx] == "Frame":
+ result_var.append(self.__parse_output_result_frame(result_variables, v))
+ else:
+ result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+ return result_var
+
+ def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+ named_input_vars: Dict[str, str]) -> str:
+
+ inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+ output = "["
+ for idx, output_node in enumerate(self.named_output_nodes):
+ output += f'{var_name}_{idx},'
+ output = output[:-1] + "]"
+ return f'{output}={self.operation}({inputs_comma_sep});'
+
+ def compute(self, verbose: bool = False, lineage: bool = False) -> Union[np.array]:
+ return super().compute(verbose, lineage)
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 3f98598..6dcd56c 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -50,9 +50,7 @@ class OperationNode(DAGNode):
Iterable[VALID_INPUT_TYPES]] = None,
named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
output_type: OutputType = OutputType.MATRIX,
- is_python_local_data: bool = False,
- number_of_outputs=1,
- output_types: Iterable[OutputType] = None):
+ is_python_local_data: bool = False):
"""
Create general `OperationNode`
@@ -81,10 +79,9 @@ class OperationNode(DAGNode):
self._result_var = None
self._lineage_trace = None
self._script = None
- self._number_of_outputs = number_of_outputs
- self._output_types = output_types
self._source_node = None
self._already_added = False
+ self.dml_name = ""
def compute(self, verbose: bool = False, lineage: bool = False) -> \
Union[float, np.array, Tuple[Union[float, np.array], str]]:
@@ -138,21 +135,6 @@ class OperationNode(DAGNode):
self.sds_context, result_variables.getFrameBlock(var_name)
)
- def __parse_output_result_list(self, result_variables):
- result_var = []
- for idx, v in enumerate(self._script.out_var_name):
- if(self._output_types == None or self._output_types[idx] == OutputType.MATRIX):
- result_var.append(
- self.__parse_output_result_matrix(result_variables, v))
- elif self._output_types[idx] == OutputType.FRAME:
- result_var.append(
- self.__parse_output_result_frame(result_variables, v))
-
- else:
- result_var.append(result_variables.getDouble(
- self._script.out_var_name[idx]))
- return result_var
-
def get_lineage_trace(self) -> str:
"""Get the lineage trace for this node.
@@ -174,16 +156,9 @@ class OperationNode(DAGNode):
unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
- inputs_comma_sep = create_params_string(
- unnamed_input_vars, named_input_vars)
+ inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
- if self.output_type == OutputType.LIST:
- output = "["
- for idx in range(self._number_of_outputs):
- output += f'{var_name}_{idx},'
- output = output[:-1] + "]"
- return f'{output}={self.operation}({inputs_comma_sep});'
- elif self.output_type == OutputType.NONE:
+ if self.output_type == OutputType.NONE:
return f'{self.operation}({inputs_comma_sep});'
# elif self.output_type == OutputType.ASSIGN:
# return f'{var_name}={self.operation};'
diff --git a/src/main/python/systemds/script_building/dag.py b/src/main/python/systemds/script_building/dag.py
index 3abdc74..2ca2e8f 100644
--- a/src/main/python/systemds/script_building/dag.py
+++ b/src/main/python/systemds/script_building/dag.py
@@ -24,6 +24,8 @@ from enum import Enum, auto
from typing import TYPE_CHECKING, Any, Dict, Sequence, Union, Optional
from py4j.java_gateway import JavaObject, JVMView
+
+import systemds.operator
from systemds.utils.consts import VALID_INPUT_TYPES
if TYPE_CHECKING:
@@ -68,18 +70,37 @@ class OutputType(Enum):
return OutputType.NONE
+ @staticmethod
+ def from_type(obj):
+ if obj is not None:
+ if isinstance(obj, systemds.operator.Matrix):
+ return OutputType.MATRIX
+ elif isinstance(obj, systemds.operator.Frame):
+ return OutputType.FRAME
+ elif isinstance(obj, systemds.operator.Scalar):
+ return OutputType.SCALAR
+ elif isinstance(obj, float): # TODO is this correct?
+ return OutputType.DOUBLE
+ elif isinstance(obj, str):
+ return OutputType.STRING
+ elif isinstance(obj, systemds.operator.List):
+ return OutputType.LIST
+
+ return OutputType.NONE
+
class DAGNode(ABC):
"""A Node in the directed-acyclic-graph (DAG) defining all operations."""
sds_context: 'SystemDSContext'
_unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
_named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
+ _named_output_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
_source_node: Optional["DAGNode"]
_output_type: OutputType
_script: Optional["DMLScript"]
_is_python_local_data: bool
- _number_of_outputs: int
_already_added: bool
+ _dml_name: str
def compute(self, verbose: bool = False, lineage: bool = False) -> Any:
"""Get result of this operation. Builds the dml script and executes it in SystemDS, before this method is called
@@ -126,12 +147,12 @@ class DAGNode(ABC):
return self._named_input_nodes
@property
- def is_python_local_data(self):
- return self._is_python_local_data
+ def named_output_nodes(self):
+ return self._named_output_nodes
@property
- def number_of_outputs(self):
- return self._number_of_outputs
+ def is_python_local_data(self):
+ return self._is_python_local_data
@property
def output_type(self):
@@ -147,4 +168,12 @@ class DAGNode(ABC):
@property
def script_str(self):
- return self._script.dml_script
\ No newline at end of file
+ return self._script.dml_script
+
+ @property
+ def dml_name(self):
+ return self._dml_name
+
+ @dml_name.setter
+ def dml_name(self, value):
+ self._dml_name = value
diff --git a/src/main/python/systemds/script_building/script.py b/src/main/python/systemds/script_building/script.py
index eb145b0..3eed51c 100644
--- a/src/main/python/systemds/script_building/script.py
+++ b/src/main/python/systemds/script_building/script.py
@@ -158,10 +158,10 @@ class DMLScript:
:param dag_root: the topmost operation of our DAG, result of operation will be output
"""
baseOutVarString = self._dfs_dag_nodes(dag_root)
- if(dag_root.output_type != OutputType.NONE):
- if(dag_root.number_of_outputs > 1):
+ if dag_root.output_type != OutputType.NONE:
+ if dag_root.output_type == OutputType.LIST:
self.out_var_name = []
- for idx in range(dag_root.number_of_outputs):
+ for idx, output_node in enumerate(dag_root.named_output_nodes):
self.add_code(
f'write({baseOutVarString}_{idx}, \'./tmp_{idx}\');')
self.out_var_name.append(f'{baseOutVarString}_{idx}')
@@ -179,31 +179,37 @@ class DMLScript:
if isinstance(dag_node, bool):
return 'TRUE' if dag_node else 'FALSE'
return str(dag_node)
+
+ if dag_node.dml_name != "":
+ return dag_node.dml_name
+
if dag_node._output_type == OutputType.IMPORT:
if not dag_node.already_added:
self.add_code(dag_node.code_line(None, None))
return None
+
if dag_node._source_node is not None:
self._dfs_dag_nodes(dag_node._source_node)
# for each node do the dfs operation and save the variable names in `input_var_names`
# get variable names of unnamed parameters
- unnamed_input_vars = [self._dfs_dag_nodes(
- input_node) for input_node in dag_node.unnamed_input_nodes]
-
- # get variable names of named parameters
- named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, input_node in
- dag_node.named_input_nodes.items()}
+ unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node in dag_node.unnamed_input_nodes]
- curr_var_name = self._next_unique_var()
+ named_input_vars = {}
+ for name, input_node in dag_node.named_input_nodes.items():
+ named_input_vars[name] = self._dfs_dag_nodes(input_node)
+ if isinstance(input_node, DAGNode) and input_node._output_type == OutputType.LIST:
+ dag_node.dml_name = named_input_vars[name] + name
+ return dag_node.dml_name
+
+ dag_node.dml_name = self._next_unique_var()
if dag_node.is_python_local_data:
- self.add_input_from_python(curr_var_name, dag_node)
-
- code_line = dag_node.code_line(
- curr_var_name, unnamed_input_vars, named_input_vars)
+ self.add_input_from_python(dag_node.dml_name, dag_node)
+
+ code_line = dag_node.code_line(dag_node.dml_name, unnamed_input_vars, named_input_vars)
self.add_code(code_line)
- return curr_var_name
+ return dag_node.dml_name
def _next_unique_var(self) -> str:
"""Gets the next unique variable name
diff --git a/src/main/python/tests/algorithms/test_pca.py b/src/main/python/tests/algorithms/test_pca.py
index bf5bb8c..0f774c0 100644
--- a/src/main/python/tests/algorithms/test_pca.py
+++ b/src/main/python/tests/algorithms/test_pca.py
@@ -25,6 +25,9 @@ import numpy as np
from systemds.context import SystemDSContext
from systemds.operator.algorithm import pca
+from systemds.operator import List
+from systemds.script_building.dag import OutputType
+
class TestPCA(unittest.TestCase):
@@ -48,7 +51,7 @@ class TestPCA(unittest.TestCase):
m1 = self.generate_matrices_for_pca(30, seed=1304)
X = self.sds.from_numpy( m1)
# print(features)
- [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+ [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
for (x, y) in zip(m1, res):
self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
diff --git a/src/main/python/tests/list/test_operations.py b/src/main/python/tests/list/test_operations.py
new file mode 100644
index 0000000..818042f
--- /dev/null
+++ b/src/main/python/tests/list/test_operations.py
@@ -0,0 +1,106 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+from systemds.operator.algorithm import pca
+
+from systemds.operator import List
+from systemds.script_building.dag import OutputType
+
+
+class TestListOperations(unittest.TestCase):
+
+ sds: SystemDSContext = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.sds = SystemDSContext()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.sds.close()
+
+ def test_creation(self):
+ """
+ Tests the creation of a List object via the SystemDSContext
+ """
+ m1 = self.sds.from_numpy(np.array([1, 2, 3]))
+ m2 = self.sds.from_numpy(np.array([4, 5, 6]))
+ list_obj = self.sds.list(m1, m2)
+ tmp = list_obj[0] + list_obj[1]
+ res = tmp.compute()
+ self.assertTrue(np.allclose(m2, res))
+
+ def test_addition(self):
+ """
+ Tests the creation of a List object via the SystemDSContext and adds a value
+ """
+ m1 = self.sds.from_numpy(np.array([1, 2, 3]))
+ m2 = self.sds.from_numpy(np.array([4, 5, 6]))
+ list_obj = self.sds.list(m1, m2)
+ tmp = list_obj[0] + 2
+ res = tmp.compute()
+ self.assertTrue(np.allclose(m2 + 2, res))
+
+ def test_500x2b(self):
+ """
+ The purpose of this test is to show that an operation can be performed on the output of a multi output list node,
+ without the need of calculating the result first.
+ """
+ m1 = self.generate_matrices_for_pca(30, seed=1304)
+ node0 = self.sds.from_numpy(m1)
+ # print(features)
+ node1 = List(node0.sds_context, 'pca', named_input_nodes={"X": node0, "K": 1, "scale": "FALSE", "center": "FALSE"},
+ outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+ node2 = node1["res"].abs()
+ res = node2.compute(verbose=False)
+
+ def test_multiple_outputs(self):
+ """
+ The purpose of this test is to show that we can use multiple outputs
+ of a single list node in the DAG in one script
+ """
+ node0 = self.sds.from_numpy(np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]))
+ node1 = self.sds.from_numpy(np.array([10, 20, 30, 40, 50, 60, 70, 80, 90]))
+ params_dict = {'X': node0, 'Y': node1}
+ node2 = List(self.sds, 'split', named_input_nodes=params_dict,
+ outputs=[("X_train", OutputType.MATRIX), ("X_test", OutputType.MATRIX), ("Y_train", OutputType.MATRIX), ("Y_test", OutputType.MATRIX)])
+ node3 = node2["X_train"] + node2["Y_train"]
+ # X_train and Y_train are of the same shape because node0 and node1 have both only one dimension.
+ # Therefore they can be added together
+ res = node3.compute(verbose=False)
+
+ def generate_matrices_for_pca(self, dims: int, seed: int = 1234):
+ np.random.seed(seed)
+
+ mu, sigma = 0, 0.1
+ s = np.random.normal(mu, sigma, dims)
+
+ m1 = np.array(np.c_[np.copy(s) * 1, np.copy(s)*0.3], dtype=np.double)
+
+ return m1
+
+
+if __name__ == "__main__":
+ unittest.main(exit=False)