You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/08/20 17:49:06 UTC
[systemds] branch master updated: [MINOR] Negative test of
federatedError
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new c9a02a2 [MINOR] Negative test of federatedError
c9a02a2 is described below
commit c9a02a2e9fa3ec50a2a8c3611d62a2a334b5bab3
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Thu Aug 20 19:48:46 2020 +0200
[MINOR] Negative test of federatedError
---
.../federated/FederatedResponse.java | 6 +-
.../federated/FederatedWorkerHandler.java | 2 +-
.../org/apache/sysds/test/AutomatedTestBase.java | 22 +++++-
.../functions/federated/FederatedNegativeTest.java | 78 ++++++++++++++++++++++
4 files changed, 105 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
index ea03bd4..5b0e4eb 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
@@ -91,7 +91,11 @@ public class FederatedResponse implements Serializable {
throw (Exception) potentialException;
}
}
- throw new DMLRuntimeException("Unknown runtime exception in handling of federated request by federated worker.");
+ String errorMessage = getErrorMessage();
+ if (getErrorMessage() != "No readable error message")
+ throw new DMLRuntimeException(errorMessage);
+ else
+ throw new DMLRuntimeException("Unknown runtime exception in handling of federated request by federated worker.");
}
/**
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index b0c75df..35b844d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -295,7 +295,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
@Override
public void operationComplete(ChannelFuture channelFuture) throws InterruptedException {
if (!channelFuture.isSuccess()){
- log.fatal("Federated Worker Write failed");
+ log.error("Federated Worker Write failed");
channelFuture
.channel()
.writeAndFlush(
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index b40a637..bfb2e5d 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -1273,7 +1273,7 @@ public abstract class AutomatedTestBase {
args.add("-gpu");
}
- protected int getRandomAvailablePort() {
+ public static int getRandomAvailablePort() {
try(ServerSocket availableSocket = new ServerSocket(0)) {
return availableSocket.getLocalPort();
}
@@ -1312,6 +1312,26 @@ public abstract class AutomatedTestBase {
return t;
}
+ public static Thread startLocalFedWorkerWithArgs(String[] args) {
+ Thread t = null;
+
+ try {
+ t = new Thread(() -> {
+ try {
+ main(args);
+ }
+ catch(IOException e) {
+ }
+ });
+ t.start();
+ java.util.concurrent.TimeUnit.MILLISECONDS.sleep(FED_WORKER_WAIT);
+ }
+ catch(InterruptedException e) {
+ // Should happen at closing of the worker so don't print
+ }
+ return t;
+ }
+
private boolean rCompareException(boolean exceptionExpected, String errMessage, Throwable e, boolean result) {
if(e.getCause() != null) {
result |= rCompareException(exceptionExpected, errMessage, e.getCause(), result);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
new file mode 100644
index 0000000..a355275
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.controlprogram.federated.*;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@net.jcip.annotations.NotThreadSafe
+public class FederatedNegativeTest {
+ protected static Logger log = Logger.getLogger(FederatedNegativeTest.class);
+
+ static {
+ Logger.getLogger("org.apache.sysds").setLevel(Level.OFF);
+ }
+
+ @Test
+ public void NegativeTest1() {
+ int port = AutomatedTestBase.getRandomAvailablePort();
+ String[] args = {"-w", Integer.toString(port)};
+ Thread t = AutomatedTestBase.startLocalFedWorkerWithArgs(args);
+ Map<FederatedRange, FederatedData> fedMap = new HashMap<>();
+ FederatedRange r = new FederatedRange(new long[]{0,0}, new long[]{1,1});
+ FederatedData d = new FederatedData(
+ Types.DataType.SCALAR,
+ new InetSocketAddress("localhost", port),
+ "Nowhere");
+ fedMap.put(r,d);
+ FederationMap fedM = new FederationMap(fedMap);
+ FederatedRequest fr = new FederatedRequest(FederatedRequest.RequestType.GET_VAR);
+ Future<FederatedResponse>[] res = fedM.execute(0, fr);
+ try {
+ FederatedResponse fres = res[0].get();
+ assertFalse(fres.isSuccessful());
+ assertTrue(fres.getErrorMessage().contains("Variable 0 does not exist at federated worker"));
+
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ TestUtils.shutdownThread(t);
+ }
+
+}