You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/08/20 17:49:06 UTC

[systemds] branch master updated: [MINOR] Negative test of federatedError

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c9a02a2  [MINOR] Negative test of federatedError
c9a02a2 is described below

commit c9a02a2e9fa3ec50a2a8c3611d62a2a334b5bab3
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Thu Aug 20 19:48:46 2020 +0200

    [MINOR] Negative test of federatedError
---
 .../federated/FederatedResponse.java               |  6 +-
 .../federated/FederatedWorkerHandler.java          |  2 +-
 .../org/apache/sysds/test/AutomatedTestBase.java   | 22 +++++-
 .../functions/federated/FederatedNegativeTest.java | 78 ++++++++++++++++++++++
 4 files changed, 105 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
index ea03bd4..5b0e4eb 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedResponse.java
@@ -91,7 +91,11 @@ public class FederatedResponse implements Serializable {
 				throw (Exception) potentialException;
 			}
 		}
-		throw new DMLRuntimeException("Unknown runtime exception in handling of federated request by federated worker.");
+		String errorMessage = getErrorMessage();
+		if (getErrorMessage() != "No readable error message")
+			throw new DMLRuntimeException(errorMessage);
+		else
+			throw new DMLRuntimeException("Unknown runtime exception in handling of federated request by federated worker.");
 	}
 
 	/**
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index b0c75df..35b844d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -295,7 +295,7 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
 		@Override
 		public void operationComplete(ChannelFuture channelFuture) throws InterruptedException {
 			if (!channelFuture.isSuccess()){
-				log.fatal("Federated Worker Write failed");
+				log.error("Federated Worker Write failed");
 				channelFuture
 					.channel()
 					.writeAndFlush(
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index b40a637..bfb2e5d 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -1273,7 +1273,7 @@ public abstract class AutomatedTestBase {
 			args.add("-gpu");
 	}
 
-	protected int getRandomAvailablePort() {
+	public static int getRandomAvailablePort() {
 		try(ServerSocket availableSocket = new ServerSocket(0)) {
 			return availableSocket.getLocalPort();
 		}
@@ -1312,6 +1312,26 @@ public abstract class AutomatedTestBase {
 		return t;
 	}
 
+	public static Thread startLocalFedWorkerWithArgs(String[] args) {
+		Thread t = null;
+
+		try {
+			t = new Thread(() -> {
+				try {
+					main(args);
+				}
+				catch(IOException e) {
+				}
+			});
+			t.start();
+			java.util.concurrent.TimeUnit.MILLISECONDS.sleep(FED_WORKER_WAIT);
+		}
+		catch(InterruptedException e) {
+			// Should happen at closing of the worker so don't print
+		}
+		return t;
+	}
+
 	private boolean rCompareException(boolean exceptionExpected, String errMessage, Throwable e, boolean result) {
 		if(e.getCause() != null) {
 			result |= rCompareException(exceptionExpected, errMessage, e.getCause(), result);
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java b/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
new file mode 100644
index 0000000..a355275
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/federated/FederatedNegativeTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.controlprogram.federated.*;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@net.jcip.annotations.NotThreadSafe
+public class FederatedNegativeTest {
+	protected static Logger log = Logger.getLogger(FederatedNegativeTest.class);
+
+	static {
+		Logger.getLogger("org.apache.sysds").setLevel(Level.OFF);
+	}
+
+	@Test
+	public void NegativeTest1() {
+        int port = AutomatedTestBase.getRandomAvailablePort();
+		String[] args = {"-w", Integer.toString(port)};
+        Thread t = AutomatedTestBase.startLocalFedWorkerWithArgs(args);
+		Map<FederatedRange, FederatedData> fedMap = new HashMap<>();
+		FederatedRange r = new FederatedRange(new long[]{0,0}, new long[]{1,1});
+		FederatedData d = new FederatedData(
+				Types.DataType.SCALAR,
+				new InetSocketAddress("localhost", port),
+				"Nowhere");
+		fedMap.put(r,d);
+		FederationMap fedM = new FederationMap(fedMap);
+		FederatedRequest fr = new FederatedRequest(FederatedRequest.RequestType.GET_VAR);
+		Future<FederatedResponse>[] res = fedM.execute(0, fr);
+		try {
+			FederatedResponse fres = res[0].get();
+			assertFalse(fres.isSuccessful());
+			assertTrue(fres.getErrorMessage().contains("Variable 0 does not exist at federated worker"));
+
+		} catch (InterruptedException e) {
+			e.printStackTrace();
+		} catch (ExecutionException e) {
+			e.printStackTrace();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+
+		TestUtils.shutdownThread(t);
+	}
+
+}