You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/09/15 07:55:20 UTC
[systemds] 03/05: [SYSTEMDS-3133] Right indexing on Frames and Matrix
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit fad79545a670e7f81848a763ae3bebc5ba8587f8
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Tue Sep 14 21:13:53 2021 +0200
[SYSTEMDS-3133] Right indexing on Frames and Matrix
This commit adds support for right indexing of frames and matrices in
the Python API.
---
.../sysds/runtime/matrix/data/FrameBlock.java | 46 +++++++++-
src/main/python/systemds/operator/nodes/frame.py | 11 ++-
src/main/python/systemds/operator/nodes/matrix.py | 9 +-
.../python/systemds/operator/operation_node.py | 11 ++-
src/main/python/systemds/utils/converters.py | 12 ++-
src/main/python/systemds/utils/helpers.py | 22 +++++
src/main/python/tests/frame/test_rIndexing.py | 100 +++++++++++++++++++++
src/main/python/tests/matrix/test_rIndexing.py | 62 +++++++++++--
8 files changed, 256 insertions(+), 17 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 2e39cef..9ff106c 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -27,6 +27,8 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -577,8 +579,9 @@ public class FrameBlock implements CacheBlock, Externalizable {
switch(_schema[c]) {
case STRING: return ((StringArray)_coldata[c])._data;
case BOOLEAN: return ((BooleanArray)_coldata[c])._data;
- case INT64: return ((LongArray)_coldata[c])._data;
- case FP64: return ((DoubleArray)_coldata[c])._data;
+ case INT64: return ((LongArray)_coldata[c])._data;
+ case INT32: return ((IntegerArray)_coldata[c])._data;
+ case FP64: return ((DoubleArray)_coldata[c])._data;
default: return null;
}
}
@@ -588,6 +591,7 @@ public class FrameBlock implements CacheBlock, Externalizable {
case STRING: return "String";
case BOOLEAN: return "Boolean";
case INT64: return "Long";
+ case INT32: return "Int";
case FP64: return "Double";
default: return null;
}
@@ -617,6 +621,27 @@ public class FrameBlock implements CacheBlock, Externalizable {
}
}
+ public byte[] getColumnAsBytes(int c){
+ switch(_schema[c]){
+ case INT64:
+ long[] colLong = ((LongArray)_coldata[c])._data;
+ ByteBuffer longBuffer = ByteBuffer.allocate(8 * getNumRows());
+ longBuffer.order(ByteOrder.LITTLE_ENDIAN);
+ for(int i = 0; i < getNumRows(); i++)
+ longBuffer.putLong(colLong[i]);
+ return longBuffer.array();
+ case INT32:
+ int[] colInt = ((IntegerArray)_coldata[c])._data;
+ ByteBuffer intBuffer = ByteBuffer.allocate(4 * getNumRows());
+ intBuffer.order(ByteOrder.LITTLE_ENDIAN);
+ for(int i = 0; i < getNumRows(); i++)
+ intBuffer.putInt(colInt[i]);
+ return intBuffer.array();
+ default:
+ throw new NotImplementedException();
+ }
+ }
+
public Array getColumn(int c) {
return _coldata[c];
}
@@ -1547,6 +1572,11 @@ public class FrameBlock implements CacheBlock, Externalizable {
public abstract Array clone();
public abstract Array slice(int rl, int ru);
public abstract void reset(int size);
+
+ @Override
+ public String toString(){
+ return this.getClass().getSimpleName().toString() + ":" + _size;
+ }
}
private static class StringArray extends Array<String> {
@@ -2291,4 +2321,16 @@ public class FrameBlock implements CacheBlock, Externalizable {
}
return ret;
}
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("FrameBlock");
+ sb.append("\n");
+ sb.append(Arrays.toString(_schema));
+ sb.append("\n");
+ sb.append(Arrays.toString(_coldata));
+
+ return sb.toString();
+ }
}
diff --git a/src/main/python/systemds/operator/nodes/frame.py b/src/main/python/systemds/operator/nodes/frame.py
index 6551332..5fe02cc 100644
--- a/src/main/python/systemds/operator/nodes/frame.py
+++ b/src/main/python/systemds/operator/nodes/frame.py
@@ -29,6 +29,7 @@ import pandas as pd
from py4j.java_gateway import JavaObject, JVMView
from systemds.operator import OperationNode, Matrix, MultiReturn
from systemds.utils.consts import VALID_INPUT_TYPES
+from systemds.utils.helpers import get_slice_string
from systemds.utils.converters import pandas_to_frame_block, frame_block_to_pandas
from systemds.script_building.dag import OutputType, DAGNode
@@ -45,7 +46,7 @@ class Frame(OperationNode):
unnamed_input_nodes: Union[str,
Iterable[VALID_INPUT_TYPES]] = None,
named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
- local_data: pd.DataFrame = None) -> "Frame":
+ local_data: pd.DataFrame = None, brackets:bool = False) -> "Frame":
is_python_local_data = False
if local_data is not None:
self._pd_dataframe = local_data
@@ -54,7 +55,7 @@ class Frame(OperationNode):
self._pd_dataframe = None
super().__init__(sds_context, operation, unnamed_input_nodes,
- named_input_nodes, OutputType.FRAME, is_python_local_data)
+ named_input_nodes, OutputType.FRAME, is_python_local_data, brackets)
def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
assert (
@@ -134,4 +135,8 @@ class Frame(OperationNode):
return Frame(self.sds_context, "replace", named_input_nodes={"target": self, "pattern": f"'{pattern}'", "replacement":f"'{replacement}'"})
def __str__(self):
- return "FrameNode"
\ No newline at end of file
+ return "FrameNode"
+
+ def __getitem__(self, i) -> 'Frame':
+ sliceIns = get_slice_string(i)
+ return Frame(self.sds_context, '', [self, sliceIns], brackets=True)
\ No newline at end of file
diff --git a/src/main/python/systemds/operator/nodes/matrix.py b/src/main/python/systemds/operator/nodes/matrix.py
index f90218b..2603dd5 100644
--- a/src/main/python/systemds/operator/nodes/matrix.py
+++ b/src/main/python/systemds/operator/nodes/matrix.py
@@ -29,6 +29,7 @@ from py4j.java_gateway import JavaObject, JVMView
from systemds.operator import OperationNode, Scalar
from systemds.utils.consts import VALID_INPUT_TYPES
from systemds.utils.converters import numpy_to_matrix_block, matrix_block_to_numpy
+from systemds.utils.helpers import get_slice_string
from systemds.script_building.dag import OutputType
from systemds.utils.consts import VALID_INPUT_TYPES, BINARY_OPERATIONS, VALID_ARITHMETIC_TYPES
@@ -41,7 +42,7 @@ class Matrix(OperationNode):
unnamed_input_nodes: Union[str,
Iterable[VALID_INPUT_TYPES]] = None,
named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
- local_data: np.array = None) -> 'Matrix':
+ local_data: np.array = None, brackets:bool = False ) -> 'Matrix':
is_python_local_data = False
if local_data is not None:
@@ -51,7 +52,7 @@ class Matrix(OperationNode):
self._np_array = None
super().__init__(sds_context, operation, unnamed_input_nodes,
- named_input_nodes, OutputType.MATRIX, is_python_local_data)
+ named_input_nodes, OutputType.MATRIX, is_python_local_data, brackets)
def pass_python_data_to_prepared_script(self, sds, var_name: str, prepared_script: JavaObject) -> None:
assert self.is_python_local_data, 'Can only pass data to prepared script if it is python local!'
@@ -152,6 +153,10 @@ class Matrix(OperationNode):
def __matmul__(self, other: 'Matrix') -> 'Matrix':
return Matrix(self.sds_context, '%*%', [self, other])
+ def __getitem__(self, i):
+ sliceIns = get_slice_string(i)
+ return Matrix(self.sds_context, '', [self, sliceIns], brackets=True)
+
def sum(self, axis: int = None) -> 'OperationNode':
"""Calculate sum of matrix.
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 8dba0f9..acc3988 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -44,13 +44,15 @@ class OperationNode(DAGNode):
_script: Optional[DMLScript]
_output_types: Optional[Iterable[VALID_INPUT_TYPES]]
_source_node: Optional["DAGNode"]
+ _brackets: bool
def __init__(self, sds_context: 'SystemDSContext', operation: str,
unnamed_input_nodes: Union[str,
Iterable[VALID_INPUT_TYPES]] = None,
named_input_nodes: Dict[str, VALID_INPUT_TYPES] = None,
output_type: OutputType = OutputType.MATRIX,
- is_python_local_data: bool = False):
+ is_python_local_data: bool = False,
+ brackets: bool = False):
"""
Create general `OperationNode`
@@ -80,6 +82,7 @@ class OperationNode(DAGNode):
self._script = None
self._source_node = None
self._already_added = False
+ self._brackets = brackets
self.dml_name = ""
def compute(self, verbose: bool = False, lineage: bool = False) -> \
@@ -134,13 +137,17 @@ class OperationNode(DAGNode):
def code_line(self, var_name: str, unnamed_input_vars: Sequence[str],
named_input_vars: Dict[str, str]) -> str:
+
+ if self._brackets:
+ return f'{var_name}={unnamed_input_vars[0]}[{",".join(unnamed_input_vars[1:])}]'
+
if self.operation in BINARY_OPERATIONS:
assert len(
named_input_vars) == 0, 'Named parameters can not be used with binary operations'
assert len(
unnamed_input_vars) == 2, 'Binary Operations need exactly two input variables'
return f'{var_name}={unnamed_input_vars[0]}{self.operation}{unnamed_input_vars[1]}'
-
+
inputs_comma_sep = create_params_string(
unnamed_input_vars, named_input_vars)
diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py
index 5ef41d7..342c7a3 100644
--- a/src/main/python/systemds/utils/converters.py
+++ b/src/main/python/systemds/utils/converters.py
@@ -118,7 +118,7 @@ def pandas_to_frame_block(sds: "SystemDSContext", pd_df: pd.DataFrame):
for i in range(len(schema)):
j_valueTypeArray[i] = schema[i]
for i in range(len(col_names)):
- j_colNameArray[i] = col_names[i]
+ j_colNameArray[i] = str(col_names[i])
j = 0
for j, col_name in enumerate(col_names):
col_data = pd_df[col_name].fillna("").to_numpy(dtype=str)
@@ -150,9 +150,15 @@ def frame_block_to_pandas(sds: "SystemDSContext", fb: JavaObject):
ret.append(ent)
else:
ret.append(None)
- df[fb.getColumnName(c_index)] = ret
+ elif d_type == "Int":
+ byteArray = fb.getColumnAsBytes(c_index)
+ ret = np.frombuffer(byteArray, dtype=np.int32)
+ elif d_type == "Long":
+ byteArray = fb.getColumnAsBytes(c_index)
+ ret = np.frombuffer(byteArray, dtype=np.int64)
else:
- raise NotImplementedError("Not Implemented other types for systemds to pandas parsing")
+ raise NotImplementedError(f'Not Implemented {d_type} for systemds to pandas parsing')
+ df[fb.getColumnName(c_index)] = ret
return df
diff --git a/src/main/python/systemds/utils/helpers.py b/src/main/python/systemds/utils/helpers.py
index 7aae17e..67b4f8d 100644
--- a/src/main/python/systemds/utils/helpers.py
+++ b/src/main/python/systemds/utils/helpers.py
@@ -48,3 +48,25 @@ def get_module_dir() -> os.PathLike:
"""
spec = find_spec(MODULE_NAME)
return spec.submodule_search_locations[0]
+
+
+def get_slice_string(i):
+ if isinstance(i, tuple):
+ if len(i) > 2:
+ raise ValueError(f'Invalid number of dimensions to slice {len(i)}, Only 2 dimensions allowed')
+ else:
+ return f'{get_slice_string(i[0])},{get_slice_string(i[1])}'
+ elif isinstance(i, slice):
+ if i.step:
+ raise ValueError("Invalid to slice with step in systemds")
+ elif i.start == None and i.stop == None:
+ return ''
+ elif i.start == None or i.stop == None:
+ raise NotImplementedError("Not Implemented slice with dynamic end")
+ else:
+ # start + 1 since R and SystemDS are 1-indexed; stop is left unchanged.
+ return f'{i.start+1}:{i.stop}'
+ else:
+ # + 1 since R and SystemDS are 1-indexed.
+ sliceIns = i+1
+ return sliceIns
\ No newline at end of file
diff --git a/src/main/python/tests/frame/test_rIndexing.py b/src/main/python/tests/frame/test_rIndexing.py
new file mode 100644
index 0000000..cb69f03
--- /dev/null
+++ b/src/main/python/tests/frame/test_rIndexing.py
@@ -0,0 +1,100 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+
+import numpy as np
+import pandas as pd
+from systemds.context import SystemDSContext
+
+
+class Test_rIndexing(unittest.TestCase):
+
+ sds: SystemDSContext = None
+
+ #shape (10, 10)
+ df = pd.DataFrame(np.arange(0, 100).reshape(10, 10))
+
+ @classmethod
+ def setUpClass(cls):
+ cls.sds = SystemDSContext()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.sds.close()
+
+ def test_1(self):
+ m1 = self.sds.from_pandas(self.df)
+ npres = self.df.loc[4]
+ res = m1[4].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_2(self):
+ m1 = self.sds.from_pandas(self.df)
+ # Pandas is not consistent with numpy, since .loc uses inclusive ranges;
+ # therefore the tests subtract 1 from the end of the range.
+ npres = self.df.loc[4:4]
+ res = m1[4:5].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_3(self):
+ m1 = self.sds.from_pandas(self.df)
+ # Invalid to slice with a step
+ with self.assertRaises(ValueError) as context:
+ res = m1[4:7:2].compute()
+
+ def test_4(self):
+ m1 = self.sds.from_pandas(self.df)
+ npres = np.array(self.df.loc[:,4])
+ res = np.array(m1[:,4].compute()).flatten()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_5(self):
+ m1 = self.sds.from_pandas(self.df)
+ npres = np.array(self.df.loc[:,4:5])
+ res = np.array(m1[:,4:6].compute())
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_6(self):
+ m1 = self.sds.from_pandas(self.df)
+ npres = self.df.loc[1:1,4:5]
+ res = m1[1:2,4:6].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_7(self):
+ m1 = self.sds.from_pandas(self.df)
+ npres = self.df.loc[1,4:5]
+ res = m1[1,4:6].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_8(self):
+ m1 = self.sds.from_pandas(self.df)
+ with self.assertRaises(NotImplementedError) as context:
+ res = m1[1:,4:6].compute()
+
+ def test_9(self):
+ m1 = self.sds.from_pandas(self.df)
+ with self.assertRaises(NotImplementedError) as context:
+ res = m1[:3,4:6].compute()
+
+
+if __name__ == "__main__":
+ unittest.main(exit=False)
diff --git a/src/main/python/tests/matrix/test_rIndexing.py b/src/main/python/tests/matrix/test_rIndexing.py
index 1d25dab..01401ad 100644
--- a/src/main/python/tests/matrix/test_rIndexing.py
+++ b/src/main/python/tests/matrix/test_rIndexing.py
@@ -38,14 +38,66 @@ class Test_rIndexing(unittest.TestCase):
cls.sds.close()
def test_1(self):
- npA = np.zeros((10, 2))
+ npA = np.arange(0, 100).reshape(10, 10)
m1 = self.sds.from_numpy(npA)
- npres = npA[4,]
- print(npres)
- res = m1[4,].compute()
- print(res)
+ npres = npA[4]
+ res = m1[4].compute()
self.assertTrue(np.allclose(res, npres))
+ def test_2(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ npres = npA[4:5]
+ res = m1[4:5].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_3(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ # Invalid to slice with a step
+ with self.assertRaises(ValueError) as context:
+ res = m1[4:7:2].compute()
+
+ def test_4(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ npres = npA[:,4]
+ res = m1[:,4].compute().flatten()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_5(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ npres = npA[:,4:6]
+ res = m1[:,4:6].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_6(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ npres = npA[1:2,4:6]
+ res = m1[1:2,4:6].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_7(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ npres = npA[1,4:6]
+ res = m1[1,4:6].compute()
+ self.assertTrue(np.allclose(res, npres))
+
+ def test_8(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ with self.assertRaises(NotImplementedError) as context:
+ res = m1[1:,4:6].compute()
+
+ def test_9(self):
+ npA = np.arange(0, 100).reshape(10, 10)
+ m1 = self.sds.from_numpy(npA)
+ with self.assertRaises(NotImplementedError) as context:
+ res = m1[:3,4:6].compute()
+
if __name__ == "__main__":
unittest.main(exit=False)