Posted to dev@systemds.apache.org by GitBox <gi...@apache.org> on 2021/04/20 22:09:01 UTC

[GitHub] [systemds] TMaddox opened a new pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

TMaddox opened a new pull request #1234:
URL: https://github.com/apache/systemds/pull/1234


   WIP - Do not merge!
   
   Allow an OperationNode with multiple outputs to be part of the DAG while not being the root node.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [systemds] TMaddox commented on pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#issuecomment-827012889


   I see. It seems I misunderstood the assignment, and since I don't have your insight into the library, I'm afraid I can't really discuss that with you, but what you say definitely makes sense.
   
   Besides that, I would like to respond to one of your comments:
   
   > One of the reasons why this is important to separate is that many operations supported directly on an OperationNode are not valid on a "MultiNode", for instance min.
   > Rather than having to introduce an if/else inside the min function (if we chose the universal approach) throwing an invalid exception, I would prefer not to have that if/else.
   
   My approach to solving this with the universal node was that the first output becomes the default: if no output is specified, the first output is used for all operations. So if I call min on a multi node, it returns the min of the first output. A downside of this solution is that, in my opinion, it probably won't be as straightforward for a user as your approach, since the user needs to know that the first output is the default.
   
   I will change it so there is a separate MultiNode.
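A minimal sketch of the two designs being compared, using hypothetical classes (`UniversalNode`, `Node`, `MultiNode`) that only build DML strings and are not the SystemDS API: the universal node silently applies `min` to its first output, while a separate `MultiNode` has no `min` at all and forces the caller to pick an output first.

```python
from typing import List


class UniversalNode:
    """Hypothetical single class for every node; operations use output 0."""

    def __init__(self, outputs: List[str]):
        self.outputs = outputs

    def min(self) -> str:
        # Implicit default: the caller has to know that output 0 is used.
        return f"min({self.outputs[0]})"


class Node:
    """Hypothetical single-output node that supports min directly."""

    def __init__(self, var: str):
        self.var = var

    def min(self) -> str:
        return f"min({self.var})"


class MultiNode:
    """Hypothetical multi-output node: no min(), only output selection."""

    def __init__(self, outputs: List[str]):
        self._outputs = [Node(v) for v in outputs]

    def __getitem__(self, idx: int) -> Node:
        return self._outputs[idx]


u = UniversalNode(["V2_0", "V2_1"])
m = MultiNode(["V2_0", "V2_1"])
print(u.min())            # min(V2_0)
print(m[1].min())         # min(V2_1)
print(hasattr(m, "min"))  # False
```

With the separate class, the choice of output is explicit at the call site and no if/else is needed inside `min`.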





[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619851415



##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],

Review comment:
       If you let me implement a universal OperationNode class, I won't be able to rely on a second node class, since there is only one.







[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619852374



##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         # get variable names of unnamed parameters
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node in dag_node.unnamed_input_nodes]
         # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, input_node in
-                            dag_node.named_input_nodes.items()}
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and len(input_node.output_nodes) > 1:
+                named_input_vars[name] = named_input_vars[name] + name

Review comment:
       In your scripts you use [V2_0, V2_1] as the result of operations with multiple outputs. I am leveraging this to reduce complexity in the built scripts. The alternative would be to copy each output into a new variable (V3 = V2_0; V4 = V2_1), which increases the number of variables in the scripts.
   
   `name` in this context ([V2_0, V2_1]) is the name of the output node edge, which is _0 and _1 in this case.
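The naming scheme can be sketched with two hypothetical helpers (not part of SystemDS) that mirror the diff: the multi-output operation assigns `V2_0, V2_1, ...` in a single DML statement, and a consumer appends the output edge name (`_0`, `_1`) to the parent's variable name instead of copying the output into a fresh variable.

```python
def multi_output_line(var_name: str, operation: str, args: str, n_outputs: int) -> str:
    # One DML variable per output, assigned in a single statement,
    # e.g. [V2_0,V2_1]=pca(...);
    outs = ",".join(f"{var_name}_{idx}" for idx in range(n_outputs))
    return f"[{outs}]={operation}({args});"


def output_ref(parent_var: str, edge_name: str) -> str:
    # Refer to an output directly (V2 + "_1" -> V2_1) rather than
    # emitting an extra copy such as V4 = V2_1.
    return parent_var + edge_name


print(multi_output_line("V2", "pca", "X=V1,K=1", 4))
# [V2_0,V2_1,V2_2,V2_3]=pca(X=V1,K=1);
print(f"V3=abs({output_ref('V2', '_1')});")
# V3=abs(V2_1);
```

This keeps the script short, but it couples the consumer to the producer's `{var}_{idx}` naming: both sides must agree on the edge names for the reference to resolve to a defined DML variable.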







[GitHub] [systemds] Baunsgaard commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
Baunsgaard commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r617303102



##########
File path: src/main/python/systemds/operator/__init__.py
##########
@@ -20,6 +20,7 @@
 # -------------------------------------------------------------
 
 from systemds.operator.operation_node import OperationNode
+from systemds.operator.operation_node2 import OperationNode2
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm]
+__all__ = [OperationNode, OperationNode2, algorithm]

Review comment:
       Maybe call it MultiNode.

##########
File path: src/main/python/systemds/matrix/matrix.py
##########
@@ -25,15 +25,15 @@
 import numpy as np
 from py4j.java_gateway import JavaObject, JVMView
 from systemds.context import SystemDSContext
-from systemds.operator import OperationNode
+from systemds.operator import OperationNode2
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.converters import numpy_to_matrix_block
 
 # TODO maybe instead of having a new class we could have a function `matrix` instead, adding behavior to
-#  `OperationNode` would be necessary
+#  `OperationNode2` would be necessary
 
 
-class Matrix(OperationNode):
+class Matrix(OperationNode2):

Review comment:
       It should not be necessary to change Matrix at all. I would like it to stay an OperationNode,
   since it is the 'lowest' primitive.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],

Review comment:
       Make it a list of tuples of Operation Nodes, and leverage the previous operation node to construct the individual returns.

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         # get variable names of unnamed parameters
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node in dag_node.unnamed_input_nodes]
         # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, input_node in
-                            dag_node.named_input_nodes.items()}
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and len(input_node.output_nodes) > 1:
+                named_input_vars[name] = named_input_vars[name] + name

Review comment:
       This seems oddly designed:
   if it is a multi return, append the name again?
   
   Can you explain?

##########
File path: src/main/python/systemds/script_building/dag.py
##########
@@ -45,9 +45,10 @@ class DAGNode(ABC):
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
-    _output_type: OutputType
+    _outputs: List[Tuple[str, OutputType]]
+    _output_nodes: Dict[str, Union['DAGNode']]
     _is_python_local_data: bool
-    _number_of_outputs: int
+    _dml_name: str

Review comment:
       This seems bad because it changes the allocation to a List rather than a single element. Maybe we should have a multi DAG node.
   
   If we choose to have a multi DAG node, then make two classes, one called SingleDAGNode and another called MultiDAGNode, that both inherit from DAGNode.
   
   Opinion?
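A minimal sketch of the proposed split, assuming hypothetical `SingleDAGNode` and `MultiDAGNode` classes and a local stand-in for `OutputType` (the real enum lives in `systemds.script_building.dag`): single-output nodes keep one `_output_type` field, and only multi-output nodes store a list.

```python
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Sequence


class OutputType(Enum):
    # Local stand-in for systemds.script_building.dag.OutputType.
    MATRIX = auto()
    DOUBLE = auto()


class DAGNode(ABC):
    @property
    @abstractmethod
    def num_outputs(self) -> int:
        ...


class SingleDAGNode(DAGNode):
    def __init__(self, output_type: OutputType = OutputType.MATRIX):
        self._output_type = output_type  # a single field, as before the PR

    @property
    def num_outputs(self) -> int:
        return 1


class MultiDAGNode(DAGNode):
    def __init__(self, output_types: Sequence[OutputType]):
        if len(output_types) < 2:
            raise ValueError("use SingleDAGNode for a single output")
        self._output_types = list(output_types)  # only multi nodes hold a list

    @property
    def num_outputs(self) -> int:
        return len(self._output_types)


single = SingleDAGNode()
multi = MultiDAGNode([OutputType.MATRIX, OutputType.DOUBLE])
print(single.num_outputs, multi.num_outputs)  # 1 2
```

Because `DAGNode` is abstract, code that walks the DAG can still treat both kinds uniformly, while each subclass only carries the state it needs.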
   
   

##########
File path: src/main/python/tests/algorithms/test_pca.py
##########
@@ -49,10 +51,38 @@ def test_500x2(self):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = Matrix(self.sds, m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
+    def test_500x2b(self):
+        """
+        This test constructs a line of values in 2d space. 
+        That if fit correctly maps perfectly to 1d space.
+        The check is simply if the input value was positive
+        then the output value should be similar.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        X = Matrix(self.sds, m1)
+        # print(features)
+        X._check_matrix_op()
+        params_dict = {'X': X, 'K': 1, 'scale': "FALSE", 'center': "FALSE"}
+        nodeC = OperationNode2(self.sds, 'pca', named_input_nodes=params_dict,
+                               outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+        nodeD = OperationNode2(self.sds, 'abs', unnamed_input_nodes=[nodeC.output_nodes["model"]])

Review comment:
       In the end, the tests should reflect what a user would write. Therefore, in this case
   the goal becomes one of these three:
   
   
   ```python
   [res, model, _, _] = pca(...)
   res = abs(res)
   ```
   
   or
   
   ```python
   m = pca(...)
   res = abs(m[0])
   ```
   
   or
   
   
   ```python
   m = pca(...)
   res = abs(m["model"])
   ```
   
   Now this is challenging, since it requires some changes to our generated algorithms.
   Maybe to keep the project simple, you could make a new test file and do the following:
   
   ```python
   X = self.sds.rand(rows=1, cols=2, seed=3)
   Y = self.sds.rand(rows=2, cols=1, seed=3)
   # I want the default to be unnamed input nodes if the argument is a list,
   # or named input nodes if it is a dictionary.
   M = multinode(self.sds, 'list', [X, Y])
   print(M[1].compute())
   ```
   Executing this should result in a script looking something like:
   ```dml
   X = rand(rows=1, cols=2, seed=3)
   Y = rand(rows=2, cols=1, seed=3)
   m = list(X,Y)
   print(toString(m[1]))
   ```
   
   If it is not clear, please say so :)
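The requested default (a list means unnamed inputs, a dict means named inputs) can be sketched as a hypothetical helper that only renders the DML call; `multinode_call` is illustrative and is not an existing SystemDS function.

```python
from typing import Dict, List, Union


def multinode_call(operation: str, inputs: Union[List[str], Dict[str, str]]) -> str:
    # Hypothetical helper: a list gives unnamed (positional) inputs,
    # a dict gives named inputs.
    if isinstance(inputs, dict):
        args = ",".join(f"{name}={var}" for name, var in inputs.items())
    elif isinstance(inputs, list):
        args = ",".join(inputs)
    else:
        raise TypeError("inputs must be a list or a dict")
    return f"{operation}({args})"


script = "\n".join([
    "X = rand(rows=1, cols=2, seed=3)",
    "Y = rand(rows=2, cols=1, seed=3)",
    "m = " + multinode_call("list", ["X", "Y"]),
    "print(toString(m[1]))",
])
print(script)
```

Dispatching on the argument type keeps a single entry point while still letting callers pass parameter names when a DML function expects them.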

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy arrays
+        :param number_of_outputs: If set to other value than 1 then it is expected
+            that this operation node returns multiple values. If set remember to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
+            return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], shape=self.shape)

Review comment:
       Most of these are invalid, since you can't + a value on a list of heterogeneous outputs.
   I would suggest going through them and validating whether they can be done.
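With a separate multi-output class, invalid arithmetic can be rejected without any if/else: if the class simply does not define `__add__`, Python raises `TypeError` on its own, while an output selected by index still supports `+`. The classes below are hypothetical and only build DML expression strings.

```python
from typing import List, Union


class Node:
    """Hypothetical single-output node that renders DML expressions."""

    def __init__(self, var: str):
        self.var = var

    def __add__(self, other: Union["Node", int, float]) -> "Node":
        rhs = other.var if isinstance(other, Node) else str(other)
        return Node(f"({self.var}+{rhs})")


class MultiNode:
    """Hypothetical multi-output node; deliberately defines no __add__."""

    def __init__(self, outputs: List[str]):
        self._outputs = [Node(v) for v in outputs]

    def __getitem__(self, idx: int) -> Node:
        return self._outputs[idx]


m = MultiNode(["V2_0", "V2_1"])
print((m[0] + 1).var)  # (V2_0+1)
try:
    m + 1
except TypeError as err:
    print("rejected:", err)
```

Going through the operators then reduces to deciding which ones belong on the single-output class; anything left off the multi-output class fails loudly at the call site.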

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,

Review comment:
       It will never need to be Python-local data, since it should contain OperationNodes.

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name

Review comment:
       Nice, early termination!

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param outputs: list of (name, OutputType) pairs, one per DML output
+            (double, matrix etc.). Defaults to a single matrix output named "_1".
+        :param is_python_local_data: if the data is local in python e.g. Numpy arrays
+        :param shape: the shape of the output, if known
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
+            return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], shape=self.shape)

Review comment:
       Actually, on double-checking: all operations from pass_python_data_to_prepared_script are invalid.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619850306



##########
File path: src/main/python/systemds/matrix/matrix.py
##########
@@ -25,15 +25,15 @@
 import numpy as np
 from py4j.java_gateway import JavaObject, JVMView
 from systemds.context import SystemDSContext
-from systemds.operator import OperationNode
+from systemds.operator import OperationNode2
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.converters import numpy_to_matrix_block
 
 # TODO maybe instead of having a new class we could have a function `matrix` instead, adding behavior to
-#  `OperationNode` would be necessary
+#  `OperationNode2` would be necessary
 
 
-class Matrix(OperationNode):
+class Matrix(OperationNode2):

Review comment:
       The Matrix class isn't changed at all; I just need it to inherit from the temporary OperationNode class, which is called OperationNode2.

##########
File path: src/main/python/systemds/operator/__init__.py
##########
@@ -20,6 +20,7 @@
 # -------------------------------------------------------------
 
 from systemds.operator.operation_node import OperationNode
+from systemds.operator.operation_node2 import OperationNode2
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm]
+__all__ = [OperationNode, OperationNode2, algorithm]

Review comment:
       OperationNode2 is a temporary class that will be deleted once this issue is resolved. You suggested that I create a clone of OperationNode and make my changes in this class. Once you give me the OK, all changes will be applied directly to OperationNode.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,

Review comment:
       My code makes OperationNode universal enough to handle all cases (multi-output and single-output), so no separate class (e.g. a "MultiNode") is needed.
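       For illustration, a minimal sketch of the "universal" approach, using a hypothetical `UniversalNode` dataclass rather than the real `OperationNode2` API: a node keeps an ordered list of named outputs, and an operation such as `min` applied to a multi-output node falls back to the first output, following the convention TMaddox proposes in this thread.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class UniversalNode:
    """Hypothetical toy node handling single- and multi-output operations alike."""
    operation: str
    inputs: List["UniversalNode"] = field(default_factory=list)
    # Ordered (edge name, type) pairs; a single-output node has exactly one.
    outputs: List[Tuple[str, str]] = field(default_factory=lambda: [("_0", "matrix")])

    def output(self, name: str) -> "UniversalNode":
        # Expose one named output as its own node so it can feed other operations.
        names = [n for n, _ in self.outputs]
        if name not in names:
            raise KeyError(name)
        return UniversalNode(f"select{name}", [self])

    def default_output(self) -> "UniversalNode":
        # Convention from the thread: if no output is chosen, use the first one.
        return self if len(self.outputs) == 1 else self.output(self.outputs[0][0])

    def min(self) -> "UniversalNode":
        return UniversalNode("min", [self.default_output()], [("_0", "scalar")])
```

       The implicit fallback in `default_output` keeps everything in one class, but it is also a hidden rule users must learn, which is the trade-off debated in this thread.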

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],

Review comment:
       If you let me implement a universal OperationNode class, I won't be able to rely on a second node class, since there is only one.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param outputs: list of (name, OutputType) pairs, one per DML output
+            (double, matrix etc.). Defaults to a single matrix output named "_1".
+        :param is_python_local_data: if the data is local in python e.g. Numpy arrays
+        :param shape: the shape of the output, if known
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
+            return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], shape=self.shape)

Review comment:
       will do, thanks

##########
File path: src/main/python/systemds/script_building/dag.py
##########
@@ -45,9 +45,10 @@ class DAGNode(ABC):
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
-    _output_type: OutputType
+    _outputs: List[Tuple[str, OutputType]]
+    _output_nodes: Dict[str, Union['DAGNode']]
     _is_python_local_data: bool
-    _number_of_outputs: int
+    _dml_name: str

Review comment:
       I can create this structure if you want, but in my opinion a single universal class that handles all cases has less complexity and is therefore easier to maintain. The variable _dml_name is needed so that I can perform DFS on the DAG without evaluating paths more than once when more than one output of a node is used as input in the DAG.
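       A minimal sketch of the memoization idea behind `_dml_name`, assuming hypothetical `ToyNode` and `ToyScript` classes (not the SystemDS `DMLScript` API): once a node has been emitted, its variable name is cached on the node, so a subgraph shared by several consumers is generated only once.

```python
from typing import List, Optional


class ToyNode:
    """Hypothetical DAG node; `dml_name` caches the DML variable once generated."""

    def __init__(self, operation: str, inputs: Optional[List["ToyNode"]] = None):
        self.operation = operation
        self.inputs = inputs or []
        self.dml_name = ""


class ToyScript:
    """Hypothetical script builder performing a DFS over ToyNode graphs."""

    def __init__(self) -> None:
        self.lines: List[str] = []
        self._counter = 0

    def _next_var(self) -> str:
        name = f"V{self._counter}"
        self._counter += 1
        return name

    def dfs(self, node: ToyNode) -> str:
        # Already emitted: reuse the variable instead of re-evaluating the subgraph.
        if node.dml_name != "":
            return node.dml_name
        args = [self.dfs(child) for child in node.inputs]
        var = self._next_var()
        self.lines.append(f"{var}={node.operation}({','.join(args)});")
        node.dml_name = var
        return var
```

       With `x` feeding both `abs` and `exp`, the `rand` statement for `x` is emitted once and both consumers reference `V0`.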

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name

Review comment:
       :)

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         # get variable names of unnamed parameters
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node in dag_node.unnamed_input_nodes]
         # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, input_node in
-                            dag_node.named_input_nodes.items()}
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and len(input_node.output_nodes) > 1:
+                named_input_vars[name] = named_input_vars[name] + name

Review comment:
       In your scripts you use [V2_0, V2_1] as the result of operations with multiple outputs. I am leveraging this to reduce complexity in the built scripts. The alternative would be V3 = V2_0; V4 = V2_1, which increases the number of variables in the scripts.
   
       `name` in this context ([V2_0, V2_1]) is the name of the output node edge, which is _0 and _1 in this case.
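       To make the naming scheme concrete, here is a hypothetical, self-contained toy renderer (`Op`, `Out` and `render` are illustrative names, not SystemDS classes). It mirrors the scenario in `test_500x2b`: a `pca` call with four outputs is emitted once as a multi-assignment, and a consumer of the second output refers to it as the producer variable plus the edge name (`V1` + `_1`), with no intermediate copies.

```python
from typing import List, Optional, Sequence, Tuple, Union

Arg = Union["Op", "Out", str]


class Op:
    """Hypothetical toy operation with one or more named output edges."""

    def __init__(self, op: str, args: Sequence[Tuple[Optional[str], Arg]] = (),
                 outputs: Tuple[str, ...] = ("_0",)):
        self.op, self.args, self.outputs = op, list(args), outputs
        self.var = ""  # DML variable name, assigned when first emitted


class Out:
    """Hypothetical handle selecting one output edge (e.g. "_1") of an Op."""

    def __init__(self, parent: Op, edge: str):
        self.parent, self.edge = parent, edge


def render(root: Op) -> List[str]:
    lines: List[str] = []

    def visit(node: Arg) -> str:
        if isinstance(node, str):        # literal such as "1" or "FALSE"
            return node
        if isinstance(node, Out):        # producer variable + edge name -> V1_1
            return visit(node.parent) + node.edge
        if node.var:                     # already emitted: reuse the variable
            return node.var
        rendered = ",".join(f"{k}={visit(v)}" if k else visit(v) for k, v in node.args)
        node.var = f"V{len(lines)}"
        if len(node.outputs) > 1:        # one statement binds every output
            targets = ",".join(node.var + edge for edge in node.outputs)
            lines.append(f"[{targets}]={node.op}({rendered});")
        else:
            lines.append(f"{node.var}={node.op}({rendered});")
        return node.var

    visit(root)
    return lines
```

       For example, `render(Op("abs", [(None, Out(pca, "_1"))]))`, with `pca` built from a `rand` input and four output edges, yields `V0=rand(rows=30,cols=2);`, `[V1_0,V1_1,V1_2,V1_3]=pca(X=V0,K=1,scale=FALSE,center=FALSE);` and `V2=abs(V1_1);`.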

##########
File path: src/main/python/tests/algorithms/test_pca.py
##########
@@ -49,10 +51,38 @@ def test_500x2(self):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = Matrix(self.sds, m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
+    def test_500x2b(self):
+        """
+        This test constructs a line of values in 2d space
+        that, if fitted correctly, maps perfectly to 1d space.
+        The check is simply that if the input value was positive,
+        the output value should be positive as well.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        X = Matrix(self.sds, m1)
+        # print(features)
+        X._check_matrix_op()
+        params_dict = {'X': X, 'K': 1, 'scale': "FALSE", 'center': "FALSE"}
+        nodeC = OperationNode2(self.sds, 'pca', named_input_nodes=params_dict,
+                               outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+        nodeD = OperationNode2(self.sds, 'abs', unnamed_input_nodes=[nodeC.output_nodes["model"]])

Review comment:
       will do, thanks







[GitHub] [systemds] Baunsgaard closed pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
Baunsgaard closed pull request #1234:
URL: https://github.com/apache/systemds/pull/1234


   





[GitHub] [systemds] Baunsgaard commented on pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
Baunsgaard commented on pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#issuecomment-853803144


   See other PRs





[GitHub] [systemds] Baunsgaard commented on pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
Baunsgaard commented on pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#issuecomment-826721786


   Looking through your comments, it seems like you insist on the "universal" OperationNode approach.
   This is something I would like to avoid, because it means that for every operation called we would need to differentiate between "am I single return?", "am I multi return?", "am I a matrix?", "am I a frame?".
   Currently we have the luxury that there are only matrices and scalars, but we already have a PR for frames, which become a subclass of OperationNode, and in the future we might add more data types or abstractions.
   
   Therefore, the perfect design in my book for this multi-return support is:
   
   - Make a class that can contain multiple different OperationNodes (each corresponding to a single output).
   - This new class should implement the type DAGNode.
   
   One of the reasons why this separation is important is that many operations supported directly on an OperationNode are not valid on a "MultiNode", for instance min.
   Rather than having to introduce an if/else inside the min function (if we chose the universal approach) that throws an exception, I would prefer not having that if/else.
   
   Again, comments and discussions are welcome if you disagree.
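   A minimal sketch of the container design described above, with hypothetical class names (`DagNodeLike`, `SingleOutputNode`, `OutputSelector`, `MultiReturn`) that are not part of SystemDS: the container has no `min` method at all, so no runtime type check is needed, while each selected output is an ordinary single-output node that supports the usual operations.

```python
from abc import ABC, abstractmethod
from typing import List, Sequence


class DagNodeLike(ABC):
    """Hypothetical stand-in for a DAGNode-style base class."""

    @abstractmethod
    def code_line(self, var_name: str, inputs: Sequence[str]) -> str: ...


class SingleOutputNode(DagNodeLike):
    """Hypothetical single-output node: the only place ops like min() live."""

    def __init__(self, operation: str, inputs: Sequence[DagNodeLike] = ()):
        self.operation = operation
        self.inputs = list(inputs)

    def code_line(self, var_name: str, inputs: Sequence[str]) -> str:
        return f"{var_name}={self.operation}({','.join(inputs)});"

    def min(self) -> "SingleOutputNode":
        return SingleOutputNode("min", [self])


class OutputSelector(SingleOutputNode):
    """Hypothetical node exposing output `index` of a MultiReturn parent."""

    def __init__(self, parent: "MultiReturn", index: int):
        super().__init__("select", [parent])
        self.index = index

    def code_line(self, var_name: str, inputs: Sequence[str]) -> str:
        # inputs[0] is the parent's variable, e.g. "V2" -> "V3=V2_1;"
        return f"{var_name}={inputs[0]}_{self.index};"


class MultiReturn(DagNodeLike):
    """Hypothetical container: one DML call, one child node per output."""

    def __init__(self, operation: str, inputs: Sequence[DagNodeLike], n_outputs: int):
        self.operation = operation
        self.inputs = list(inputs)
        self.outputs: List[OutputSelector] = [
            OutputSelector(self, i) for i in range(n_outputs)]

    def __getitem__(self, idx: int) -> OutputSelector:
        return self.outputs[idx]

    def code_line(self, var_name: str, inputs: Sequence[str]) -> str:
        targets = ",".join(f"{var_name}_{i}" for i in range(len(self.outputs)))
        return f"[{targets}]={self.operation}({','.join(inputs)});"
```

   Calling `min` on the container fails with a plain `AttributeError`, while `pca[1].min()` works; that is the "no if/else" property argued for above.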
   





[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619850778



##########
File path: src/main/python/systemds/operator/__init__.py
##########
@@ -20,6 +20,7 @@
 # -------------------------------------------------------------
 
 from systemds.operator.operation_node import OperationNode
+from systemds.operator.operation_node2 import OperationNode2
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm]
+__all__ = [OperationNode, OperationNode2, algorithm]

Review comment:
       OperationNode2 is a temporary class that will be deleted once this issue is resolved. You suggested that I create a clone of OperationNode and make my changes in this class. Once you give me the OK, all changes will be applied directly to OperationNode.








[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619851982



##########
File path: src/main/python/systemds/script_building/dag.py
##########
@@ -45,9 +45,10 @@ class DAGNode(ABC):
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
-    _output_type: OutputType
+    _outputs: List[Tuple[str, OutputType]]
+    _output_nodes: Dict[str, Union['DAGNode']]
     _is_python_local_data: bool
-    _number_of_outputs: int
+    _dml_name: str

Review comment:
       I can create this structure if you want, but in my opinion a single universal class that can handle all cases has less complexity and is thus easier to maintain. The variable _dml_name is needed so that I can perform a DFS on the DAG without evaluating paths more than once when more than one output of a node is used as input in the DAG.
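To illustrate why a per-node name such as `_dml_name` avoids re-evaluating shared paths, here is a minimal sketch of a memoized depth-first traversal. The `Node` class and `build_script` function are hypothetical simplifications, not the SystemDS `OperationNode`/`DMLScript` classes: a node's code line is emitted the first time it is reached, and later visits just return the variable name it was assigned.

```python
import itertools
from typing import List, Optional, Union

Scalar = Union[int, float, str]


class Node:
    """Hypothetical, simplified DAG node (not the SystemDS OperationNode)."""

    def __init__(self, operation: str, inputs: Optional[List[Union["Node", Scalar]]] = None):
        self.operation = operation
        self.inputs = inputs if inputs is not None else []
        self.dml_name = ""  # empty until this node's code line has been emitted


def build_script(root: Node) -> List[str]:
    """Depth-first traversal that emits each node's code line exactly once."""
    lines: List[str] = []
    ids = itertools.count()

    def dfs(node: Union[Node, Scalar]) -> str:
        if not isinstance(node, Node):
            return str(node)  # literals are written inline
        if node.dml_name != "":
            return node.dml_name  # already visited: reuse its DML variable
        args = [dfs(child) for child in node.inputs]
        node.dml_name = f"V{next(ids)}"
        lines.append(f"{node.dml_name}={node.operation}({','.join(args)});")
        return node.dml_name

    dfs(root)
    return lines


# `x` feeds two consumers but is emitted only once.
x = Node("seq", [1, 10])
script = build_script(Node("cbind", [Node("abs", [x]), Node("sqrt", [x])]))
for line in script:
    print(line)
```

Without the early return on `dml_name`, the `seq` line would be generated twice, once per consumer. The same situation arises when several outputs of one multi-output node are used as inputs further down the DAG.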







[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619852790



##########
File path: src/main/python/tests/algorithms/test_pca.py
##########
@@ -49,10 +51,38 @@ def test_500x2(self):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = Matrix(self.sds, m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
+    def test_500x2b(self):
+        """
+        This test constructs a line of values in 2d space. 
+        That if fit correctly maps perfectly to 1d space.
+        The check is simply if the input value was positive
+        then the output value should be similar.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        X = Matrix(self.sds, m1)
+        # print(features)
+        X._check_matrix_op()
+        params_dict = {'X': X, 'K': 1, 'scale': "FALSE", 'center': "FALSE"}
+        nodeC = OperationNode2(self.sds, 'pca', named_input_nodes=params_dict,
+                               outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+        nodeD = OperationNode2(self.sds, 'abs', unnamed_input_nodes=[nodeC.output_nodes["model"]])

Review comment:
       will do, thanks







[GitHub] [systemds] Baunsgaard commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
Baunsgaard commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r617343037



##########
File path: src/main/python/tests/algorithms/test_pca.py
##########
@@ -49,10 +51,38 @@ def test_500x2(self):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = Matrix(self.sds, m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
+    def test_500x2b(self):
+        """
+        This test constructs a line of values in 2d space. 
+        That if fit correctly maps perfectly to 1d space.
+        The check is simply if the input value was positive
+        then the output value should be similar.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        X = Matrix(self.sds, m1)
+        # print(features)
+        X._check_matrix_op()
+        params_dict = {'X': X, 'K': 1, 'scale': "FALSE", 'center': "FALSE"}
+        nodeC = OperationNode2(self.sds, 'pca', named_input_nodes=params_dict,
+                               outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+        nodeD = OperationNode2(self.sds, 'abs', unnamed_input_nodes=[nodeC.output_nodes["model"]])

Review comment:
       In the end, the tests should reflect what a user would write. In this case,
   the goal is for these three forms to work:
   
   
   ```python
   [res, model, _, _] = pca(...)
   res = abs(res)
   ```
   
   or
   
   ```python
   m = pca(...)
   res = abs(m[0])
   ```
   
   or
   
   
   ```python
   m = pca(...)
   res = abs(m["model"])
   ```
   
   This is challenging, since it requires some changes to our generated algorithms.
   To keep the project simple, you could instead make a new test file and do the following:
   
   ```python
   X = self.sds.rand(rows=1, cols=2, seed=3)
   Y = self.sds.rand(rows=2, cols=1, seed=3)
   # I want the default to be unnamed input nodes if the argument is a list,
   # and named input nodes if it is a dictionary.
   M = multinode(self.sds, 'list', [X,Y])
   print(M[1]).compute()
   ```
   If you execute this, the generated script should look something like:
   ```dml
   X = rand(rows = 1, cols = 2, seed = 3)
   Y = rand(rows = 2, cols = 1, seed = 3)
   m = list(X,Y)
   print(toString(m[1]))
   ```
   
   If it is not clear, please say so :)
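The three access forms requested above (unpacking, positional index, and name) can be sketched with standard Python protocols (`__iter__`, `__getitem__`, `__len__`). The `MultiNode` and `OutputRef` classes below are hypothetical and are not part of the SystemDS Python API; they only show how one object could support all three forms at once.

```python
from typing import Iterator, List, Union


class OutputRef:
    """Hypothetical handle to a single output of a multi-output node."""

    def __init__(self, parent: "MultiNode", index: int, name: str):
        self.parent = parent
        self.index = index
        self.name = name

    def __repr__(self) -> str:
        return f"OutputRef({self.parent.operation}, {self.index}, {self.name!r})"


class MultiNode:
    """Hypothetical multi-output node; not part of the SystemDS Python API."""

    def __init__(self, operation: str, output_names: List[str]):
        self.operation = operation
        self._outputs = [OutputRef(self, i, name) for i, name in enumerate(output_names)]

    def __getitem__(self, key: Union[int, str]) -> OutputRef:
        if isinstance(key, str):
            # Access by output name, e.g. m["model"]
            for out in self._outputs:
                if out.name == key:
                    return out
            raise KeyError(key)
        # Positional access, 0-based as usual in Python, e.g. m[0]
        return self._outputs[key]

    def __iter__(self) -> Iterator[OutputRef]:
        # Enables tuple/list unpacking: [res, model, _, _] = m
        return iter(self._outputs)

    def __len__(self) -> int:
        return len(self._outputs)


m = MultiNode("pca", ["res", "model", "scale", "center"])
[res, model, _, _] = m  # form 1: unpacking
print(m[0], m["model"])  # forms 2 and 3: index and name
```

One design point such a class would need to settle: Python indexing is 0-based while DML indexing is 1-based, so a positional `m[0]` on the Python side would have to map to `m[1]` in the generated DML.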







[GitHub] [systemds] Baunsgaard closed pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
Baunsgaard closed pull request #1234:
URL: https://github.com/apache/systemds/pull/1234


   





[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619851992



##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name

Review comment:
       :)







[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619851575



##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy arrays
+        :param number_of_outputs: If set to other value than 1 then it is expected
+            that this operation node returns multiple values. If set remember to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
+            return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], shape=self.shape)

Review comment:
       will do, thanks







[GitHub] [systemds] TMaddox commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
TMaddox commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r619851083



##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,

Review comment:
       My code makes OperationNode universal enough to handle all cases (multi-output and normal), so no separate class (for example "MultiNode") is needed.
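A minimal sketch of the universal-node idea, under two assumptions: multi-output calls are emitted as a DML multi-assignment into `{var_name}_{idx}` variables, mirroring `code_line` in the diff above, and, as proposed elsewhere in this thread, the first output acts as the default when a single-input operation such as `min` is applied. `UniversalNode` and `default_output` are hypothetical names used only for illustration.

```python
from typing import List, Optional


class UniversalNode:
    """Hypothetical sketch: one node class for single- and multi-output operations."""

    def __init__(self, operation: str, inputs: List[str], outputs: Optional[List[str]] = None):
        self.operation = operation
        self.inputs = inputs
        # Single output by default; avoids a mutable default argument
        self.outputs = outputs if outputs is not None else ["_1"]

    def code_line(self, var_name: str) -> str:
        args = ",".join(self.inputs)
        if len(self.outputs) > 1:
            # Multi-output call: one DML variable per output
            targets = ",".join(f"{var_name}_{i}" for i in range(len(self.outputs)))
            return f"[{targets}]={self.operation}({args});"
        return f"{var_name}={self.operation}({args});"

    def default_output(self, var_name: str) -> str:
        # Operations such as min fall back to the first output of a multi-output node
        return f"{var_name}_0" if len(self.outputs) > 1 else var_name


pca = UniversalNode("pca", ["X"], ["res", "model", "scale", "center"])
print(pca.code_line("V0"))
print(f"V1=min({pca.default_output('V0')});")
```

The trade-off raised in the review remains visible here: the fallback in `default_output` keeps a single class, but a user must know that the first output is the implicit default, whereas a separate multi-output class could reject `min` outright.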




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [systemds] Baunsgaard commented on pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Posted by GitBox <gi...@apache.org>.
Baunsgaard commented on pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#issuecomment-853803144


   See other PRs

