You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/08/28 08:20:47 UTC
[systemds] branch master updated: [MINOR] Federated tests now in
same JVM
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new feaaada [MINOR] Federated tests now in same JVM
feaaada is described below
commit feaaada2dd8f0bf68a5bb50191a3e3eb17454094
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Fri Aug 28 10:04:58 2020 +0200
[MINOR] Federated tests now in same JVM
To make this work, a static flag inside statistics is set, to allow or
disallow the worker to clear and print the statistics. This is still
a temporary hack, since the wanted behavior would be isolated statistic
objects for each the worker and the controller.
---
.../federated/FederatedWorkerHandler.java | 2 +-
.../java/org/apache/sysds/utils/Statistics.java | 2 +
.../org/apache/sysds/test/AutomatedTestBase.java | 43 +++++++++++++++++++++-
.../federated/algorithms/FederatedBivarTest.java | 8 ++--
.../federated/algorithms/FederatedGLMTest.java | 4 +-
.../federated/algorithms/FederatedKmeansTest.java | 4 +-
.../federated/algorithms/FederatedL2SVMTest.java | 5 +--
.../federated/algorithms/FederatedLogRegTest.java | 19 +---------
.../federated/algorithms/FederatedPCATest.java | 8 ++--
.../federated/algorithms/FederatedUnivarTest.java | 8 ++--
.../federated/algorithms/FederatedYL2SVMTest.java | 5 +--
.../primitives/FederatedBinaryMatrixTest.java | 4 +-
.../primitives/FederatedBinaryVectorTest.java | 4 +-
.../primitives/FederatedConstructionTest.java | 2 +-
.../primitives/FederatedMultiplyTest.java | 4 +-
.../federated/primitives/FederatedRCBindTest.java | 2 +-
.../FederatedStatisticsTest.java} | 8 ++--
.../federated/primitives/FederatedSumTest.java | 2 +-
18 files changed, 81 insertions(+), 53 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 5dbccb4..0a0a05b 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -113,7 +113,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
response = tmp; //return last
}
- if (DMLScript.STATISTICS && request.getType() == RequestType.CLEAR){
+ if (DMLScript.STATISTICS && request.getType() == RequestType.CLEAR && Statistics.allowWorkerStatistics){
System.out.println("Federated Worker " + Statistics.display());
Statistics.reset();
}
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java
index 8f22ab4..7642397 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -159,6 +159,8 @@ public class Statistics
public static long recomputeNNZTime = 0;
public static long examSparsityTime = 0;
public static long allocateDoubleArrTime = 0;
+
+ public static boolean allowWorkerStatistics = true;
public static void incrementNativeFailuresCounter() {
numNativeFailures.increment();
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 9951982..28a82ff 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -101,7 +101,7 @@ public abstract class AutomatedTestBase {
public static final boolean TEST_GPU = false;
public static final double GPU_TOLERANCE = 1e-9;
- public static final int FED_WORKER_WAIT = 2000; // in ms
+ public static final int FED_WORKER_WAIT = 800; // in ms
// With OpenJDK 8u242 on Windows, the new changes in JDK are not allowing
// to set the native library paths internally thus breaking the code.
@@ -1311,6 +1311,47 @@ public abstract class AutomatedTestBase {
}
/**
+ * Start a thread for a worker. This will share the same JVM,
+ * so all static variables will be shared.!
+ *
+ * Also when using the local Fed Worker thread the statistics printing,
+ * and clearing from the worker is disabled.
+ *
+ * @param port Port to use
+ * @return the thread associated with the worker.
+ */
+ protected Thread startLocalFedWorkerThread(int port) {
+ Thread t = null;
+ String[] fedWorkArgs = {"-w", Integer.toString(port)};
+ ArrayList<String> args = new ArrayList<>();
+
+ addProgramIndependentArguments(args);
+
+ for(int i = 0; i < fedWorkArgs.length; i++)
+ args.add(fedWorkArgs[i]);
+
+ String[] finalArguments = args.toArray(new String[args.size()]);
+
+ Statistics.allowWorkerStatistics = false;
+
+ try {
+ t = new Thread(() -> {
+ try {
+ main(finalArguments);
+ }
+ catch(IOException e) {
+ }
+ });
+ t.start();
+ java.util.concurrent.TimeUnit.MILLISECONDS.sleep(FED_WORKER_WAIT);
+ }
+ catch(InterruptedException e) {
+ e.printStackTrace();
+ }
+ return t;
+ }
+
+ /**
* Start java worker in same JVM.
* @param args the command line arguments
* @return the thread associated with the process.s
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
index 96fef5d..3c9439d 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedBivarTest.java
@@ -116,10 +116,10 @@ public class FederatedBivarTest extends AutomatedTestBase {
int port2 = getRandomAvailablePort();
int port3 = getRandomAvailablePort();
int port4 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
- Process t3 = startLocalFedWorker(port3);
- Process t4 = startLocalFedWorker(port4);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
+ Thread t3 = startLocalFedWorkerThread(port3);
+ Thread t4 = startLocalFedWorkerThread(port4);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
index 636b279..8fbcfa7 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedGLMTest.java
@@ -94,8 +94,8 @@ public class FederatedGLMTest extends AutomatedTestBase {
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
index 933e971..c763541 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedKmeansTest.java
@@ -101,8 +101,8 @@ public class FederatedKmeansTest extends AutomatedTestBase {
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
index 4caf52e..c102ef9 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedL2SVMTest.java
@@ -76,7 +76,6 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
if(rtplatform == Types.ExecMode.SPARK) {
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
}
- Process t1, t2;
getAndLoadTestConfiguration(TEST_NAME);
String HOME = SCRIPT_DIR + TEST_DIR;
@@ -98,8 +97,8 @@ public class FederatedL2SVMTest extends AutomatedTestBase {
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- t1 = startLocalFedWorker(port1);
- t2 = startLocalFedWorker(port2);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
index fef8889..9f4aaea 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
@@ -19,8 +19,6 @@
package org.apache.sysds.test.functions.federated.algorithms;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collection;
@@ -95,20 +93,8 @@ public class FederatedLogRegTest extends AutomatedTestBase {
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
-
- BufferedReader output = new BufferedReader(new InputStreamReader(t1.getInputStream()));
- BufferedReader error = new BufferedReader(new InputStreamReader(t1.getInputStream()));
-
- Thread t = new Thread(() -> {
- output.lines().forEach(s -> System.out.println(s));
- });
- Thread te = new Thread(() -> {
- error.lines().forEach(s -> System.err.println(s));
- });
- t.start();
- te.start();
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
@@ -131,7 +117,6 @@ public class FederatedLogRegTest extends AutomatedTestBase {
compareResults(1e-9);
TestUtils.shutdownThreads(t1, t2);
- TestUtils.shutdownThreads(t, te);
// check for federated operations
Assert.assertTrue("contains federated matrix mult", heavyHittersContainsString("fed_ba+*"));
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
index 7a5a2fd..b86c4da 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedPCATest.java
@@ -101,10 +101,10 @@ public class FederatedPCATest extends AutomatedTestBase {
int port2 = getRandomAvailablePort();
int port3 = getRandomAvailablePort();
int port4 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
- Process t3 = startLocalFedWorker(port3);
- Process t4 = startLocalFedWorker(port4);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
+ Thread t3 = startLocalFedWorkerThread(port3);
+ Thread t4 = startLocalFedWorkerThread(port4);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
index c042cbb..b1fb692 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedUnivarTest.java
@@ -101,10 +101,10 @@ public class FederatedUnivarTest extends AutomatedTestBase {
int port2 = getRandomAvailablePort();
int port3 = getRandomAvailablePort();
int port4 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
- Process t3 = startLocalFedWorker(port3);
- Process t4 = startLocalFedWorker(port4);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
+ Thread t3 = startLocalFedWorkerThread(port3);
+ Thread t4 = startLocalFedWorkerThread(port4);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
index ebdee88..24f04f0 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedYL2SVMTest.java
@@ -78,7 +78,6 @@ public class FederatedYL2SVMTest extends AutomatedTestBase {
if(rtplatform == Types.ExecMode.SPARK) {
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
}
- Process t1, t2;
getAndLoadTestConfiguration(TEST_NAME);
String HOME = SCRIPT_DIR + TEST_DIR;
@@ -105,8 +104,8 @@ public class FederatedYL2SVMTest extends AutomatedTestBase {
fullDMLScriptName = "";
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- t1 = startLocalFedWorker(port1);
- t2 = startLocalFedWorker(port2);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
index 052234d..efd98c2 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryMatrixTest.java
@@ -99,8 +99,8 @@ public class FederatedBinaryMatrixTest extends AutomatedTestBase {
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
index 187648b..c0a53d4 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedBinaryVectorTest.java
@@ -100,8 +100,8 @@ public class FederatedBinaryVectorTest extends AutomatedTestBase {
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
index 74180be..9013f93 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedConstructionTest.java
@@ -120,7 +120,7 @@ public class FederatedConstructionTest extends AutomatedTestBase {
String HOME = SCRIPT_DIR + TEST_DIR;
int port = getRandomAvailablePort();
- Process t = startLocalFedWorker(port);
+ Thread t = startLocalFedWorkerThread(port);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
index c75b0b5..6039354 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedMultiplyTest.java
@@ -96,8 +96,8 @@ public class FederatedMultiplyTest extends AutomatedTestBase {
int port1 = getRandomAvailablePort();
int port2 = getRandomAvailablePort();
- Process t1 = startLocalFedWorker(port1);
- Process t2 = startLocalFedWorker(port2);
+ Thread t1 = startLocalFedWorkerThread(port1);
+ Thread t2 = startLocalFedWorkerThread(port2);
TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
loadTestConfiguration(config);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index e59eea8..8b3b04f 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -79,7 +79,7 @@ public class FederatedRCBindTest extends AutomatedTestBase {
writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, blocksize, rows * cols));
int port = getRandomAvailablePort();
- Process t = startLocalFedWorker(port);
+ Thread t = startLocalFedWorkerThread(port);
// we need the reference file to not be written to hdfs, so we get the correct format
rtplatform = Types.ExecMode.SINGLE_NODE;
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
similarity index 96%
copy from src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
copy to src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
index fef8889..d412743 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLogRegTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedStatisticsTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.sysds.test.functions.federated.algorithms;
+package org.apache.sysds.test.functions.federated.primitives;
import java.io.BufferedReader;
import java.io.InputStreamReader;
@@ -32,17 +32,19 @@ import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
@net.jcip.annotations.NotThreadSafe
-public class FederatedLogRegTest extends AutomatedTestBase {
+@Ignore
+public class FederatedStatisticsTest extends AutomatedTestBase {
private final static String TEST_DIR = "functions/federated/";
private final static String TEST_NAME = "FederatedLogRegTest";
- private final static String TEST_CLASS_DIR = TEST_DIR + FederatedLogRegTest.class.getSimpleName() + "/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + FederatedStatisticsTest.class.getSimpleName() + "/";
private final static int blocksize = 1024;
@Parameterized.Parameter()
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
index c6d4be6..9ce65eb 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedSumTest.java
@@ -78,7 +78,7 @@ public class FederatedSumTest extends AutomatedTestBase {
double[][] A = getRandomMatrix(rows / 2, cols, -10, 10, 1, 1);
writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows / 2, cols, blocksize, (rows / 2) * cols));
int port = getRandomAvailablePort();
- Process t = startLocalFedWorker(port);
+ Thread t = startLocalFedWorkerThread(port);
// we need the reference file to not be written to hdfs, so we get the correct format
rtplatform = Types.ExecMode.SINGLE_NODE;