You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@systemds.apache.org by ba...@apache.org on 2022/08/19 15:28:20 UTC

[systemds] 01/02: [SYSTEMDS-2835] Add a federated version of adult_neural and mnist tests

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 9fe84b53060c99c8298b0fe8e5262e606b5cdfe3
Author: Kevin Innerebner <ke...@gmail.com>
AuthorDate: Wed Jul 20 15:12:34 2022 +0200

    [SYSTEMDS-2835] Add a federated version of adult_neural and mnist tests
    
    - Fix a bug where instructions were not replaced by their FED equivalents
    because the required `CompilerConfig` option was not set (the JMLC
    `Connection` now sets `ALLOW_INDIVIDUAL_SB_SPECIFIC_OPS` to false).
    - Remove unnecessary CompilerConfigs for Python API
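    - Add a federated version of adult_neural and mnist tests (the
    adult_neural tests are currently skipped because `toOneHot()` is not
    federated, so the parameter server does not work on its output).
    - Initialize frames (not only matrices) whose metadata format is
    federated, via a new `InitFEDInstruction.federateFrame` overload.
    - Add statistics capturing to the Python `SystemDSContext`
    (`capture_stats`, `capture_stats_context`, `get_stats`, `take_stats`,
    `clear_stats`).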
    - Minor fix for the adult example test case: the label accessors now
    return a single-column pandas DataFrame instead of a Series, so the
    expected label shapes change accordingly.
    
    Closes #1668
---
 .../java/org/apache/sysds/api/jmlc/Connection.java |   1 +
 .../controlprogram/caching/CacheableData.java      |   8 +-
 .../cp/ParamservBuiltinCPInstruction.java          |   1 +
 .../instructions/fed/InitFEDInstruction.java       |   4 +
 .../python/systemds/context/systemds_context.py    |  72 +++++++-
 .../python/systemds/examples/tutorials/adult.py    |   4 +-
 .../python/systemds/operator/operation_node.py     |   2 +
 .../python/tests/examples/tutorials/test_adult.py  |   4 +-
 .../tests/federated/test_federated_adult_neural.py | 201 +++++++++++++++++++++
 .../python/tests/federated/test_federated_mnist.py | 123 +++++++++++++
 10 files changed, 413 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/sysds/api/jmlc/Connection.java b/src/main/java/org/apache/sysds/api/jmlc/Connection.java
index 7037e2172b..64cb504360 100644
--- a/src/main/java/org/apache/sysds/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysds/api/jmlc/Connection.java
@@ -150,6 +150,7 @@ public class Connection implements Closeable
 		_cconf.set(ConfigType.IGNORE_TEMPORARY_FILENAMES, true);
 		_cconf.set(ConfigType.REJECT_READ_WRITE_UNKNOWNS, false);
 		_cconf.set(ConfigType.ALLOW_CSE_PERSISTENT_READS, false);
+		_cconf.set(ConfigType.ALLOW_INDIVIDUAL_SB_SPECIFIC_OPS, false);
 
 		//disable caching globally 
 		CacheableData.disableCaching();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 8cb108d478..6dd726db6f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -392,8 +392,12 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		if(_fedMapping == null && _metaData instanceof MetaDataFormat){
 			MetaDataFormat mdf = (MetaDataFormat) _metaData;
 			if(mdf.getFileFormat() == FileFormat.FEDERATED){
-				InitFEDInstruction.federateMatrix(
-					this, ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
+				if (this instanceof FrameObject)
+					InitFEDInstruction.federateFrame((FrameObject) this,
+						ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
+				else
+					InitFEDInstruction.federateMatrix(
+							this, ReaderWriterFederated.read(_hdfsFileName, mdf.getDataCharacteristics()));
 				return true;
 			}
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index ef45a9c2b3..b4c3c64553 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -87,6 +87,7 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc
 	@Override
 	public void processInstruction(ExecutionContext ec) {
 		// check if the input is federated
+		// FIXME: does not work if features are federated, but labels are not
 		if(ec.getMatrixObject(getParam(PS_FEATURES)).isFederated() ||
 			ec.getMatrixObject(getParam(PS_LABELS)).isFederated()) {
 			runFederated(ec);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
index 3e648bbe3b..68965db4c3 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/fed/InitFEDInstruction.java
@@ -451,6 +451,10 @@ public class InitFEDInstruction extends FEDInstruction implements LineageTraceab
 			LOG.debug("Fed map Inited:" + output.getFedMapping());
 	}
 
+	public static void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers) {
+		federateFrame(output, workers, null);
+	}
+
 	public static void federateFrame(FrameObject output, List<Pair<FederatedRange, FederatedData>> workers, CacheBlock[] blocks) {
 		List<Pair<FederatedRange, FederatedData>> fedMapping = new ArrayList<>();
 		for(Pair<FederatedRange, FederatedData> e : workers)
diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py
index d25805cf96..e467a4d65a 100644
--- a/src/main/python/systemds/context/systemds_context.py
+++ b/src/main/python/systemds/context/systemds_context.py
@@ -31,6 +31,7 @@ from subprocess import PIPE, Popen
 from threading import Thread
 from time import sleep
 from typing import Dict, Iterable, Sequence, Tuple, Union
+from contextlib import contextmanager
 
 import numpy as np
 import pandas as pd
@@ -51,8 +52,10 @@ class SystemDSContext(object):
     """
 
     java_gateway: JavaGateway
+    _capture_statistics: bool
+    _statistics: str
 
-    def __init__(self, port: int = -1):
+    def __init__(self, port: int = -1, capture_statistics: bool = False):
         """Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled
         Any new instance of this SystemDS Context, would start a separate new JVM.
 
@@ -61,6 +64,8 @@ class SystemDSContext(object):
         """
         actual_port = self.__start(port)
         process = self.__process
+        self._statistics = ""
+        self._capture_statistics = capture_statistics
         if process.poll() is None:
             self.__start_gateway(actual_port)
         else:
@@ -306,6 +311,71 @@ class SystemDSContext(object):
         s.close()
         return port
 
+    def _execution_completed(self, script: 'DMLScript'):
+        """
+        Should/will be called after execution of a script.
+        Used to update statistics.
+        :param script: The script that got executed
+        """
+        if self._capture_statistics:
+            self._statistics += script.prepared_script.statistics()
+
+    def capture_stats(self, enable: bool = True):
+        """
+        Enable (or disable) capturing of execution statistics.
+        :param enable: if `True` enable capturing, else disable it
+        """
+        self._capture_statistics = enable
+        self.java_gateway.entry_point.getConnection().setStatistics(enable)
+
+    @contextmanager
+    def capture_stats_context(self):
+        """
+        Context for capturing statistics. Should be used in a `with` statement.
+        Afterwards capturing will be reset to the state it was before.
+
+        Example:
+        ```Python
+        with sds.capture_stats_context():
+            a = some_computation.compute()
+            b = another_computation.compute()
+        print(sds.take_stats())
+        ```
+        :return: a context object to be used in a `with` statement
+        """
+        was_enabled = self._capture_statistics
+        try:
+            self.capture_stats(True)
+            yield None
+        finally:
+            self.capture_stats(was_enabled)
+
+    def get_stats(self):
+        """
+        Get the captured statistics. Will not clear the captured statistics.
+
+        See `take_stats()` for an option that also clears the captured statistics.
+        :return: The captured statistics
+        """
+        return self._statistics
+
+    def take_stats(self):
+        """
+        Get the captured statistics and clear the captured statistics.
+
+        See `get_stats()` for an option that does not clear the captured statistics.
+        :return: The captured statistics
+        """
+        stats = self.get_stats()
+        self.clear_stats()
+        return stats
+
+    def clear_stats(self):
+        """
+        Clears the captured statistics.
+        """
+        self._statistics = ""
+
     def full(self, shape: Tuple[int, int], value: Union[float, int]) -> 'Matrix':
         """Generates a matrix completely filled with a value
 
diff --git a/src/main/python/systemds/examples/tutorials/adult.py b/src/main/python/systemds/examples/tutorials/adult.py
index f15ed382e9..575697ae01 100644
--- a/src/main/python/systemds/examples/tutorials/adult.py
+++ b/src/main/python/systemds/examples/tutorials/adult.py
@@ -63,7 +63,7 @@ class DataManager:
 
     def get_train_labels_pandas(self) -> pd.DataFrame:
         self._get_data(self._train_data_loc)
-        return self._parse_data(self._train_data_loc)["income"]
+        return self._parse_data(self._train_data_loc)[["income"]]
 
     def get_train_labels(self, sds: SystemDSContext) -> 'Frame':
         self._get_data(self._train_data_loc)
@@ -80,7 +80,7 @@ class DataManager:
 
     def get_test_labels_pandas(self) -> pd.DataFrame:
         self._get_data(self._test_data_loc)
-        return self._parse_data(self._test_data_loc)["income"]
+        return self._parse_data(self._test_data_loc)[["income"]]
 
     def get_test_labels(self, sds: SystemDSContext) -> 'Frame':
         self._get_data(self._test_data_loc)
diff --git a/src/main/python/systemds/operator/operation_node.py b/src/main/python/systemds/operator/operation_node.py
index 99d823bc14..5db90b3a9f 100644
--- a/src/main/python/systemds/operator/operation_node.py
+++ b/src/main/python/systemds/operator/operation_node.py
@@ -100,6 +100,8 @@ class OperationNode(DAGNode):
             else:
                 result_variables = self._script.execute()
 
+            self.sds_context._execution_completed(self._script)
+
             if result_variables is not None:
                 self._result_var = self._parse_output_result_variables(
                     result_variables)
diff --git a/src/main/python/tests/examples/tutorials/test_adult.py b/src/main/python/tests/examples/tutorials/test_adult.py
index d327676977..c6b1018658 100644
--- a/src/main/python/tests/examples/tutorials/test_adult.py
+++ b/src/main/python/tests/examples/tutorials/test_adult.py
@@ -58,7 +58,7 @@ class TestAdultStandardML(unittest.TestCase):
 
     def test_train_labels(self):
         y = self.d.get_train_labels_pandas()
-        self.assertEqual((32561,), y.shape)
+        self.assertEqual((32561, 1), y.shape)
 
     def test_test_data(self):
         x_l = self.d.get_test_data_pandas()
@@ -66,7 +66,7 @@ class TestAdultStandardML(unittest.TestCase):
 
     def test_test_labels(self):
         y_l = self.d.get_test_labels_pandas()
-        self.assertEqual((16281,), y_l.shape)
+        self.assertEqual((16281, 1), y_l.shape)
 
     def test_train_data_pandas_vs_systemds(self):
         pandas = self.d.get_train_data_pandas()
diff --git a/src/main/python/tests/federated/test_federated_adult_neural.py b/src/main/python/tests/federated/test_federated_adult_neural.py
new file mode 100644
index 0000000000..df65565c0b
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_adult_neural.py
@@ -0,0 +1,201 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+import itertools
+import shutil
+import unittest
+import io
+import json
+import pandas as pd
+from os import path, makedirs
+
+from systemds.context import SystemDSContext
+from systemds.examples.tutorials.adult import DataManager
+from systemds.operator.algorithm.builtin.scale import scale
+from systemds.operator.algorithm.builtin.scaleApply import scaleApply
+
+
+def create_schema(dataset):
+    schema = []
+    for dtype in dataset.dtypes:
+        if pd.api.types.is_integer_dtype(dtype):
+            schema.append('int64')
+        elif pd.api.types.is_float_dtype(dtype):
+            schema.append('fp64')
+        elif pd.api.types.is_bool_dtype(dtype):
+            schema.append('bool')
+        else:
+            schema.append('string')
+    return ','.join(schema)
+
+
+def create_row_federated_dataset(name, dataset, num_parts=2,
+                                 federated_workers=None):
+    if federated_workers is None:
+        federated_workers = ["localhost:8001", "localhost:8002"]
+    tempdir = "./tests/federated/tmp/test_federated_adult_neural/"
+    federated_file = path.join(tempdir, f"{name}.fed")
+    makedirs(tempdir, exist_ok=True)
+
+    schema = create_schema(dataset)
+    r = dataset.shape[0] // num_parts
+    rs = [r for _ in range(num_parts - 1)] + [dataset.shape[0] - r * (num_parts - 1)]
+    c = dataset.shape[1]
+
+    fed_file_content = []
+    rows_processed = 0
+    for worker_id, address, rows in zip(range(num_parts), itertools.cycle(federated_workers), rs):
+        dataset_part_path = path.join(tempdir, f"{name}{worker_id}.csv")
+        mtd = {"format": "csv", "header": True, "rows": rows, "cols": c,
+               "data_type": "frame", "schema": schema}
+
+        dataset_part = dataset[rows_processed:rows_processed + rows]
+        dataset_part.to_csv(dataset_part_path, index=False)
+        with io.open(f"{dataset_part_path}.mtd", "w", encoding="utf-8") as f:
+            json.dump(mtd, f, ensure_ascii=False)
+
+        fed_file_content.append({
+            "address": address,
+            "dataType": "FRAME",
+            "filepath": dataset_part_path,
+            "begin": [rows_processed, 0],
+            "end": [rows_processed + rows, c],
+        })
+        rows_processed += rows
+
+    with open(federated_file, "w", encoding="utf-8") as f:
+        json.dump(fed_file_content, f)
+    with open(federated_file + '.mtd', "w", encoding="utf-8") as f:
+        json.dump({"format": "federated", "rows": dataset.shape[0], "cols": c,
+                   "data_type": "frame", "schema": schema}, f)
+
+    return federated_file
+
+
+class TestFederatedAdultNeural(unittest.TestCase):
+    """
+    Test class for adult neural network code
+    """
+
+    sds: SystemDSContext = None
+    d: DataManager = None
+    neural_net_src_path: str = "tests/examples/tutorials/neural_net_source.dml"
+    preprocess_src_path: str = "tests/examples/tutorials/preprocess.dml"
+    data_path_train: str = ""
+    data_path_test: str = ""
+    labels_path_train: str = ""
+    labels_path_test: str = ""
+    dataset_jspec: str = "../../test/resources/datasets/adult/jspec.json"
+
+    train_count: int = 15000
+    test_count: int = 300
+
+    network_dir: str = "tests/examples/tutorials/model"
+    network: str = network_dir + "/fnn"
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+        cls.d = DataManager()
+        cls.data_path_train = create_row_federated_dataset("train_data",
+                                                           cls.d.get_train_data_pandas()[0:cls.train_count])
+        cls.labels_path_train = create_row_federated_dataset("train_labels",
+                                                             cls.d.get_train_labels_pandas()[0:cls.train_count])
+        cls.data_path_test = create_row_federated_dataset("test_data",
+                                                          cls.d.get_test_data_pandas()[0:cls.test_count])
+        cls.labels_path_test = create_row_federated_dataset("test_labels",
+                                                            cls.d.get_test_labels_pandas()[0:cls.test_count])
+        shutil.rmtree(cls.network_dir, ignore_errors=True)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+        shutil.rmtree(cls.network_dir, ignore_errors=True)
+
+    # Tests
+
+    @unittest.skip("`toOneHot()` won't be federated -> param-server won't work")
+    def test_train_neural_net(self):
+        self.train_neural_net_and_save()
+        self.eval_neural_net()
+
+    @unittest.skip("`toOneHot()` won't be federated -> param-server won't work")
+    def test_train_predict(self):
+        self.train_neural_net_and_predict()
+
+    # Helper methods
+
+    def prepare_x(self):
+        jspec = self.d.get_jspec(self.sds)
+        train_x_frame = self.sds.read(self.data_path_train)
+        train_x, M1 = train_x_frame.transform_encode(spec=jspec)
+        test_x_frame = self.sds.read(self.data_path_test)
+        test_x = test_x_frame.transform_apply(spec=jspec, meta=M1)
+        # Scale and shift .... not needed because of sigmoid layer,
+        # could be useful therefore tested.
+        [train_x, ce, sc] = scale(train_x)
+        test_x = scaleApply(test_x, ce, sc)
+        return [train_x, test_x]
+
+    def prepare_y(self):
+        jspec_dict = {"recode": ["income"]}
+        jspec_labels = self.sds.scalar(f'"{jspec_dict}"')
+        train_y_frame = self.sds.read(self.labels_path_train)
+        train_y, M2 = train_y_frame.transform_encode(spec=jspec_labels)
+        test_y_frame = self.sds.read(self.labels_path_test)
+        test_y = test_y_frame.transform_apply(spec=jspec_labels, meta=M2)
+        labels = 2
+        train_y = train_y.to_one_hot(labels)
+        test_y = test_y.to_one_hot(labels)
+        return [train_y, test_y]
+
+    def prepare(self):
+        x = self.prepare_x()
+        y = self.prepare_y()
+        return [x[0], x[1], y[0], y[1]]
+
+    def train_neural_net_and_save(self):
+        [train_x, _, train_y, _] = self.prepare()
+        FFN_package = self.sds.source(self.neural_net_src_path, "fnn")
+        network = FFN_package.train(train_x, train_y, 4, 16, 0.01, 1)
+        network.write(self.network).compute()
+
+    def train_neural_net_and_predict(self):
+        [train_x, test_x, train_y, test_y] = self.prepare()
+        FFN_package = self.sds.source(self.neural_net_src_path, "fnn")
+        network = FFN_package.train_paramserv(
+            train_x, train_y, 1, 16, 0.01, 2, 1)
+        probs = FFN_package.predict(test_x, network)
+        accuracy = FFN_package.eval(probs, test_y).compute()
+        # accuracy is returned in percent
+        self.assertTrue(accuracy > 0.80)
+
+    def eval_neural_net(self):
+        [_, test_x, _, test_y] = self.prepare()
+        network = self.sds.read(self.network)
+        FFN_package = self.sds.source(self.neural_net_src_path, "fnn")
+        probs = FFN_package.predict(test_x, network)
+        accuracy = FFN_package.eval(probs, test_y).compute()
+        # accuracy is returned in percent
+        self.assertTrue(accuracy > 0.80)
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/main/python/tests/federated/test_federated_mnist.py b/src/main/python/tests/federated/test_federated_mnist.py
new file mode 100644
index 0000000000..3b11bd3194
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_mnist.py
@@ -0,0 +1,123 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import unittest
+import itertools
+import io
+import json
+import pandas as pd
+from os import path, makedirs
+
+from systemds.context import SystemDSContext
+from systemds.examples.tutorials.mnist import DataManager
+from systemds.operator.algorithm import kmeans, multiLogReg, multiLogRegPredict
+
+
+def create_row_federated_dataset(name, dataset, num_parts=2,
+                                 federated_workers=None):
+    if federated_workers is None:
+        federated_workers = ["localhost:8001", "localhost:8002"]
+    tempdir = "./tests/federated/tmp/test_federated_mnist/"
+    federated_file = path.join(tempdir, f"{name}.fed")
+    makedirs(tempdir, exist_ok=True)
+
+    r = dataset.shape[0] // num_parts
+    rs = [r for _ in range(num_parts - 1)] + [dataset.shape[0] - r * (num_parts - 1)]
+    c = dataset.shape[1]
+
+    fed_file_content = []
+    rows_processed = 0
+    for worker_id, address, rows in zip(range(num_parts), itertools.cycle(federated_workers), rs):
+        dataset_part_path = path.join(tempdir, f"{name}{worker_id}.csv")
+        mtd = {"format": "csv", "rows": rows, "cols": c,
+               "data_type": "matrix", "value_type": "double"}
+
+        dataset_part = dataset[rows_processed:rows_processed + rows]
+        pd.DataFrame(dataset_part).to_csv(dataset_part_path, index=False, header=False)
+        with io.open(f"{dataset_part_path}.mtd", "w", encoding="utf-8") as f:
+            json.dump(mtd, f, ensure_ascii=False)
+
+        fed_file_content.append({
+            "address": address,
+            "dataType": "MATRIX",
+            "filepath": dataset_part_path,
+            "begin": [rows_processed, 0],
+            "end": [rows_processed + rows, c],
+        })
+        rows_processed += rows
+
+    with open(federated_file, "w", encoding="utf-8") as f:
+        json.dump(fed_file_content, f)
+    with open(federated_file + '.mtd', "w", encoding="utf-8") as f:
+        json.dump({"format": "federated", "rows": dataset.shape[0], "cols": c,
+                   "data_type": "matrix", "value_type": "double"}, f)
+
+    return federated_file
+
+
+class TestFederatedMnist(unittest.TestCase):
+    """
+    Test class for mnist dml script tutorial code.
+    """
+
+    sds: SystemDSContext = None
+    d: DataManager = None
+    base_path = "systemds/examples/tutorials/mnist/"
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+        cls.d = DataManager()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_multi_log_reg(self):
+        # Reduced because we want the tests to finish a bit faster.
+        train_count = 15000
+        test_count = 5000
+        # Train data
+        np_train_data = self.d.get_train_data().reshape(60000, 28 * 28)[0:train_count]
+        data_path_train = create_row_federated_dataset("train_data", np_train_data)
+        X = self.sds.read(data_path_train)
+        Y = self.sds.from_numpy(self.d.get_train_labels()[:train_count])
+        Y = Y + 1.0
+
+        # Test data
+        np_test_data = self.d.get_test_data().reshape(10000, 28 * 28)[0:test_count]
+        data_path_test = create_row_federated_dataset("test_data", np_test_data)
+        Xt = self.sds.read(data_path_test)
+        Yt = self.sds.from_numpy(self.d.get_test_labels()[:test_count])
+        Yt = Yt + 1.0
+
+        bias = multiLogReg(X, Y)
+
+        with self.sds.capture_stats_context():
+            [_, _, acc] = multiLogRegPredict(Xt, bias, Yt).compute()
+        stats = self.sds.take_stats()
+        for fed_instr in ["fed_isnan", "fed_*", "fed_-", "fed_uark+", "fed_r'", "fed_rightIndex"]:
+            self.assertIn(fed_instr, stats)
+        self.assertGreater(acc, 80)
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)