You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@systemds.apache.org by GitBox <gi...@apache.org> on 2021/04/21 09:13:48 UTC

[GitHub] [systemds] Baunsgaard commented on a change in pull request #1234: [WIP][Python] Allow an OperationNode with multiple outputs to be part of the DAG

Baunsgaard commented on a change in pull request #1234:
URL: https://github.com/apache/systemds/pull/1234#discussion_r617303102



##########
File path: src/main/python/systemds/operator/__init__.py
##########
@@ -20,6 +20,7 @@
 # -------------------------------------------------------------
 
 from systemds.operator.operation_node import OperationNode
+from systemds.operator.operation_node2 import OperationNode2
 from systemds.operator import algorithm
 
-__all__ = [OperationNode, algorithm]
+__all__ = [OperationNode, OperationNode2, algorithm]

Review comment:
       Maybe call it `MultiNode`.

##########
File path: src/main/python/systemds/matrix/matrix.py
##########
@@ -25,15 +25,15 @@
 import numpy as np
 from py4j.java_gateway import JavaObject, JVMView
 from systemds.context import SystemDSContext
-from systemds.operator import OperationNode
+from systemds.operator import OperationNode2
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.converters import numpy_to_matrix_block
 
 # TODO maybe instead of having a new class we could have a function `matrix` instead, adding behavior to
-#  `OperationNode` would be necessary
+#  `OperationNode2` would be necessary
 
 
-class Matrix(OperationNode):
+class Matrix(OperationNode2):

Review comment:
       It should not be necessary to change `Matrix` at all; I would like it to stay an `OperationNode`,
   since it is the 'lowest' primitive.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],

Review comment:
       Make it a list of tuples of `OperationNode`s, and leverage the previous `OperationNode` to construct the individual return values.

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name
+
         # get variable names of unnamed parameters
-        unnamed_input_vars = [self._dfs_dag_nodes(
-            input_node) for input_node in dag_node.unnamed_input_nodes]
+        unnamed_input_vars = [self._dfs_dag_nodes(input_node) for input_node in dag_node.unnamed_input_nodes]
         # get variable names of named parameters
-        named_input_vars = {name: self._dfs_dag_nodes(input_node) for name, input_node in
-                            dag_node.named_input_nodes.items()}
-        curr_var_name = self._next_unique_var()
+        named_input_vars = {}
+        for name, input_node in dag_node.named_input_nodes.items():
+            named_input_vars[name] = self._dfs_dag_nodes(input_node)
+            if isinstance(input_node, DAGNode) and len(input_node.output_nodes) > 1:
+                named_input_vars[name] = named_input_vars[name] + name

Review comment:
       This seems oddly designed:
   if the node has multiple returns, the name is appended again?
   
   Can you explain?

##########
File path: src/main/python/systemds/script_building/dag.py
##########
@@ -45,9 +45,10 @@ class DAGNode(ABC):
     sds_context: 'SystemDSContext'
     _unnamed_input_nodes: Sequence[Union['DAGNode', str, int, float, bool]]
     _named_input_nodes: Dict[str, Union['DAGNode', str, int, float, bool]]
-    _output_type: OutputType
+    _outputs: List[Tuple[str, OutputType]]
+    _output_nodes: Dict[str, Union['DAGNode']]
     _is_python_local_data: bool
-    _number_of_outputs: int
+    _dml_name: str

Review comment:
       This seems bad because it changes the allocation to a list rather than a single element; maybe we should have a multi DAG node.
   
   If we choose to have a multi DAG node, then make two classes, `SingleDAGNode` and `MultiDAGNode`, that both inherit from `DAGNode`.
   
   Opinions?
   
   

##########
File path: src/main/python/tests/algorithms/test_pca.py
##########
@@ -49,10 +51,38 @@ def test_500x2(self):
         m1 = self.generate_matrices_for_pca(30, seed=1304)
         X = Matrix(self.sds, m1)
         # print(features)
-        [res, model, _, _ ] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
+        [res, model, _, _] = pca(X, K=1, scale="FALSE", center="FALSE").compute()
         for (x, y) in zip(m1, res):
             self.assertTrue((x[0] > 0 and y > 0) or (x[0] < 0 and y < 0))
 
+    def test_500x2b(self):
+        """
+        This test constructs a line of values in 2d space. 
+        That if fit correctly maps perfectly to 1d space.
+        The check is simply if the input value was positive
+        then the output value should be similar.
+        """
+        m1 = self.generate_matrices_for_pca(30, seed=1304)
+        X = Matrix(self.sds, m1)
+        # print(features)
+        X._check_matrix_op()
+        params_dict = {'X': X, 'K': 1, 'scale': "FALSE", 'center': "FALSE"}
+        nodeC = OperationNode2(self.sds, 'pca', named_input_nodes=params_dict,
+                               outputs=[("res", OutputType.MATRIX), ("model", OutputType.MATRIX), ("scale", OutputType.MATRIX), ("center", OutputType.MATRIX)])
+        nodeD = OperationNode2(self.sds, 'abs', unnamed_input_nodes=[nodeC.output_nodes["model"]])

Review comment:
       In the end, the tests should reflect what a user would write; therefore, in this case
   the goal becomes one of these three:
   
   
   ```python
   [res, model, _, _] = pca(...)
   res = abs(res)
   ```
   
   or
   
   ```python
   m= pca(...)
   res = abs(m[0])
   ```
   
   or
   
   
   ```python
   m= pca(...)
   res = abs(m["model"])
   ```
   
   Now, this is challenging since it requires some changes in our generated algorithms.
   Maybe, to keep the project simple, you could make a new test file and do the following:
   
   ```python
   X = self.sds.rand(rows=1, cols=2, seed = 3)
   Y = self.sds.rand(rows=2, cols=1, seed=3)
   # I want the default to be unnamed input nodes if the argument is a list,
   # or named input nodes if it is a dictionary.
   M = multinode(sds, 'list', [X,Y])
   print(M[1]).compute()
   ```
   When executed (e.g. with `verbose=True`), this should result in a script looking something like:
   ```dml
   X = rand(rows = 1, cols = 2,  seed = 3)
   Y = rand(rows = 2, cols = 1, seed = 3)
   m = list(X,Y)
   print(toString(m[1]))
   ```
   
   If it is not clear, please say so :)

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy arrays
+        :param number_of_outputs: If set to other value than 1 then it is expected
+            that this operation node returns multiple values. If set remember to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
+            return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], shape=self.shape)

Review comment:
       Most of these are invalid, since you can't add (`+`) a value to a list of heterogeneous outputs.
   I would suggest going through them and validating whether each of them can be supported.

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,

Review comment:
       It will never need to be Python-local data, since it should only contain `OperationNode`s.

##########
File path: src/main/python/systemds/script_building/script.py
##########
@@ -178,24 +178,30 @@ def _dfs_dag_nodes(self, dag_node: VALID_INPUT_TYPES) -> str:
         :param dag_node: current DAG node
         :return: the variable name the current DAG node operation created
         """
+
         if not isinstance(dag_node, DAGNode):
             if isinstance(dag_node, bool):
                 return 'TRUE' if dag_node else 'FALSE'
             return str(dag_node)
-        # for each node do the dfs operation and save the variable names in `input_var_names`
+
+        if dag_node.dml_name != "":
+            return dag_node.dml_name

Review comment:
       Nice, early termination!

##########
File path: src/main/python/systemds/operator/operation_node2.py
##########
@@ -0,0 +1,527 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+from typing import Union, Optional, Iterable, Dict, Sequence, Tuple, TYPE_CHECKING, List
+from multiprocessing import Process
+
+import numpy as np
+from py4j.java_gateway import JVMView, JavaObject
+
+from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
+from systemds.utils.helpers import create_params_string
+from systemds.utils.converters import matrix_block_to_numpy
+from systemds.script_building.script import DMLScript
+from systemds.script_building.dag import OutputType, DAGNode
+
+if TYPE_CHECKING:
+    # to avoid cyclic dependencies during runtime
+    from systemds.context import SystemDSContext
+
+
+class OperationNode2(DAGNode):
+    """A Node representing an operation in SystemDS
+    This is a temporary clone of OperationNode"""
+    shape: Optional[Tuple[int]]
+    _result_var: Optional[Union[float, np.array]]
+    _lineage_trace: Optional[str]
+    _script: Optional[DMLScript]
+
+    def __init__(self, sds_context: 'SystemDSContext', operation: str,
+                 unnamed_input_nodes: Iterable[VALID_INPUT_TYPES] = None,
+                 named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
+                 outputs: List[Tuple[str, OutputType]] = [("_1", OutputType.MATRIX)],
+                 is_python_local_data: bool = False,
+                 shape: Tuple[int] = ()):
+        """
+        Create general `OperationNode2`
+
+        :param sds_context: The SystemDS context for performing the operations
+        :param operation: The name of the DML function to execute
+        :param unnamed_input_nodes: inputs identified by their position, not name
+        :param named_input_nodes: inputs with their respective parameter name
+        :param output_type: type of the output in DML (double, matrix etc.)
+        :param is_python_local_data: if the data is local in python e.g. Numpy arrays
+        :param number_of_outputs: If set to other value than 1 then it is expected
+            that this operation node returns multiple values. If set remember to set the output_types value as well.
+        :param output_types: The types of output in a multi output scenario.
+            Default is None, and means every multi output is a matrix.
+        """
+        self.sds_context = sds_context
+        self.shape = shape
+        if unnamed_input_nodes is None:
+            unnamed_input_nodes = []
+        if named_input_nodes is None:
+            named_input_nodes = {}
+        self.operation = operation
+        self._unnamed_input_nodes = unnamed_input_nodes
+        self._named_input_nodes = named_input_nodes
+        self._outputs = outputs
+        self._output_nodes = {}
+        if self.operation is not None:
+            for idx, output in enumerate(outputs):
+                if output[1] == OutputType.MATRIX:
+                    self.output_nodes[output[0]] = OperationNode2(sds_context, operation=None, named_input_nodes={f"_{idx}": self})
+                    # TODO add output types
+        self._is_python_local_data = is_python_local_data
+        self._result_var = None
+        self._lineage_trace = None
+        self._script = None
+        self.dml_name = ""
+
+    def compute(self, verbose: bool = False, lineage: bool = False) -> \
+            Union[float, np.array, Tuple[Union[float, np.array], str]]:
+
+        if self._result_var is None or self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            if verbose:
+                print("SCRIPT:")
+                print(self._script.dml_script)
+
+            if lineage:
+                result_variables, self._lineage_trace = self._script.execute_with_lineage()
+            else:
+                result_variables = self._script.execute()
+
+            self._result_var = self.__parse_output_result_variables(result_variables)
+
+        if verbose:
+            for x in self.sds_context.get_stdout():
+                print(x)
+            for y in self.sds_context.get_stderr():
+                print(y)
+
+        if lineage:
+            return self._result_var, self._lineage_trace
+        else:
+            return self._result_var
+
+    def __parse_output_result_variables(self, result_variables):
+        result_var = []
+        for idx, v in enumerate(self._script.out_var_name):
+            if self.outputs[idx][1] == OutputType.MATRIX:
+                result_var.append(self.__parse_output_result_matrix(result_variables, v))
+            else:
+                result_var.append(result_variables.getDouble(self._script.out_var_name[idx]))
+        return result_var
+
+    def __parse_output_result_matrix(self, result_variables, var_name):
+        return matrix_block_to_numpy(self.sds_context.java_gateway.jvm,
+                                     result_variables.getMatrixBlock(var_name))
+
+    def get_lineage_trace(self) -> str:
+        """Get the lineage trace for this node.
+
+        :return: Lineage trace
+        """
+        if self._lineage_trace is None:
+            self._script = DMLScript(self.sds_context)
+            self._script.build_code(self)
+            self._lineage_trace = self._script.get_lineage()
+
+        return self._lineage_trace
+
+    def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
+                  named_input_vars: Dict[str, str]) -> str:
+        if self.operation is None:
+            return f'{var_name}={list(named_input_vars.values())[0]}'
+
+        if self.operation in BINARY_OPERATIONS:
+            assert len(
+                named_input_vars) == 0, 'Named parameters can not be used with binary operations'
+            assert len(
+                unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
+            return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
+
+        inputs_comma_sep = create_params_string(unnamed_input_vars, named_input_vars)
+
+        if len(self._outputs) > 1:
+            output = "["
+            for idx in range(len(self._outputs)):
+                output += f'{var_name}_{idx},'
+            output = output[:-1] + "]"
+            return f'{output}={self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.NONE:
+            return f'{self.operation}({inputs_comma_sep});'
+        elif self._outputs[0][1] == OutputType.ASSIGN:
+            return f'{var_name}={self.operation};'
+        else:
+            return f'{var_name}={self.operation}({inputs_comma_sep});'
+
+    def pass_python_data_to_prepared_script(self, jvm: JVMView, var_name: str, prepared_script: JavaObject) -> None:
+        raise NotImplementedError(
+            'Operation node has no python local data. Missing implementation in derived class?')
+
+    def _check_matrix_op(self):
+        """Perform checks to assure operation is allowed to be performed on data type of this `OperationNode2`
+
+        :raise: AssertionError
+        """
+        assert len(self._outputs) == 1 and self._outputs[0][1] == OutputType.MATRIX, f'{self.operation} only supported for matrices'
+
+    def __add__(self, other: VALID_ARITHMETIC_TYPES) -> 'OperationNode2':
+        return OperationNode2(self.sds_context, '+', [self, other], shape=self.shape)

Review comment:
       Actually, after double-checking: all operations from `pass_python_data_to_prepared_script` onward are invalid.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org