Posted to commits@systemds.apache.org by ba...@apache.org on 2021/06/03 11:34:39 UTC

[systemds] 01/04: [SYSTEMDS-2828] Python Multi Return Continuation

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit bfc58f15aec8d65ab01e8488678c6eb9838f19d1
Author: TMaddox <gi...@mailexpress.cc>
AuthorDate: Fri May 7 22:14:03 2021 +0200

    [SYSTEMDS-2828] Python Multi Return Continuation
    
    This commit adds support for Python multi-return continuation.
    It gives the ability to continue processing variables that are returned
    from a function call that returns more than one variable.
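    
    A minimal usage sketch of the new behavior, mirroring the test_creation
    case added in this commit (assuming a locally started SystemDSContext):
    
        import numpy as np
        from systemds.context import SystemDSContext
    
        sds = SystemDSContext()
        m1 = sds.from_numpy(np.array([1, 2, 3]))
        m2 = sds.from_numpy(np.array([4, 5, 6]))
        list_obj = sds.list(m1, m2)       # multi-output list node
        tmp = list_obj[0] + list_obj[1]   # keep operating on single outputs
        res = tmp.compute()               # the DML script is built and run here
        sds.close()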
---
 .../python/systemds/context/systemds_context.py    |  16 +++-
 src/main/python/systemds/operator/__init__.py      |   3 +-
 src/main/python/systemds/operator/nodes/list.py    |  88 +++++++++++++++++
 .../python/systemds/operator/operation_node.py     |  33 +------
 src/main/python/systemds/script_building/dag.py    |  41 ++++++--
 src/main/python/systemds/script_building/script.py |  36 ++++---
 src/main/python/tests/algorithms/test_pca.py       |   5 +-
 src/main/python/tests/list/test_operations.py      | 106 +++++++++++++++++++++
 8 files changed, 275 insertions(+), 53 deletions(-)

diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index f7f8199..80db697 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -38,7 +38,7 @@ import numpy as np
 import pandas as pd
 from py4j.java_gateway import GatewayParameters, JavaGateway
 from py4j.protocol import Py4JNetworkError
-from systemds.operator import Frame, Matrix, OperationNode, Scalar, Source
+from systemds.operator import Frame, Matrix, OperationNode, Scalar, Source, List
 from systemds.script_building import OutputType
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.helpers import get_module_dir
@@ -458,3 +458,17 @@ class SystemDSContext(object):
         :param print_imported_methods: boolean specifying if the imported methods should be printed.
         """
         return Source(self, path, name, print_imported_methods)
+
+    def list(self, *args: Sequence[VALID_INPUT_TYPES], **kwargs: Dict[str, VALID_INPUT_TYPES]) -> 'List':
+        if len(kwargs) != 0 and len(args) != 0:
+            raise Exception("Accepts either args or kwargs")
+        elif len(kwargs) != 0:
+            out = []
+            for key, arg in kwargs.items():
+                out.append((key, OutputType.from_type(arg)))
+            return List(self, 'list', named_input_nodes=kwargs, outputs=out)
+        elif len(args) != 0:
+            out = []
+            for idx, arg in enumerate(args):
+                out.append((f"_{idx}", OutputType.from_type(arg)))
+            return List(self, 'list', unnamed_input_nodes=args, outputs=out)
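
The new SystemDSContext.list entry point accepts either positional or keyword
arguments, but not both: keyword arguments become named list entries, while
positional arguments are named _0, _1, and so on. A short sketch of both call
styles, assuming matrices m1 and m2 created via sds.from_numpy:

    l1 = sds.list(m1, m2)              # positional entries, addressable as l1[0], l1[1]
    l2 = sds.list(train=m1, test=m2)   # named entries, addressable as l2["train"], l2["test"]
    res = (l1[0] + l1[1]).compute()    # outputs can be used in further operations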
diff --git a/src/main/python/systemds/operator/__init__.py b/src/main/python/systemds/operator/__init__.py
index fcecc9e..cda9ba2 100644
--- a/src/main/python/systemds/operator/__init__.py
+++ b/src/main/python/systemds/operator/__init__.py
@@ -24,6 +24,7 @@ from systemds.operator.nodes.scalar import Scalar
 from systemds.operator.nodes.matrix import Matrix
 from systemds.operator.nodes.frame import Frame
 from systemds.operator.nodes.source import Source
+from systemds.operator.nodes.list import List
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm, Scalar, Matrix, Frame, Source]
+__all__ = [OperationNode, algorithm, Scalar, List, Matrix, Frame, Source]
diff --git a/src/main/python/systemds/operator/nodes/list.py b/src/main/python/systemds/operator/nodes/list.py
new file mode 100644
index 0000000..64e37eb
--- /dev/null
+++ b/src/main/python/systemds/operator/nodes/list.py
@@ -0,0 +1,88 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+__all__ = ["List"]
+
+from typing import Dict, Sequence, Tuple, Union, Iterable, List
+
+import numpy as np
+from py4j.java_gateway import JavaObject
+
+from systemds.operator import OperationNode, Matrix
+from systemds.script_building.dag import OutputType
+from systemds.utils.consts import VALID_INPUT_TYPES
+from systemds.utils.converters import numpy_to_matrix_block
+from systemds.utils.helpers import create_params_string
+
+
+class List(OperationNode):
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Union[str, Iterable[VALID_INPUT_TYPES]] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)]):
+
+        is_python_local_data = False
+        self._outputs = outputs
+        self._named_output_nodes = {}
+        for idx, output in enumerate(outputs):
+            if output[1] == OutputType.MATRIX:
+                self.named_output_nodes[output[0]] = Matrix(sds_context, operation='list', named_input_nodes={f"_{idx}": self})
+                # TODO add output types
+
+        super().__init__(sds_context, operation, unnamed_input_nodes,
+                         named_input_nodes, OutputType.LIST, is_python_local_data)
+
+    def __getitem__(self, key):
+        if isinstance(key, int):
+            return self.named_output_nodes[self._outputs[key][0]]
+        return self.named_output_nodes[key]
+
+    def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
+        assert self.is_python_local_data, 'Can only pass data to prepared script if it is python local!'
+        if self._is_numpy():
+            prepared_script.setMatrix(var_name, numpy_to_matrix_block(
+                sds, self._np_array), True)  # True for reuse
+
+    def __parse_output_result_list(self, result_variables):
+        result_var = []
+        named_output_nodes_types_list = [type(named_output_node).__name__ for named_output_node in list(self.named_output_nodes.values())]
+        for idx, v in enumerate(self._script.out_var_name):
+            if named_output_nodes_types_list[idx] == "Matrix":
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            elif named_output_nodes_types_list[idx] == "Frame":
+                result_var.append(self.__parse_output_result_frame(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+        output = "["
+        for idx, output_node in enumerate(self.named_output_nodes):
+            output += f'{var_name}_{idx},'
+        output = output[:-1] + "]"
+        return f'{output}={self.operation}({inputs_comma_sep});'
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> Union[np.array]:
+        return super().compute(verbose, lineage)
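
The List node declares its outputs up front as (name, OutputType) pairs and
exposes each output as a node of its own, so indexing by position or by name
yields something that can be used in further operations before compute() is
called. A sketch mirroring the test_500x2b case added below, wrapping the pca
builtin (assuming a context sds and an input matrix X):

    from systemds.operator import List
    from systemds.script_building.dag import OutputType

    node = List(sds, 'pca',
                named_input_nodes={"X": X, "K": 1, "scale": "FALSE", "center": "FALSE"},
                outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX),
                         ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
    res = node["res"].abs().compute()   # continue on a single output of the multi-return call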
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 3f98598..6dcd56c 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -50,9 +50,7 @@ class OperationNode(DAGNode):
                                             Iterable[VALID_INPUT_TYPES]] = None,
                  named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
                  output_type: OutputType = OutputType.MATRIX,
-                 is_python_local_data: bool = False,
-                 number_of_outputs=1,
-                 output_types: Iterable[OutputType] = None):
+                 is_python_local_data: bool = False):
                  
         """
         Create general `OperationNode`
@@ -81,10 +79,9 @@ class OperationNode(DAGNode):
         self._result_var = None
         self._lineage_trace = None
         self._script = None
-        self._number_of_outputs = number_of_outputs
-        self._output_types = output_types
         self._source_node = None
         self._already_added = False
+        self.dml_name = ""
 
     def compute(self, verbose: bool = False, lineage: bool = False) -> \
             Union[float, np.array, Tuple[Union[float, np.array], str]]:
@@ -138,21 +135,6 @@ class OperationNode(DAGNode):
             self.sds_context, result_variables.getFrameBlock(var_name)
         )
 
-    def __parse_output_result_list(self, result_variables):
-        result_var = []
-        for idx, v in enumerate(self._script.out_var_name):
-            if(self._output_types == None or self._output_types[idx] == OutputType.MATRIX):
-                result_var.append(
-                    self.__parse_output_result_matrix(result_variables, v))
-            elif self._output_types[idx] == OutputType.FRAME:
-                result_var.append(
-                    self.__parse_output_result_frame(result_variables, v))
-
-            else:
-                result_var.append(result_variables.getDouble(
-                    self._script.out_var_name[idx]))
-        return result_var
-
     def get_lineage_trace(self) -> str:
         """Get the lineage trace for this node.
 
@@ -174,16 +156,9 @@ class OperationNode(DAGNode):
                 unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
             return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
 
-        inputs_comma_sep = create_params_string(
-            unnamed_input_vars, named_input_vars)
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
 
-        if self.output_type == OutputType.LIST:
-            output = "["
-            for idx in range(self._number_of_outputs):
-                output += f'{var_name}_{idx},'
-            output = output[:-1] + "]"
-            return f'{output}={self.operation}({inputs_comma_sep});'
-        elif self.output_type == OutputType.NONE:
+        if self.output_type == OutputType.NONE:
             return f'{self.operation}({inputs_comma_sep});'
         # elif self.output_type == OutputType.ASSIGN:
         #     return f'{var_name}={self.operation};'
diff --git a/src/main/python/systemds/script_building/dag.py b/src/main/python/systemds/script_building/dag.py
index 3abdc74..2ca2e8f 100644
--- a/src/main/python/systemds/script_building/dag.py
+++ b/src/main/python/systemds/script_building/dag.py
@@ -24,6 +24,8 @@ from enum import Enum, auto
 from typing import TYPE_CHECKING, Any, Dict, Sequence, Union, Optional
 
 from py4j.java_gateway import JavaObject, JVMView
+
+import systemds.operator
 from systemds.utils.consts import VALID_INPUT_TYPES
 
 if TYPE_CHECKING:
@@ -68,18 +70,37 @@ class OutputType(Enum):
 
         return OutputType.NONE
 
+    @staticmethod
+    def from_type(obj):
+        if obj is not None:
+            if isinstance(obj, systemds.operator.Matrix):
+                return OutputType.MATRIX
+            elif isinstance(obj, systemds.operator.Frame):
+                return OutputType.FRAME
+            elif isinstance(obj, systemds.operator.Scalar):
+                return OutputType.SCALAR
+            elif isinstance(obj, float):  # TODO is this correct?
+                return OutputType.DOUBLE
+            elif isinstance(obj, str):
+                return OutputType.STRING
+            elif isinstance(obj, systemds.operator.List):
+                return OutputType.LIST
+
+        return OutputType.NONE
+
 
 class DAGNode(ABC):
     """A Node in the directed-acyclic-graph (DAG) defining all operations."""
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
+    _named_output_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
     _source_node: Optional["DAGNode"]
     _output_type: OutputType
     _script: Optional["DMLScript"]
     _is_python_local_data: bool
-    _number_of_outputs: int
     _already_added: bool
+    _dml_name: str
 
     def compute(self, verbose: bool = False, lineage: bool = False) -> Any:
         """Get result of this operation. Builds the dml script and executes it in SystemDS, before this method is called
@@ -126,12 +147,12 @@ class DAGNode(ABC):
         return self._named_input_nodes
 
     @property
-    def is_python_local_data(self):
-        return self._is_python_local_data
+    def named_output_nodes(self):
+        return self._named_output_nodes
 
     @property
-    def number_of_outputs(self):
-        return self._number_of_outputs
+    def is_python_local_data(self):
+        return self._is_python_local_data
 
     @property
     def output_type(self):
@@ -147,4 +168,12 @@ class DAGNode(ABC):
 
     @property
     def script_str(self):
-        return self._script.dml_script
\ No newline at end of file
+        return self._script.dml_script
+
+    @property
+    def dml_name(self):
+        return self._dml_name
+
+    @dml_name.setter
+    def dml_name(self, value):
+        self._dml_name = value
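
The new OutputType.from_type helper maps a Python-side value or node to its
OutputType and falls back to NONE; a rough sketch of the mapping (assuming a
context sds and numpy imported as np):

    from systemds.script_building.dag import OutputType

    OutputType.from_type(sds.from_numpy(np.array([1, 2])))  # OutputType.MATRIX
    OutputType.from_type(1.0)                                # OutputType.DOUBLE
    OutputType.from_type("center")                           # OutputType.STRING
    OutputType.from_type(None)                               # OutputType.NONE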
diff --git a/src/main/python/systemds/script_building/script.py b/src/main/python/systemds/script_building/script.py
index eb145b0..3eed51c 100644
--- a/src/main/python/systemds/script_building/script.py
+++ b/src/main/python/systemds/script_building/script.py
@@ -158,10 +158,10 @@ class DMLScript:
         :param dag_root: the topmost operation of our DAG, result of operation will be output
         """
         baseOutVarString = self._dfs_dag_nodes(dag_root)
-        if(dag_root.output_type != OutputType.NONE):
-            if(dag_root.number_of_outputs > 1):
+        if dag_root.output_type != OutputType.NONE:
+            if dag_root.output_type == OutputType.LIST:
                 self.out_var_name = []
-                for idx in range(dag_root.number_of_outputs):
+                for idx, output_node in enumerate(dag_root.named_output_nodes):
                     self.add_code(
                         f'write({baseOutVarString}_{idx}, \'./tmp_{idx}\');')
                     self.out_var_name.append(f'{baseOutVarString}_{idx}')
@@ -179,31 +179,37 @@ class DMLScript:
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         if dag_node._output_type == OutputType.IMPORT:
             if not dag_node.already_added:
                 self.add_code(dag_node.code_line(None, None))
             return None
+
         if dag_node._source_node is not None:
             self._dfs_dag_nodes(dag_node._source_node)
         # for each node do the dfs operation and save the variable names in `input_var_names`
         # get variable names of unnamed parameters
 
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
-        
-        # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, input_node in
-                            dag_node.named_input_nodes.items()}
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node in dag_node.unnamed_input_nodes]
 
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and input_node._output_type == OutputType.LIST:
+                dag_node.dml_name = named_input_vars[name] + name
+                return dag_node.dml_name
+
+        dag_node.dml_name = self._next_unique_var()
 
         if dag_node.is_python_local_data:
-            self.add_input_from_python(curr_var_name, dag_node)
-        
-        code_line = dag_node.code_line(
-            curr_var_name, unnamed_input_vars, named_input_vars)
+            self.add_input_from_python(dag_node.dml_name, dag_node)
+
+        code_line = dag_node.code_line(dag_node.dml_name, unnamed_input_vars, named_input_vars)
         self.add_code(code_line)
-        return curr_var_name
+        return dag_node.dml_name
 
     def _next_unique_var(self) -> str:
         """Gets the next unique variable name
diff --git a/src/main/python/tests/algorithms/test_pca.py b/src/main/python/tests/algorithms/test_pca.py
index bf5bb8c..0f774c0 100644
--- a/src/main/python/tests/algorithms/test_pca.py
+++ b/src/main/python/tests/algorithms/test_pca.py
@@ -25,6 +25,9 @@ import numpy as np
 from systemds.context import SystemDSContext
 from systemds.operator.algorithm import pca
 
+from systemds.operator import List
+from systemds.script_building.dag import OutputType
+
 
 class TestPCA(unittest.TestCase):
 
@@ -48,7 +51,7 @@ class TestPCA(unittest.TestCase):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = self.sds.from_numpy( m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
diff --git a/src/main/python/tests/list/test_operations.py b/src/main/python/tests/list/test_operations.py
new file mode 100644
index 0000000..818042f
--- /dev/null
+++ b/src/main/python/tests/list/test_operations.py
@@ -0,0 +1,106 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+from systemds.operator.algorithm import pca
+
+from systemds.operator import List
+from systemds.script_building.dag import OutputType
+
+
+class TestListOperations(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_creation(self):
+        """
+        Tests the creation of a List object via the SystemDSContext
+        """
+        m1 = self.sds.from_numpy(np.array([1, 2, 3]))
+        m2 = self.sds.from_numpy(np.array([4, 5, 6]))
+        list_obj = self.sds.list(m1, m2)
+        tmp = list_obj[0] + list_obj[1]
+        res = tmp.compute()
+        self.assertTrue(np.allclose(m2, res))
+
+    def test_addition(self):
+        """
+        Tests the creation of a List object via the SystemDSContext and adds a value
+        """
+        m1 = self.sds.from_numpy(np.array([1, 2, 3]))
+        m2 = self.sds.from_numpy(np.array([4, 5, 6]))
+        list_obj = self.sds.list(m1, m2)
+        tmp = list_obj[0] + 2
+        res = tmp.compute()
+        self.assertTrue(np.allclose(m2 + 2, res))
+
+    def test_500x2b(self):
+        """
+        The purpose of this test is to show that an operation can be performed on the output of a multi output list node,
+        without the need of calculating the result first.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        node0 = self.sds.from_numpy(m1)
+        # print(features)
+        node1 = List(node0.sds_context, 'pca', named_input_nodes={"X": node0, "K": 1, "scale": "FALSE", "center": "FALSE"},
+                     outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+        node2 = node1["res"].abs()
+        res = node2.compute(verbose=False)
+
+    def test_multiple_outputs(self):
+        """
+        The purpose of this test is to show that we can use multiple outputs
+        of a single list node in the DAG in one script
+        """
+        node0 = self.sds.from_numpy(np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]))
+        node1 = self.sds.from_numpy(np.array([10, 20, 30, 40, 50, 60, 70, 80, 90]))
+        params_dict = {'X': node0, 'Y': node1}
+        node2 = List(self.sds, 'split', named_input_nodes=params_dict,
+                     outputs=[("X_train", OutputType.MATRIX), ("X_test", OutputType.MATRIX), ("Y_train", OutputType.MATRIX), ("Y_test", OutputType.MATRIX)])
+        node3 = node2["X_train"] + node2["Y_train"]
+        # X_train and Y_train are of the same shape because node0 and node1 have both only one dimension.
+        # Therefore they can be added together
+        res = node3.compute(verbose=False)
+
+    def generate_matrices_for_pca(self, dims: int, seed: int = 1234):
+        np.random.seed(seed)
+
+        mu, sigma = 0, 0.1
+        s = np.random.normal(mu, sigma,  dims)
+
+        m1 = np.array(np.c_[np.copy(s) * 1, np.copy(s)*0.3], dtype=np.double)
+
+        return m1
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)