You are viewing a plain-text version of this content. The canonical link to the original page was not preserved in this plain-text copy.
Posted to commits@systemds.apache.org by ba...@apache.org on 2022/08/19 15:28:19 UTC

[systemds] branch main updated (ec06eb32a2 -> 22a8e12d38)

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


    from ec06eb32a2 [MINOR][DOCS] Update Java API Documentation
     new 9fe84b5306 [SYSTEMDS-2835] Add a federated version of adult_neural and mnist tests
     new 22a8e12d38 [MINOR] Add stats test for python

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/sysds/api/jmlc/Connection.java |   1 +
 .../controlprogram/caching/CacheableData.java      |   8 +-
 .../cp/ParamservBuiltinCPInstruction.java          |   1 +
 .../instructions/fed/InitFEDInstruction.java       |   4 +
 .../python/systemds/context/systemds_context.py    |  72 +++++++++++-
 .../python/systemds/examples/tutorials/adult.py    |   4 +-
 .../python/systemds/operator/operation_node.py     |   2 +
 .../python/tests/basics/test_context_creation.py   |   1 +
 .../test_context_stats.py}                         |  37 ++++---
 .../python/tests/examples/tutorials/test_adult.py  |   4 +-
 .../test_federated_adult_neural.py}                |  93 ++++++++++++++--
 .../python/tests/federated/test_federated_mnist.py | 123 +++++++++++++++++++++
 12 files changed, 317 insertions(+), 33 deletions(-)
 copy src/main/python/tests/{matrix/test_r_c_bind.py => basics/test_context_stats.py} (63%)
 copy src/main/python/tests/{examples/tutorials/test_adult_neural.py => federated/test_federated_adult_neural.py} (54%)
 create mode 100644 src/main/python/tests/federated/test_federated_mnist.py


[systemds] 01/02: [SYSTEMDS-2835] Add a federated version of adult_neural and mnist tests

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 9fe84b53060c99c8298b0fe8e5262e606b5cdfe3
Author: Kevin Innerebner <ke...@gmail.com>
AuthorDate: Wed Jul 20 15:12:34 2022 +0200

    [SYSTEMDS-2835] Add a federated version of adult_neural and mnist tests
    
    - Fix a bug where instructions were not replaced by their FED-equivalent
    instructions because the correct `CompilerConfig` option
    (`ALLOW_INDIVIDUAL_SB_SPECIFIC_OPS`) was not set.
    - Remove unnecessary CompilerConfigs for the Python API.
    - Add a federated version of the adult_neural and mnist tests.
    - Minor fix for the adult example test case: adapt to the changed
    label data (a pandas DataFrame instead of a Series).
    
    Closes #1668
---
 .../java/org/apache/sysds/api/jmlc/Connection.java |   1 +
 .../controlprogram/caching/CacheableData.java      |   8 +-
 .../cp/ParamservBuiltinCPInstruction.java          |   1 +
 .../instructions/fed/InitFEDInstruction.java       |   4 +
 .../python/systemds/context/systemds_context.py    |  72 +++++++-
 .../python/systemds/examples/tutorials/adult.py    |   4 +-
 .../python/systemds/operator/operation_node.py     |   2 +
 .../python/tests/examples/tutorials/test_adult.py  |   4 +-
 .../tests/federated/test_federated_adult_neural.py | 201 +++++++++++++++++++++
 .../python/tests/federated/test_federated_mnist.py | 123 +++++++++++++
 10 files changed, 413 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/sysds/api/jmlc/Connection.java b/src/main/java/org/apache/sysds/api/jmlc/Connection.java
index 7037e2172b..64cb504360 100644
--- a/src/main/java/org/apache/sysds/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysds/api/jmlc/Connection.java
@@ -150,6 +150,7 @@ public class Connection implements Closeable
 		_cconf.set(ConfigType.IGNORE_TEMPORARY_FILENAMES, true);
 		_cconf.set(ConfigType.REJECT_READ_WRITE_UNKNOWNS, false);
 		_cconf.set(ConfigType.ALLOW_CSE_PERSISTENT_READS, false);
+		_cconf.set(ConfigType.ALLOW_INDIVIDUAL_SB_SPECIFIC_OPS, false);
 
 		//disable caching globally 
 		CacheableData.disableCaching();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 8cb108d478..6dd726db6f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -392,8 +392,12 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		if(_fedMapping == null && _metaData instanceof MetaDataFormat){
 			MetaDataFormat mdf = (MetaDataFormat) _metaData;
 			if(mdf.getFileFormat() == FileFormat.FEDERATED){
-				InitFEDInstruction.federateMatrix(
-					this, ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
+				if (this instanceof FrameObject)
+					InitFEDInstruction.federateFrame((FrameObject) this,
+						ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
+				else
+					InitFEDInstruction.federateMatrix(
+							this, ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
 				return true;
 			}
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index ef45a9c2b3..b4c3c64553 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -87,6 +87,7 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
 	@Override
 	public void processInstruction(ExecutionContext ec) {
 		// check if the input is federated
+		// FIXME: does not work if features are federated, but labels are not
 		if(ec.getMatrixObject(getParam(PS_FEATURES)).isFederated() ||
 			ec.getMatrixObject(getParam(PS_LABELS)).isFederated()) {
 			runFederated(ec);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
index 3e648bbe3b..68965db4c3 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
@@ -451,6 +451,10 @@ public class InitFEDInstruction extends FEDInstruction implements LineageTraceab
 			LOG.debug("Fed map Inited:" + output.getFedMapping());
 	}
 
+	public static void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers) {
+		federateFrame(output, workers, null);
+	}
+
 	public static void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers, CacheBlock[] blocks) {
 		List<Pair<FederatedRange, FederatedData>> fedMapping = new ArrayList<>();
 		for(Pair<FederatedRange, FederatedData> e : workers)
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index d25805cf96..e467a4d65a 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -31,6 +31,7 @@ from subprocess import PIPE, Popen
 from threading import Thread
 from time import sleep
 from typing import Dict, Iterable, Sequence, Tuple, Union
+from contextlib import contextmanager
 
 import numpy as np
 import pandas as pd
@@ -51,8 +52,10 @@ class SystemDSContext(object):
     """
 
     java_gateway: JavaGateway
+    _capture_statistics: bool
+    _statistics: str
 
-    def __init__(self, port: int = -1):
+    def __init__(self, port: int = -1, capture_statistics: bool = False):
         """Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled
         Any new instance of this SystemDS Context, would start a separate new JVM.
 
@@ -61,6 +64,8 @@ class SystemDSContext(object):
         """
         actual_port = self.__start(port)
         process = self.__process
+        self._statistics = ""
+        self._capture_statistics = capture_statistics
         if process.poll() is None:
             self.__start_gateway(actual_port)
         else:
@@ -306,6 +311,71 @@ class SystemDSContext(object):
         s.close()
         return port
 
+    def _execution_completed(self, script: 'DMLScript'):
+        """
+        Should/will be called after execution of a script.
+        Used to update statistics.
+        :param script: The script that got executed
+        """
+        if self._capture_statistics:
+            self._statistics += script.prepared_script.statistics()
+
+    def capture_stats(self, enable: bool = True):
+        """
+        Enable (or disable) capturing of execution statistics.
+        :param enable: if `True` enable capturing, else disable it
+        """
+        self._capture_statistics = enable
+        self.java_gateway.entry_point.getConnection().setStatistics(enable)
+
+    @contextmanager
+    def capture_stats_context(self):
+        """
+        Context for capturing statistics. Should be used in a `with` statement.
+        Afterwards capturing will be reset to the state it was before.
+
+        Example:
+        ```Python
+        with sds.capture_stats_context():
+            a = some_computation.compute()
+            b = another_computation.compute()
+        print(sds.take_stats())
+        ```
+        :return: a context object to be used in a `with` statement
+        """
+        was_enabled = self._capture_statistics
+        try:
+            self.capture_stats(True)
+            yield None
+        finally:
+            self.capture_stats(was_enabled)
+
+    def get_stats(self):
+        """
+        Get the captured statistics. Will not clear the captured statistics.
+
+        See `take_stats()` for an option that also clears the captured statistics.
+        :return: The captured statistics
+        """
+        return self._statistics
+
+    def take_stats(self):
+        """
+        Get the captured statistics and clear the captured statistics.
+
+        See `get_stats()` for an option that does not clear the captured statistics.
+        :return: The captured statistics
+        """
+        stats = self.get_stats()
+        self.clear_stats()
+        return stats
+
+    def clear_stats(self):
+        """
+        Clears the captured statistics.
+        """
+        self._statistics = ""
+
     def full(self, shape: Tuple[int, int], value: Union[float, int]) -> 'Matrix':
         """Generates a matrix completely filled with a value
 
diff --git a/src/main/python/systemds/examples/tutorials/adult.py b/src/main/python/systemds/examples/tutorials/adult.py
index f15ed382e9..575697ae01 100644
--- a/src/main/python/systemds/examples/tutorials/adult.py
+++ b/src/main/python/systemds/examples/tutorials/adult.py
@@ -63,7 +63,7 @@ class DataManager:
 
     def get_train_labels_pandas(self) -> pd.DataFrame:
         self._get_data(self._train_data_loc)
-        return self._parse_data(self._train_data_loc)["income"]
+        return self._parse_data(self._train_data_loc)[["income"]]
 
     def get_train_labels(self, sds: SystemDSContext) -> 'Frame':
         self._get_data(self._train_data_loc)
@@ -80,7 +80,7 @@ class DataManager:
 
     def get_test_labels_pandas(self) -> pd.DataFrame:
         self._get_data(self._test_data_loc)
-        return self._parse_data(self._test_data_loc)["income"]
+        return self._parse_data(self._test_data_loc)[["income"]]
 
     def get_test_labels(self, sds: SystemDSContext) -> 'Frame':
         self._get_data(self._test_data_loc)
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 99d823bc14..5db90b3a9f 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -100,6 +100,8 @@ class OperationNode(DAGNode):
             else:
                 result_variables = self._script.execute()
 
+            self.sds_context._execution_completed(self._script)
+
             if result_variables is not None:
                 self._result_var = self._parse_output_result_variables(
                     result_variables)
diff --git a/src/main/python/tests/examples/tutorials/test_adult.py b/src/main/python/tests/examples/tutorials/test_adult.py
index d327676977..c6b1018658 100644
--- a/src/main/python/tests/examples/tutorials/test_adult.py
+++ b/src/main/python/tests/examples/tutorials/test_adult.py
@@ -58,7 +58,7 @@ class TestAdultStandardML(unittest.TestCase):
 
     def test_train_labels(self):
         y = self.d.get_train_labels_pandas()
-        self.assertEqual((32561,), y.shape)
+        self.assertEqual((32561, 1), y.shape)
 
     def test_test_data(self):
         x_l = self.d.get_test_data_pandas()
@@ -66,7 +66,7 @@ class TestAdultStandardML(unittest.TestCase):
 
     def test_test_labels(self):
         y_l = self.d.get_test_labels_pandas()
-        self.assertEqual((16281,), y_l.shape)
+        self.assertEqual((16281, 1), y_l.shape)
 
     def test_train_data_pandas_vs_systemds(self):
         pandas = self.d.get_train_data_pandas()
diff --git a/src/main/python/tests/federated/test_federated_adult_neural.py b/src/main/python/tests/federated/test_federated_adult_neural.py
new file mode 100644
index 0000000000..df65565c0b
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_adult_neural.py
@@ -0,0 +1,201 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+import itertools
+import shutil
+import unittest
+import io
+import json
+import pandas as pd
+from os import path, makedirs
+
+from systemds.context import SystemDSContext
+from systemds.examples.tutorials.adult import DataManager
+from systemds.operator.algorithm.builtin.scale import scale
+from systemds.operator.algorithm.builtin.scaleApply import scaleApply
+
+
+def create_schema(dataset):
+    schema = []
+    for dtype in dataset.dtypes:
+        if pd.api.types.is_integer_dtype(dtype):
+            schema.append('int64')
+        elif pd.api.types.is_float_dtype(dtype):
+            schema.append('fp64')
+        elif pd.api.types.is_bool_dtype(dtype):
+            schema.append('bool')
+        else:
+            schema.append('string')
+    return ','.join(schema)
+
+
+def create_row_federated_dataset(name, dataset, num_parts=2,
+                                 federated_workers=None):
+    if federated_workers is None:
+        federated_workers = ["localhost:8001", "localhost:8002"]
+    tempdir = "./tests/federated/tmp/test_federated_adult_neural/"
+    federated_file = path.join(tempdir, f"{name}.fed")
+    makedirs(tempdir, exist_ok=True)
+
+    schema = create_schema(dataset)
+    r = dataset.shape[0] // num_parts
+    rs = [r for _ in range(num_parts - 1)] + [dataset.shape[0] - r * (num_parts - 1)]
+    c = dataset.shape[1]
+
+    fed_file_content = []
+    rows_processed = 0
+    for worker_id, address, rows in zip(range(num_parts), itertools.cycle(federated_workers), rs):
+        dataset_part_path = path.join(tempdir, f"{name}{worker_id}.csv")
+        mtd = {"format": "csv", "header": True, "rows": rows, "cols": c,
+               "data_type": "frame", "schema": schema}
+
+        dataset_part = dataset[rows_processed:rows_processed + rows]
+        dataset_part.to_csv(dataset_part_path, index=False)
+        with io.open(f"{dataset_part_path}.mtd", "w", encoding="utf-8") as f:
+            json.dump(mtd, f, ensure_ascii=False)
+
+        fed_file_content.append({
+            "address": address,
+            "dataType": "FRAME",
+            "filepath": dataset_part_path,
+            "begin": [rows_processed, 0],
+            "end": [rows_processed + rows, c],
+        })
+        rows_processed += rows
+
+    with open(federated_file, "w", encoding="utf-8") as f:
+        json.dump(fed_file_content, f)
+    with open(federated_file + '.mtd', "w", encoding="utf-8") as f:
+        json.dump({"format": "federated", "rows": dataset.shape[0], "cols": c,
+                   "data_type": "frame", "schema": schema}, f)
+
+    return federated_file
+
+
+class TestFederatedAdultNeural(unittest.TestCase):
+    """
+    Test class for adult neural network code
+    """
+
+    sds: SystemDSContext = None
+    d: DataManager = None
+    neural_net_src_path: str = "tests/examples/tutorials/neural_net_source.dml"
+    preprocess_src_path: str = "tests/examples/tutorials/preprocess.dml"
+    data_path_train: str = ""
+    data_path_test: str = ""
+    labels_path_train: str = ""
+    labels_path_test: str = ""
+    dataset_jspec: str = "../../test/resources/datasets/adult/jspec.json"
+
+    train_count: int = 15000
+    test_count: int = 300
+
+    network_dir: str = "tests/examples/tutorials/model"
+    network: str = network_dir + "/fnn"
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+        cls.d = DataManager()
+        cls.data_path_train = create_row_federated_dataset("train_data",
+                                                           cls.d.get_train_data_pandas()[0:cls.train_count])
+        cls.labels_path_train = create_row_federated_dataset("train_labels",
+                                                             cls.d.get_train_labels_pandas()[0:cls.train_count])
+        cls.data_path_test = create_row_federated_dataset("test_data",
+                                                          cls.d.get_test_data_pandas()[0:cls.test_count])
+        cls.labels_path_test = create_row_federated_dataset("test_labels",
+                                                            cls.d.get_test_labels_pandas()[0:cls.test_count])
+        shutil.rmtree(cls.network_dir, ignore_errors=True)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+        shutil.rmtree(cls.network_dir, ignore_errors=True)
+
+    # Tests
+
+    @unittest.skip("`toOneHot()` won't be federated -> param-server won't work")
+    def test_train_neural_net(self):
+        self.train_neural_net_and_save()
+        self.eval_neural_net()
+
+    @unittest.skip("`toOneHot()` won't be federated -> param-server won't work")
+    def test_train_predict(self):
+        self.train_neural_net_and_predict()
+
+    # Helper methods
+
+    def prepare_x(self):
+        jspec = self.d.get_jspec(self.sds)
+        train_x_frame = self.sds.read(self.data_path_train)
+        train_x, M1 = train_x_frame.transform_encode(spec=jspec)
+        test_x_frame = self.sds.read(self.data_path_test)
+        test_x = test_x_frame.transform_apply(spec=jspec, meta=M1)
+        # Scale and shift .... not needed because of sigmoid layer,
+        # could be useful therefore tested.
+        [train_x, ce, sc] = scale(train_x)
+        test_x = scaleApply(test_x, ce, sc)
+        return [train_x, test_x]
+
+    def prepare_y(self):
+        jspec_dict = {"recode": ["income"]}
+        jspec_labels = self.sds.scalar(f'"{jspec_dict}"')
+        train_y_frame = self.sds.read(self.labels_path_train)
+        train_y, M2 = train_y_frame.transform_encode(spec=jspec_labels)
+        test_y_frame = self.sds.read(self.labels_path_test)
+        test_y = test_y_frame.transform_apply(spec=jspec_labels, meta=M2)
+        labels = 2
+        train_y = train_y.to_one_hot(labels)
+        test_y = test_y.to_one_hot(labels)
+        return [train_y, test_y]
+
+    def prepare(self):
+        x = self.prepare_x()
+        y = self.prepare_y()
+        return [x[0], x[1], y[0], y[1]]
+
+    def train_neural_net_and_save(self):
+        [train_x, _, train_y, _] = self.prepare()
+        FFN_package = self.sds.source(self.neural_net_src_path, "fnn")
+        network = FFN_package.train(train_x, train_y, 4, 16, 0.01, 1)
+        network.write(self.network).compute()
+
+    def train_neural_net_and_predict(self):
+        [train_x, test_x, train_y, test_y] = self.prepare()
+        FFN_package = self.sds.source(self.neural_net_src_path, "fnn")
+        network = FFN_package.train_paramserv(
+            train_x, train_y, 1, 16, 0.01, 2, 1)
+        probs = FFN_package.predict(test_x, network)
+        accuracy = FFN_package.eval(probs, test_y).compute()
+        # accuracy is returned in percent
+        self.assertTrue(accuracy > 0.80)
+
+    def eval_neural_net(self):
+        [_, test_x, _, test_y] = self.prepare()
+        network = self.sds.read(self.network)
+        FFN_package = self.sds.source(self.neural_net_src_path, "fnn")
+        probs = FFN_package.predict(test_x, network)
+        accuracy = FFN_package.eval(probs, test_y).compute()
+        # accuracy is returned in percent
+        self.assertTrue(accuracy > 0.80)
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/federated/test_federated_mnist.py b/src/main/python/tests/federated/test_federated_mnist.py
new file mode 100644
index 0000000000..3b11bd3194
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_mnist.py
@@ -0,0 +1,123 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+import itertools
+import io
+import json
+import pandas as pd
+from os import path, makedirs
+
+from systemds.context import SystemDSContext
+from systemds.examples.tutorials.mnist import DataManager
+from systemds.operator.algorithm import kmeans, multiLogReg, multiLogRegPredict
+
+
+def create_row_federated_dataset(name, dataset, num_parts=2,
+                                 federated_workers=None):
+    if federated_workers is None:
+        federated_workers = ["localhost:8001", "localhost:8002"]
+    tempdir = "./tests/federated/tmp/test_federated_mnist/"
+    federated_file = path.join(tempdir, f"{name}.fed")
+    makedirs(tempdir, exist_ok=True)
+
+    r = dataset.shape[0] // num_parts
+    rs = [r for _ in range(num_parts - 1)] + [dataset.shape[0] - r * (num_parts - 1)]
+    c = dataset.shape[1]
+
+    fed_file_content = []
+    rows_processed = 0
+    for worker_id, address, rows in zip(range(num_parts), itertools.cycle(federated_workers), rs):
+        dataset_part_path = path.join(tempdir, f"{name}{worker_id}.csv")
+        mtd = {"format": "csv", "rows": rows, "cols": c,
+               "data_type": "matrix", "value_type": "double"}
+
+        dataset_part = dataset[rows_processed:rows_processed + rows]
+        pd.DataFrame(dataset_part).to_csv(dataset_part_path, index=False, header=False)
+        with io.open(f"{dataset_part_path}.mtd", "w", encoding="utf-8") as f:
+            json.dump(mtd, f, ensure_ascii=False)
+
+        fed_file_content.append({
+            "address": address,
+            "dataType": "MATRIX",
+            "filepath": dataset_part_path,
+            "begin": [rows_processed, 0],
+            "end": [rows_processed + rows, c],
+        })
+        rows_processed += rows
+
+    with open(federated_file, "w", encoding="utf-8") as f:
+        json.dump(fed_file_content, f)
+    with open(federated_file + '.mtd', "w", encoding="utf-8") as f:
+        json.dump({"format": "federated", "rows": dataset.shape[0], "cols": c,
+                   "data_type": "matrix", "value_type": "double"}, f)
+
+    return federated_file
+
+
+class TestFederatedMnist(unittest.TestCase):
+    """
+    Test class for mnist dml script tutorial code.
+    """
+
+    sds: SystemDSContext = None
+    d: DataManager = None
+    base_path = "systemds/examples/tutorials/mnist/"
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+        cls.d = DataManager()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_multi_log_reg(self):
+        # Reduced because we want the tests to finish a bit faster.
+        train_count = 15000
+        test_count = 5000
+        # Train data
+        np_train_data = self.d.get_train_data().reshape(60000, 28 * 28)[0:train_count]
+        data_path_train = create_row_federated_dataset("train_data", np_train_data)
+        X = self.sds.read(data_path_train)
+        Y = self.sds.from_numpy(self.d.get_train_labels()[:train_count])
+        Y = Y + 1.0
+
+        # Test data
+        np_test_data = self.d.get_test_data().reshape(10000, 28 * 28)[0:test_count]
+        data_path_test = create_row_federated_dataset("test_data", np_test_data)
+        Xt = self.sds.read(data_path_test)
+        Yt = self.sds.from_numpy(self.d.get_test_labels()[:test_count])
+        Yt = Yt + 1.0
+
+        bias = multiLogReg(X, Y)
+
+        with self.sds.capture_stats_context():
+            [_, _, acc] = multiLogRegPredict(Xt, bias, Yt).compute()
+        stats = self.sds.take_stats()
+        for fed_instr in ["fed_isnan", "fed_*", "fed_-", "fed_uark+", "fed_r'", "fed_rightIndex"]:
+            self.assertIn(fed_instr, stats)
+        self.assertGreater(acc, 80)
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)


[systemds] 02/02: [MINOR] Add stats test for python

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 22a8e12d38f9134c4eb006d1ceca6f773a412416
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Fri Aug 19 16:44:35 2022 +0200

    [MINOR] Add stats test for python
---
 .../python/tests/basics/test_context_creation.py   |  1 +
 ...t_context_creation.py => test_context_stats.py} | 58 +++++++++++++---------
 2 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/src/main/python/tests/basics/test_context_creation.py b/src/main/python/tests/basics/test_context_creation.py
index 1a70deb4ca..6a72757255 100644
--- a/src/main/python/tests/basics/test_context_creation.py
+++ b/src/main/python/tests/basics/test_context_creation.py
@@ -49,3 +49,4 @@ class TestContextCreation(unittest.TestCase):
         b.close()
         c.close()
         d.close()
+
diff --git a/src/main/python/tests/basics/test_context_creation.py b/src/main/python/tests/basics/test_context_stats.py
similarity index 55%
copy from src/main/python/tests/basics/test_context_creation.py
copy to src/main/python/tests/basics/test_context_stats.py
index 1a70deb4ca..745629868c 100644
--- a/src/main/python/tests/basics/test_context_creation.py
+++ b/src/main/python/tests/basics/test_context_stats.py
@@ -21,31 +21,43 @@
 
 import unittest
 
+import numpy as np
 from systemds.context import SystemDSContext
+np.random.seed(1412)
 
 
 class TestContextCreation(unittest.TestCase):
 
-    def test_same_port(self):
-        # Same port should graciously change port
-        sds1 = SystemDSContext(port=9415)
-        sds2 = SystemDSContext(port=9415)
-        sds1.close()
-        sds2.close()
-
-    def test_create_10_contexts(self):
-        # Creating multiple contexts and closing them should be no problem.
-        for _ in range(0, 10):
-            SystemDSContext().close()
-
-    def test_create_multiple_context(self):
-        # Creating multiple contexts in sequence but open at the same time is okay.
-        a = SystemDSContext()
-        b = SystemDSContext()
-        c = SystemDSContext()
-        d = SystemDSContext()
-
-        a.close()
-        b.close()
-        c.close()
-        d.close()
+    sds: SystemDSContext = None
+
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def getM(self):
+        m1 = np.array(np.random.randint(10, size=5*5), dtype=np.int)
+        m1.shape = (5, 5)
+        return m1
+
+    def test_stats_v1(self):
+        a = self.sds.from_numpy(self.getM())
+        a = a + 1
+        a = a * 4
+        a = a + 3
+        a = a / 23
+
+        self.sds.capture_stats()
+        a.compute()
+        self.sds.capture_stats(False)
+
+        stats = self.sds.get_stats()
+        self.sds.clear_stats()
+        instructions = "\n".join(stats.split("Heavy hitter instructions:")[1].split("\n")[2:])
+        assert("+" in instructions and "*" in instructions and "/" in instructions)
+
+