You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/08/23 10:36:09 UTC

[systemds] branch master updated: [SYSTEMDS-3097] Fix CSV metadata parsing in federated execution

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 276d153  [SYSTEMDS-3097] Fix CSV metadata parsing in federated execution
276d153 is described below

commit 276d153b8ab18a10015c2230900b448e085f6610
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Mon Aug 23 09:44:18 2021 +0200

    [SYSTEMDS-3097] Fix CSV metadata parsing in federated execution
    
    This commit fixes the metadata handling when parsing a federated csv
    file.
    The issues was that header was always set to true, when parsing CSV.
    The commit also contains both python and java tests to remove future
    errors.
    
    Closes #1370
---
 .../federated/FederatedWorkerHandler.java          |   5 +-
 .../org/apache/sysds/runtime/meta/MetaDataAll.java |  20 +-
 src/main/python/tests/federated/runFedTest.sh      |  11 +-
 .../test_federated_aggregations_noHeader.py        | 204 +++++++++++++++++++++
 .../org/apache/sysds/test/AutomatedTestBase.java   |  33 +++-
 src/test/java/org/apache/sysds/test/TestUtils.java |  26 +++
 .../functions/federated/io/FederatedReaderCSV.java | 115 ++++++++++++
 .../federated/io/FederatedReaderTest.java          |   4 +-
 8 files changed, 399 insertions(+), 19 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 7bcae7e..062cfa0 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -197,6 +197,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		String delim = null;
 		FileSystem fs = null;
 		MetaDataAll mtd;
+		
 		try {
 			String mtdname = DataExpression.getMTDFileName(filename);
 			Path path = new Path(mtdname);
@@ -219,9 +220,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 			throw ex;
 		}
 		catch (Exception ex) {
-			String msg = "Exception in reading metadata of: " + filename;
-			log.error(msg, ex);
-			throw new DMLRuntimeException(msg);
+			throw new DMLRuntimeException(ex);
 		}
 		finally {
 			IOUtilFunctions.closeSilently(fs);
diff --git a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
index ae6cc99..9132bd4 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/MetaDataAll.java
@@ -39,6 +39,7 @@ import org.apache.sysds.parser.Expression;
 import org.apache.sysds.parser.LanguageException;
 import org.apache.sysds.parser.ParseException;
 import org.apache.sysds.parser.StringIdentifier;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.privacy.PrivacyConstraint;
@@ -50,6 +51,7 @@ import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
 public class MetaDataAll extends DataIdentifier {
+	// private static final Log LOG = LogFactory.getLog(MetaDataAll.class.getName());
 
 	private JSONObject _metaObj;
 
@@ -79,8 +81,8 @@ public class MetaDataAll extends DataIdentifier {
 		try {
 			_metaObj = JSONHelper.parse(br);
 		}
-		catch(IOException e) {
-			e.printStackTrace();
+		catch(Exception e) {
+			throw new DMLRuntimeException(e);
 		}
 		setPrivacy(PrivacyConstraint.PrivacyLevel.None);
 		parseMetaDataParams();
@@ -174,7 +176,14 @@ public class MetaDataAll extends DataIdentifier {
 			case DataExpression.FINE_GRAINED_PRIVACY:  setFineGrainedPrivacy(val.toString()); break;
 			case DataExpression.DELIM_DELIMITER: setDelim(val.toString()); break;
 			case DataExpression.SCHEMAPARAM: setSchema(val.toString()); break;
-			case DataExpression.DELIM_HAS_HEADER_ROW: setHasHeader(true);
+			case DataExpression.DELIM_HAS_HEADER_ROW:
+				if(val instanceof Boolean){
+					boolean valB = (Boolean) val;
+					setHasHeader(valB);
+					break;
+				}
+				else
+					setHasHeader(false);
 			case DataExpression.DELIM_SPARSE: setSparseDelim((boolean) val);
 		}
 	}
@@ -402,4 +411,9 @@ public class MetaDataAll extends DataIdentifier {
 		}
 		return false;
 	}
+
+	@Override
+	public String toString() {
+		return "MetaDataAll\n" + _metaObj + "\n" + super.toString();
+	}
 }
diff --git a/src/main/python/tests/federated/runFedTest.sh b/src/main/python/tests/federated/runFedTest.sh
index 0d6b4f4..b34ca99 100755
--- a/src/main/python/tests/federated/runFedTest.sh
+++ b/src/main/python/tests/federated/runFedTest.sh
@@ -30,8 +30,8 @@
 workerdir="tests/federated/worker/"
 outputdir="tests/federated/output/"
 tmpfiledir="tests/federated/tmp/"
-mkdir $workerdir
-mkdir $outputdir
+mkdir -p $workerdir
+mkdir -p $outputdir
 w1_Output="$workerdir/w1"
 w2_Output="$workerdir/w2"
 log="$outputdir/out.log"
@@ -55,13 +55,16 @@ echo -e "\nWorker 1:"
 cat $w1_Output
 echo -e "\nWorker 2:"
 cat $w2_Output
-rm -r $workerdir
 echo -e "\n------------\nTest output:\n------------"
 cat $log
 grepvals="$(tail -n 10 $log | grep OK)"
+echo -e "------------\n"
+
+# Cleanup
+rm -r $workerdir
 rm -r $outputdir
 rm -r $tmpfiledir
-echo -e "------------\n"
+
 if [[ $grepvals == *"OK"* ]]; then
 	exit 0
 else
diff --git a/src/main/python/tests/federated/test_federated_aggregations_noHeader.py b/src/main/python/tests/federated/test_federated_aggregations_noHeader.py
new file mode 100644
index 0000000..25832b6
--- /dev/null
+++ b/src/main/python/tests/federated/test_federated_aggregations_noHeader.py
@@ -0,0 +1,204 @@
+# -------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# -------------------------------------------------------------
+
+import io
+import json
+import os
+import shutil
+import sys
+import unittest
+
+import numpy as np
+from systemds.context import SystemDSContext
+
+os.environ['SYSDS_QUIET'] = "1"
+
+dim = 3
+
+m1 = np.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=np.int16)
+m2 = np.asarray([[2, 2, 2], [3, 3, 3], [4, 4, 4]], dtype=np.int16)
+
+tempdir = "./tests/federated/tmp/test_federated_aggregations_noHeader/"
+mtd = {"format": "csv", "header": False, "rows": dim,
+       "cols": dim, "data_type": "matrix", "value_type": "double"}
+
+# Create the testing directory if it does not exist.
+if not os.path.exists(tempdir):
+    os.makedirs(tempdir)
+
+# Save data files for the Federated workers.
+np.savetxt(tempdir + "m1.csv", m1, delimiter=",",fmt='%d')
+with io.open(tempdir + "m1.csv.mtd", "w", encoding="utf-8") as f:
+    f.write(json.dumps(mtd, ensure_ascii=False))
+
+np.savetxt(tempdir + "m2.csv", m2, delimiter=",",fmt='%d')
+with io.open(tempdir + "m2.csv.mtd", "w", encoding="utf-8") as f:
+    f.write(json.dumps(mtd, ensure_ascii=False))
+
+# Federated workers + file locations
+fed1 = "localhost:8001/" + tempdir + "m1.csv"
+fed2 = "localhost:8002/" + tempdir + "m2.csv"
+
+
+class TestFederatedAggFn(unittest.TestCase):
+
+    sds: SystemDSContext = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.sds = SystemDSContext()
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.sds.close()
+
+    def test_equals(self):
+        f_m = (
+            self.sds.federated(
+                [fed1],
+                [([0, 0], [dim, dim])])
+            .compute()
+        )
+        self.assertTrue(np.allclose(f_m, m1))
+
+    def test_sum3(self):
+        #   [[m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]]
+        f_m_a = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([0, dim], [dim, dim * 2])])
+            .sum()
+            .compute()
+        )
+        m1_m2 = m1.sum() + m2.sum()
+        self.assertAlmostEqual(f_m_a, m1_m2)
+
+    def test_sum1(self):
+        f_m1 = (
+            self.sds.federated(
+                [fed1],
+                [([0, 0], [dim, dim])])
+            .sum()
+            .compute()
+        )
+        m1_r = m1.sum()
+        self.assertAlmostEqual(f_m1, m1_r)
+
+    def test_sum2(self):
+        f_m2 = (
+            self.sds.federated(
+                [fed2],
+                [([0, 0], [dim, dim])])
+            .sum()
+            .compute()
+        )
+        m2_r = m2.sum()
+        self.assertAlmostEqual(f_m2, m2_r)
+
+    def test_sum3(self):
+        #   [[m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]]
+        f_m1_m2 = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([0, dim], [dim, dim * 2])])
+            .sum()
+            .compute()
+        )
+
+        m1_m2 = np.concatenate((m1, m2), axis=1).sum()
+
+        self.assertAlmostEqual(f_m1_m2, m1_m2)
+
+    def test_sum4(self):
+        #   [[m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m1,m1,m1,m1,m1]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]
+        #    [m2,m2,m2,m2,m2]]
+        f_m1_m2 = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([dim, 0], [dim * 2, dim])])
+            .sum()
+            .compute()
+        )
+        m1_m2 = np.concatenate((m1, m2)).sum()
+        self.assertAlmostEqual(f_m1_m2, m1_m2)
+
+    # -----------------------------------
+    # The rest of the tests are
+    # Extended functionality not working Yet
+    # -----------------------------------
+
+    def test_sum5(self):
+        #   [[m1,m1,m1,m1,m1, 0, 0, 0, 0, 0]
+        #    [m1,m1,m1,m1,m1, 0, 0, 0, 0, 0]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [m1,m1,m1,m1,m1,m2,m2,m2,m2,m2]
+        #    [ 0, 0, 0, 0, 0,m2,m2,m2,m2,m2]
+        #    [ 0, 0, 0, 0, 0,m2,m2,m2,m2,m2]]
+        f_m_a = (
+            self.sds.federated(
+                [fed1, fed2],
+                [([0, 0], [dim, dim]), ([2, dim], [dim + 2, dim * 2])])
+            .sum()
+            .compute()
+        )
+        m1_m2 = m1.sum() + m2.sum()
+        self.assertAlmostEqual(f_m_a, m1_m2)
+
+    def test_sum8(self):
+        #   [[ 0, 0, 0, 0, 0, 0, 0, 0]
+        #    [ 0, 0, 0, 0, 0, 0, 0, 0]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]
+        #    [ 0, 0, 0,m1,m1,m1,m1,m1]]
+        f_m_a = (
+            self.sds.federated(
+                [fed1],
+                [([2, 3], [dim + 2, dim + 3])])
+            .sum()
+            .compute()
+        )
+
+        m = m1.sum()
+
+        self.assertAlmostEqual(f_m_a, m)
+
+
+if __name__ == "__main__":
+    unittest.main(exit=False)
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index a4da6e3..3d96cd9 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -69,6 +69,7 @@ import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FrameReader;
 import org.apache.sysds.runtime.io.FrameReaderFactory;
@@ -515,13 +516,7 @@ public abstract class AutomatedTestBase {
 		String completePath = baseDirectory + INPUT_DIR + name + "/in";
 		String completeRPath = baseDirectory + INPUT_DIR + name + ".mtx";
 
-		try {
-			cleanupExistingData(baseDirectory + INPUT_DIR + name, bIncludeR);
-		}
-		catch(IOException e) {
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		}
+		cleanupDir(baseDirectory + INPUT_DIR + name, bIncludeR);
 
 		TestUtils.writeTestMatrix(completePath, matrix);
 		if(bIncludeR) {
@@ -535,6 +530,30 @@ public abstract class AutomatedTestBase {
 		return matrix;
 	}
 
+	protected void writeCSVMatrix(String name, double[][] matrix, boolean header, MatrixCharacteristics mc) {
+		try {
+			final String completePath = baseDirectory + INPUT_DIR + name;
+			final String completeMTDPath = baseDirectory + INPUT_DIR + name + ".mtd";
+			cleanupDir(completePath, false);
+			TestUtils.writeCSV(completePath, matrix, header);
+			final FileFormatProperties ffp = header ? new FileFormatPropertiesCSV(true, ",", false, 0.0, "") : new FileFormatPropertiesCSV();
+			HDFSTool.writeMetaDataFile(completeMTDPath, ValueType.FP64, mc, FileFormat.CSV, ffp);
+		}
+		catch(Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	protected void cleanupDir(String fullPath, boolean bIncludeR){
+		try {
+			cleanupExistingData(fullPath, bIncludeR);
+		}
+		catch(IOException e) {
+			e.printStackTrace();
+			throw new RuntimeException(e);
+		}
+	}
+
 	protected double[][] writeInputMatrixWithMTD(String name, MatrixBlock matrix, boolean bIncludeR) {
 		double[][] data = DataConverter.convertToDoubleMatrix(matrix);
 		return writeInputMatrixWithMTD(name, data, bIncludeR);
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java
index 9de6ae8..9282d93 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.io.FrameWriter;
 import org.apache.sysds.runtime.io.FrameWriterFactory;
@@ -2040,6 +2041,31 @@ public class TestUtils
 		}
 	}
 
+
+	protected static void writeCSV(String completePath, double[][] matrix, boolean header) throws IOException{
+		Path path = new Path(completePath);
+		FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+		DataOutputStream out = fs.create(path, true);
+		try(BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) {
+
+			if(header) {
+				pw.append("d0");
+				for(int i = 1; i < matrix[0].length; i++) {
+					pw.append(",d" + i);
+				}
+				pw.append("\n");
+			}
+			for(int j = 0; j < matrix.length; j++) {
+				pw.append("" + matrix[j][0]);
+				for(int i = 1; i < matrix[j].length; i++) {
+					pw.append("," + matrix[j][i]);
+				}
+				pw.append("\n");
+			}
+		}
+	}
+
+
 	/**
 	 * <p>
 	 * Writes a matrix to a file using the text format.
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderCSV.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderCSV.java
new file mode 100644
index 0000000..e8e1b31
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderCSV.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.functions.federated.io;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.functions.federated.FederatedTestObjectConstructor;
+import org.junit.Assert;
+import org.junit.Test;
+
+@net.jcip.annotations.NotThreadSafe
+public class FederatedReaderCSV extends AutomatedTestBase {
+
+    private static final Log LOG = LogFactory.getLog(FederatedReaderCSV.class.getName());
+    private final static String TEST_DIR = "functions/federated/ioR/";
+    private final static String TEST_NAME = "FederatedReaderTest";
+    private final static String TEST_CLASS_DIR = TEST_DIR + FederatedReaderCSV.class.getSimpleName() + "/";
+    private final static int blocksize = 1024;
+
+    private final static int dim = 3;
+    long[][] begins = new long[][] {new long[] {0, 0}};
+    long[][] ends = new long[][] {new long[] {dim, dim}};
+
+    @Override
+    public void setUp() {
+        TestUtils.clearAssertionInformation();
+        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[] {"X1"}));
+    }
+
+    @Test
+    public void testWithHeader() {
+        federatedRead(true);
+    }
+
+    @Test
+    public void testWithoutHeader() {
+        federatedRead(false);
+    }
+
+    public void federatedRead( boolean header) {
+        Types.ExecMode oldPlatform = setExecMode(ExecType.CP);
+        getAndLoadTestConfiguration(TEST_NAME);
+        setOutputBuffering(true);
+
+        
+        // empty script name because we don't execute any script, just start the worker
+        
+        fullDMLScriptName = "";
+        int port1 = getRandomAvailablePort();
+        Thread t1 = startLocalFedWorkerThread(port1);
+        String host = "localhost";
+        
+        try {
+            double[][] X1 = new double[][] {new double[] {1, 2, 3}, new double[] {4, 5, 6}, new double[] {7, 8, 9}};
+            MatrixCharacteristics mc = new MatrixCharacteristics(dim, dim, blocksize, dim * dim);
+            writeCSVMatrix("X1", X1, header, mc);
+
+            // Thread.sleep(10000);
+            MatrixObject fed = FederatedTestObjectConstructor.constructFederatedInput(dim, dim, blocksize, host, begins,
+                ends, new int[] {port1}, new String[] {input("X1")}, input("X.json"));
+            writeInputFederatedWithMTD("X.json", fed, null);
+
+            // Run reference dml script with normal matrix
+
+            fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + TEST_NAME + "1Reference.dml";
+            programArgs = new String[] {"-stats", "-args", input("X1")};
+
+            String refOut = runTest(null).toString();
+
+            LOG.debug(refOut);
+
+            // Run federated
+            fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + TEST_NAME + ".dml";
+            programArgs = new String[] {"-stats", "-args", input("X.json")};
+            String out = runTest(null).toString();
+
+            Assert.assertTrue(heavyHittersContainsString("fed_uak+"));
+            // Verify output
+            Assert.assertEquals(Double.parseDouble(refOut.split("\n")[0]), Double.parseDouble(out.split("\n")[0]),
+                0.00001);
+        }
+        catch(Exception e) {
+            e.printStackTrace();
+            Assert.assertTrue(false);
+        }
+        finally {
+            resetExecMode(oldPlatform);
+        }
+
+        TestUtils.shutdownThreads(t1);
+    }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
index 62cfd32..ff68c83 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/io/FederatedReaderTest.java
@@ -67,13 +67,11 @@ public class FederatedReaderTest extends AutomatedTestBase {
 
 	@Test
 	public void federatedSingleNodeReadOneWorker() {
-		LOG.debug("1Federated");
 		federatedRead(Types.ExecMode.SINGLE_NODE, 1);
 	}
 
 	@Test
 	public void federatedSingleNodeReadTwoWorker() {
-		LOG.debug("2Federated");
 		federatedRead(Types.ExecMode.SINGLE_NODE, 2);
 	}
 
@@ -124,6 +122,8 @@ public class FederatedReaderTest extends AutomatedTestBase {
 
 			String refOut = runTest(null).toString();
 
+			LOG.debug(refOut);
+			
 			// Run federated
 			fullDMLScriptName = SCRIPT_DIR + "functions/federated/io/" + TEST_NAME + ".dml";
 			programArgs = new String[] {"-stats", "-args", input("X.json")};