You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/08/28 22:14:36 UTC

[systemds] branch master updated: [MINOR] Cleanup federated worker tests and federated data read

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new de7b0c7  [MINOR] Cleanup federated worker tests and federated data read
de7b0c7 is described below

commit de7b0c75a78c45049418b6c1b7f11ffc06277f2d
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Aug 29 00:14:14 2020 +0200

    [MINOR] Cleanup federated worker tests and federated data read
    
    1) Fix for multiple tests within one JVM, where the negative test
    created federated data that never actually existed but was still
    added to the set of all federated sites used for CLEAR. Trying to
    clean it up always failed and aborted before the cleanup of the
    previous federated sites, thereby corrupting all following tests.
    
    2) Fix arbitrary forcing of dense and sparse nnz, which affected the
    non-zero allocation (causing either over- or under-allocation)
    
    3) Refactored the federated tests by moving the federated transform
    test into the federated package and running it as a new github
    workflow entry, so that issues seen in local tests are not missed.
---
 .github/workflows/functionsTests.yml               |  1 +
 .../apache/sysds/hops/recompile/Recompiler.java    | 21 ++++++------
 .../controlprogram/federated/FederatedData.java    | 31 ++++++++++++------
 .../federated/FederatedWorkerHandler.java          | 37 ++++++++--------------
 .../primitives/FederatedNegativeTest.java          |  4 +++
 .../TransformFederatedEncodeDecodeTest.java        |  2 +-
 6 files changed, 52 insertions(+), 44 deletions(-)

diff --git a/.github/workflows/functionsTests.yml b/.github/workflows/functionsTests.yml
index d3930b1..48cb62b 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -57,6 +57,7 @@ jobs:
           dnn,
           federated.algorithms,
           federated.primitives,
+          federated.transform,
           frame,
           indexing,
           io,
diff --git a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
index 4334384..d048863 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/Recompiler.java
@@ -1604,17 +1604,16 @@ public class Recompiler
 			//get meta data filename
 			String mtdname = DataExpression.getMTDFileName(dop.getFileName());
 			Path path = new Path(mtdname);
-			try( FileSystem fs = IOUtilFunctions.getFileSystem(mtdname) ) {
-				if( fs.exists(path) ){
-					try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
-						JSONObject mtd = JSONHelper.parse(br);
-						DataType dt = DataType.valueOf(String.valueOf(mtd.get(DataExpression.DATATYPEPARAM)).toUpperCase());
-						dop.setDataType(dt);
-						if(dt != DataType.FRAME)
-							dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase()));
-						dop.setDim1((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READROWPARAM).toString()):0);
-						dop.setDim2((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READCOLPARAM).toString()):0);
-					}
+			FileSystem fs = IOUtilFunctions.getFileSystem(mtdname); //no auto-close
+			if( fs.exists(path) ){
+				try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
+					JSONObject mtd = JSONHelper.parse(br);
+					DataType dt = DataType.valueOf(String.valueOf(mtd.get(DataExpression.DATATYPEPARAM)).toUpperCase());
+					dop.setDataType(dt);
+					if(dt != DataType.FRAME)
+						dop.setValueType(ValueType.valueOf(String.valueOf(mtd.get(DataExpression.VALUETYPEPARAM)).toUpperCase()));
+					dop.setDim1((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READROWPARAM).toString()):0);
+					dop.setDim2((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READCOLPARAM).toString()):0);
 				}
 			}
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index 9f5f942..8a3fbd2 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -33,6 +33,7 @@ import io.netty.handler.codec.serialization.ObjectDecoder;
 import io.netty.handler.codec.serialization.ObjectEncoder;
 import io.netty.util.concurrent.Promise;
 
+import org.apache.log4j.Logger;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -47,7 +48,8 @@ import java.util.concurrent.Future;
 
 
 public class FederatedData {
-	private static Set<InetSocketAddress> _allFedSites = new HashSet<>();
+	protected final static Logger log = Logger.getLogger(FederatedWorkerHandler.class);
+	private final static Set<InetSocketAddress> _allFedSites = new HashSet<>();
 	
 	private final Types.DataType _dataType;
 	private final InetSocketAddress _address;
@@ -158,14 +160,25 @@ public class FederatedData {
 		if( _allFedSites.isEmpty() )
 			return;
 		
-		//create and execute clear request on all workers
-		FederatedRequest fr = new FederatedRequest(RequestType.CLEAR);
-		List<Future<FederatedResponse>> ret = new ArrayList<>();
-		for( InetSocketAddress address : _allFedSites )
-			ret.add(executeFederatedOperation(address, fr));
-		
-		//wait for successful completion
-		FederationUtils.waitFor(ret);
+		try {
+			//create and execute clear request on all workers
+			FederatedRequest fr = new FederatedRequest(RequestType.CLEAR);
+			List<Future<FederatedResponse>> ret = new ArrayList<>();
+			for( InetSocketAddress address : _allFedSites )
+				ret.add(executeFederatedOperation(address, fr));
+			
+			//wait for successful completion
+			FederationUtils.waitFor(ret);
+		}
+		catch(Exception ex) {
+			log.warn("Failed to execute CLEAR request on existing federated sites.", ex);
+		}
+		finally {
+			resetFederatedSites();
+		}
+	}
+	
+	public static void resetFederatedSites() {
 		_allFedSites.clear();
 	}
 	
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 0a0a05b..a2e62fe 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -187,29 +187,20 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		try {
 			String mtdname = DataExpression.getMTDFileName(filename);
 			Path path = new Path(mtdname);
-			try (FileSystem fs = IOUtilFunctions.getFileSystem(mtdname)) {
-				try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
-					JSONObject mtd = JSONHelper.parse(br);
-					if (mtd == null)
-						return new FederatedResponse(ResponseType.ERROR,
-							new FederatedWorkerHandlerException("Could not parse metadata file"));
-					mc.setRows(mtd.getLong(DataExpression.READROWPARAM));
-					mc.setCols(mtd.getLong(DataExpression.READCOLPARAM));
-					if(mtd.containsKey(DataExpression.READNNZPARAM)){
-						mc.setNonZeros(mtd.getLong(DataExpression.READNNZPARAM));
-					}
-					else if (mc.getCols() * mc.getRows() < 8000000){
-						// force dense allocation.
-						mc.setNonZeros(mc.getCols() *mc.getRows());
-					}
-					else{
-						// force sparse allocation 
-						mc.setNonZeros((long)(mc.getCols() * mc.getRows() * 0.35));
-					}
-					
-					cd = (CacheableData<?>) PrivacyPropagator.parseAndSetPrivacyConstraint(cd, mtd);
-					fmt = FileFormat.safeValueOf(mtd.getString(DataExpression.FORMAT_TYPE));
-				}
+			
+			FileSystem fs = IOUtilFunctions.getFileSystem(mtdname); //no auto-close
+			try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
+				JSONObject mtd = JSONHelper.parse(br);
+				if (mtd == null)
+					return new FederatedResponse(ResponseType.ERROR,
+						new FederatedWorkerHandlerException("Could not parse metadata file"));
+				mc.setRows(mtd.getLong(DataExpression.READROWPARAM));
+				mc.setCols(mtd.getLong(DataExpression.READCOLPARAM));
+				if(mtd.containsKey(DataExpression.READNNZPARAM))
+					mc.setNonZeros(mtd.getLong(DataExpression.READNNZPARAM));
+				
+				cd = (CacheableData<?>) PrivacyPropagator.parseAndSetPrivacyConstraint(cd, mtd);
+				fmt = FileFormat.safeValueOf(mtd.getString(DataExpression.FORMAT_TYPE));
 			}
 		}
 		catch (Exception ex) {
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
index 5cac52c..2ebe0c8 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedNegativeTest.java
@@ -57,6 +57,10 @@ public class FederatedNegativeTest {
 		catch (Exception e) {
 			e.printStackTrace();
 		}
+		finally {
+			//robustness in single JVM tests
+			FederatedData.resetFederatedSites();
+		}
 		TestUtils.shutdownThread(t);
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
similarity index 99%
rename from src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
rename to src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
index 48a17f5..ceee7da 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFederatedEncodeDecodeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/transform/TransformFederatedEncodeDecodeTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.sysds.test.functions.transform;
+package org.apache.sysds.test.functions.federated.transform;
 
 import java.util.Arrays;
 import java.util.HashMap;