You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/09/15 07:55:20 UTC

[systemds] 03/05: [SYSTEMDS-3133] Right indexing on Frames and Matrix

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit fad79545a670e7f81848a763ae3bebc5ba8587f8
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Tue Sep 14 21:13:53 2021 +0200

    [SYSTEMDS-3133] Right indexing on Frames and Matrix
    
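    This commit adds support for right indexing of frames and matrices in
    the Python API, i.e. read-only slicing with Python's [] syntax such as
    X[1:3, 4], which is translated from Python's 0-based indices to the
    1-based ranges used by DML.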
---
 .../sysds/runtime/matrix/data/FrameBlock.java      |  46 +++++++++-
 src/main/python/systemds/operator/nodes/frame.py   |  11 ++-
 src/main/python/systemds/operator/nodes/matrix.py  |   9 +-
 .../python/systemds/operator/operation_node.py     |  11 ++-
 src/main/python/systemds/utils/converters.py       |  12 ++-
 src/main/python/systemds/utils/helpers.py          |  22 +++++
 src/main/python/tests/frame/test_rIndexing.py      | 100 +++++++++++++++++++++
 src/main/python/tests/matrix/test_rIndexing.py     |  62 +++++++++++--
 8 files changed, 256 insertions(+), 17 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 2e39cef..9ff106c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -27,6 +27,8 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -577,8 +579,9 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 		switch(_schema[c]) {
 			case STRING:  return ((StringArray)_coldata[c])._data;
 			case BOOLEAN: return ((BooleanArray)_coldata[c])._data;
-			case INT64:     return ((LongArray)_coldata[c])._data;
-			case FP64:  return ((DoubleArray)_coldata[c])._data;
+			case INT64:   return ((LongArray)_coldata[c])._data;
+			case INT32:   return ((IntegerArray)_coldata[c])._data;
+			case FP64:    return ((DoubleArray)_coldata[c])._data;
 			default:      return null;
 	 	}
 	}
@@ -588,6 +591,7 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 			case STRING:  return "String";
 			case BOOLEAN: return "Boolean";
 			case INT64:   return "Long";
+			case INT32:   return "Int";
 			case FP64:    return "Double";
 			default:      return null;
 	 	}
@@ -617,6 +621,27 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 		}
 	}
 
+	public byte[] getColumnAsBytes(int c){
+		switch(_schema[c]){
+			case INT64:
+				long[] colLong = ((LongArray)_coldata[c])._data;
+				ByteBuffer longBuffer = ByteBuffer.allocate(8 * getNumRows());
+				longBuffer.order(ByteOrder.LITTLE_ENDIAN);
+				for(int i = 0; i <  getNumRows(); i++)
+					longBuffer.putLong(colLong[i]);
+				return longBuffer.array();
+			case INT32:
+				int[] colInt = ((IntegerArray)_coldata[c])._data;
+				ByteBuffer intBuffer = ByteBuffer.allocate(4 *  getNumRows());
+				intBuffer.order(ByteOrder.LITTLE_ENDIAN);
+				for(int i = 0; i < getNumRows(); i++)
+					intBuffer.putInt(colInt[i]);
+				return intBuffer.array();
+			default:
+				throw new NotImplementedException();
+		}
+	}
+
 	public Array getColumn(int c) {
 		return _coldata[c];
 	}
@@ -1547,6 +1572,11 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 		public abstract Array clone();
 		public abstract Array slice(int rl, int ru);
 		public abstract void reset(int size);
+
+		@Override
+		public String toString(){
+			return this.getClass().getSimpleName().toString() + ":" + _size;
+		}
 	}
 
 	private static class StringArray extends Array<String> {
@@ -2291,4 +2321,16 @@ public class FrameBlock implements CacheBlock, Externalizable  {
 		}
 		return ret;
 	}
+
+	@Override
+	public String toString(){
+		StringBuilder sb = new StringBuilder();
+		sb.append("FrameBlock");
+		sb.append("\n");
+		sb.append(Arrays.toString(_schema));
+		sb.append("\n");
+		sb.append(Arrays.toString(_coldata));
+
+		return sb.toString();
+	}
 }
diff --git a/src/main/python/systemds/operator/nodes/frame.py b/src/main/python/systemds/operator/nodes/frame.py
index 6551332..5fe02cc 100644
--- a/src/main/python/systemds/operator/nodes/frame.py
+++ b/src/main/python/systemds/operator/nodes/frame.py
@@ -29,6 +29,7 @@ import pandas as pd
 from py4j.java_gateway import JavaObject, JVMView
 from systemds.operator import OperationNode, Matrix, MultiReturn
 from systemds.utils.consts import VALID_INPUT_TYPES
+from systemds.utils.helpers import get_slice_string
 from systemds.utils.converters import pandas_to_frame_block, frame_block_to_pandas
 from systemds.script_building.dag import OutputType, DAGNode
 
@@ -45,7 +46,7 @@ class Frame(OperationNode):
                  unnamed_input_nodes: Union[str,
                                             Iterable[VALID_INPUT_TYPES]] = None,
                  named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
-                 local_data: pd.DataFrame = None) -> "Frame":
+                 local_data: pd.DataFrame = None, brackets:bool = False) -> "Frame":
         is_python_local_data = False
         if local_data is not None:
             self._pd_dataframe = local_data
@@ -54,7 +55,7 @@ class Frame(OperationNode):
             self._pd_dataframe = None
 
         super().__init__(sds_context, operation, unnamed_input_nodes,
-                         named_input_nodes, OutputType.FRAME, is_python_local_data)
+                         named_input_nodes, OutputType.FRAME, is_python_local_data, brackets)
 
     def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
         assert (
@@ -134,4 +135,8 @@ class Frame(OperationNode):
         return Frame(self.sds_context, "replace", named_input_nodes={"target": self, "pattern": f"'{pattern}'", "replacement":f"'{replacement}'"})
 
     def __str__(self):
-        return "FrameNode"
\ No newline at end of file
+        return "FrameNode"
+
+    def __getitem__(self, i) -> 'Frame':
+        sliceIns = get_slice_string(i)
+        return Frame(self.sds_context, '', [self, sliceIns], brackets=True)
\ No newline at end of file
diff --git a/src/main/python/systemds/operator/nodes/matrix.py b/src/main/python/systemds/operator/nodes/matrix.py
index f90218b..2603dd5 100644
--- a/src/main/python/systemds/operator/nodes/matrix.py
+++ b/src/main/python/systemds/operator/nodes/matrix.py
@@ -29,6 +29,7 @@ from py4j.java_gateway import JavaObject, JVMView
 from systemds.operator import OperationNode, Scalar
 from systemds.utils.consts import VALID_INPUT_TYPES
 from systemds.utils.converters import numpy_to_matrix_block, matrix_block_to_numpy
+from systemds.utils.helpers import get_slice_string
 from systemds.script_building.dag import OutputType
 
 from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
@@ -41,7 +42,7 @@ class Matrix(OperationNode):
                  unnamed_input_nodes: Union[str,
                                             Iterable[VALID_INPUT_TYPES]] = None,
                  named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
-                 local_data: np.array = None) -> 'Matrix':
+                 local_data: np.array = None, brackets:bool = False ) -> 'Matrix':
 
         is_python_local_data = False
         if local_data is not None:
@@ -51,7 +52,7 @@ class Matrix(OperationNode):
             self._np_array = None
 
         super().__init__(sds_context, operation, unnamed_input_nodes,
-                         named_input_nodes, OutputType.MATRIX, is_python_local_data)
+                         named_input_nodes, OutputType.MATRIX, is_python_local_data, brackets)
 
     def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
         assert self.is_python_local_data, 'Can only pass data to prepared script if it is python local!'
@@ -152,6 +153,10 @@ class Matrix(OperationNode):
     def __matmul__(self, other: 'Matrix') -> 'Matrix':
         return Matrix(self.sds_context, '%*%', [self, other])
 
+    def __getitem__(self, i):
+        sliceIns = get_slice_string(i)
+        return Matrix(self.sds_context, '', [self, sliceIns], brackets=True)
+
     def sum(self, axis: int = None) -> 'OperationNode':
         """Calculate sum of matrix.
 
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 8dba0f9..acc3988 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -44,13 +44,15 @@ class OperationNode(DAGNode):
     _script: Optional[DMLScript]
     _output_types: Optional[Iterable[VALID_INPUT_TYPES]]
     _source_node: Optional["DAGNode"]
+    _brackets: bool
 
     def __init__(self, sds_context: 'SystemDSContext', operation: str,
                  unnamed_input_nodes: Union[str,
                                             Iterable[VALID_INPUT_TYPES]] = None,
                  named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
                  output_type: OutputType = OutputType.MATRIX,
-                 is_python_local_data: bool = False):
+                 is_python_local_data: bool = False,
+                 brackets: bool = False):
         """
         Create general `OperationNode`
 
@@ -80,6 +82,7 @@ class OperationNode(DAGNode):
         self._script = None
         self._source_node = None
         self._already_added = False
+        self._brackets = brackets
         self.dml_name = ""
 
     def compute(self, verbose: bool = False, lineage: bool = False) -> \
@@ -134,13 +137,17 @@ class OperationNode(DAGNode):
 
     def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
                   named_input_vars: Dict[str, str]) -> str:
+
+        if self._brackets:
+            return f'{var_name}={unnamed_input_vars[0]}[{",".join(unnamed_input_vars[1:])}]'
+
         if self.operation in BINARY_OPERATIONS:
             assert len(
                 named_input_vars) == 0, 'Named parameters can not be used with binary operations'
             assert len(
                 unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
             return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
-
+        
         inputs_comma_sep = create_params_string(
             unnamed_input_vars, named_input_vars)
 
diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py
index 5ef41d7..342c7a3 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -118,7 +118,7 @@ def pandas_to_frame_block(sds: "SystemDSContext", pd_df: pd.DataFrame):
         for i in range(len(schema)):
             j_valueTypeArray[i] = schema[i]
         for i in range(len(col_names)):
-            j_colNameArray[i] = col_names[i]
+            j_colNameArray[i] = str(col_names[i])
         j = 0
         for j, col_name in enumerate(col_names):
             col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
@@ -150,9 +150,15 @@ def frame_block_to_pandas(sds: "SystemDSContext", fb: JavaObject):
                     ret.append(ent)
                 else:
                     ret.append(None)
-            df[fb.getColumnName(c_index)] = ret
+        elif d_type == "Int":
+            byteArray = fb.getColumnAsBytes(c_index)
+            ret = np.frombuffer(byteArray, dtype=np.int32)
+        elif d_type == "Long":
+            byteArray = fb.getColumnAsBytes(c_index)
+            ret = np.frombuffer(byteArray, dtype=np.int64)
         else:
-            raise NotImplementedError("Not Implemented other types for systemds to pandas parsing")
+            raise NotImplementedError(f'Not Implemented {d_type} for systemds to pandas parsing')
+        df[fb.getColumnName(c_index)] = ret
 
 
     return df
diff --git a/src/main/python/systemds/utils/helpers.py b/src/main/python/systemds/utils/helpers.py
index 7aae17e..67b4f8d 100644
--- a/src/main/python/systemds/utils/helpers.py
+++ b/src/main/python/systemds/utils/helpers.py
@@ -48,3 +48,25 @@ def get_module_dir() -> os.PathLike:
     """
     spec = find_spec(MODULE_NAME)
     return spec.submodule_search_locations[0]
+
+
+def get_slice_string(i):
+    if isinstance(i, tuple):
+        if len(i) > 2:
+            raise ValueError(f'Invalid number of dimensions to slice {len(i)}, Only 2 dimensions allowed')
+        else:
+            return f'{get_slice_string(i[0])},{get_slice_string(i[1])}'
+    elif isinstance(i, slice):
+        if i.step:
+            raise ValueError("Invalid to slice with step in systemds")
+        elif i.start == None and i.stop == None:
+            return ''
+        elif i.start == None or i.stop == None:
+            raise NotImplementedError("Not Implemented slice with dynamic end")
+        else:
+            # + 1 since R and systemDS is 1 indexed.
+            return f'{i.start+1}:{i.stop}'
+    else:
+        # + 1 since R and systemDS is 1 indexed.
+        sliceIns = i+1
+    return sliceIns
\ No newline at end of file
diff --git a/src/main/python/tests/frame/test_rIndexing.py b/src/main/python/tests/frame/test_rIndexing.py
new file mode 100644
index 0000000..cb69f03
--- /dev/null
+++ b/src/main/python/tests/frame/test_rIndexing.py
@@ -0,0 +1,100 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import numpy as np
+import pandas as pd
+from systemds.context import SystemDSContext
+
+
+class Test_rIndexing(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    # shape (10, 10)
+    df = pd.DataFrame(np.arange(0, 100).reshape(10, 10))
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_1(self):
+        m1 = self.sds.from_pandas(self.df)
+        npres = self.df.loc[4]
+        res = m1[4].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_2(self):
+        m1 = self.sds.from_pandas(self.df)
+        # Pandas .loc is not consistent with numpy: its label ranges are end-inclusive,
+        # so the expected pandas slices use an end index one lower than the sliced range.
+        npres = self.df.loc[4:4]
+        res = m1[4:5].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_3(self):
+        m1 = self.sds.from_pandas(self.df)
+        # Invalid to slice with a step
+        with self.assertRaises(ValueError) as context:
+            res = m1[4:7:2].compute()
+
+    def test_4(self):
+        m1 = self.sds.from_pandas(self.df)
+        npres = np.array(self.df.loc[:,4])
+        res = np.array(m1[:,4].compute()).flatten()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_5(self):
+        m1 = self.sds.from_pandas(self.df)
+        npres = np.array(self.df.loc[:,4:5])
+        res = np.array(m1[:,4:6].compute())
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_6(self):
+        m1 = self.sds.from_pandas(self.df)
+        npres = self.df.loc[1:1,4:5]
+        res = m1[1:2,4:6].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_7(self):
+        m1 = self.sds.from_pandas(self.df)
+        npres = self.df.loc[1,4:5]
+        res = m1[1,4:6].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_8(self):
+        m1 = self.sds.from_pandas(self.df)
+        with self.assertRaises(NotImplementedError) as context:
+            res = m1[1:,4:6].compute()
+
+    def test_9(self):
+        m1 = self.sds.from_pandas(self.df)
+        with self.assertRaises(NotImplementedError) as context:
+            res = m1[:3,4:6].compute()
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/matrix/test_rIndexing.py b/src/main/python/tests/matrix/test_rIndexing.py
index 1d25dab..01401ad 100644
--- a/src/main/python/tests/matrix/test_rIndexing.py
+++ b/src/main/python/tests/matrix/test_rIndexing.py
@@ -38,14 +38,66 @@ class Test_rIndexing(unittest.TestCase):
         cls.sds.close()
 
     def test_1(self):
-        npA = np.zeros((10, 2))
+        npA = np.arange(0, 100).reshape(10, 10)
         m1 = self.sds.from_numpy(npA)
-        npres = npA[4,]
-        print(npres)
-        res = m1[4,].compute()
-        print(res)
+        npres = npA[4]
+        res = m1[4].compute()
         self.assertTrue(np.allclose(res, npres))
 
+    def test_2(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        npres = npA[4:5]
+        res = m1[4:5].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_3(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        # Invalid to slice with a step
+        with self.assertRaises(ValueError) as context:
+            res = m1[4:7:2].compute()
+
+    def test_4(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        npres = npA[:,4]
+        res = m1[:,4].compute().flatten()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_5(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        npres = npA[:,4:6]
+        res = m1[:,4:6].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_6(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        npres = npA[1:2,4:6]
+        res = m1[1:2,4:6].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_7(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        npres = npA[1,4:6]
+        res = m1[1,4:6].compute()
+        self.assertTrue(np.allclose(res, npres))
+
+    def test_8(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        with self.assertRaises(NotImplementedError) as context:
+            res = m1[1:,4:6].compute()
+
+    def test_9(self):
+        npA = np.arange(0, 100).reshape(10, 10)
+        m1 = self.sds.from_numpy(npA)
+        with self.assertRaises(NotImplementedError) as context:
+            res = m1[:3,4:6].compute()
+
 
 if __name__ == "__main__":
     unittest.main(exit=False)