You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@systemds.apache.org by GitBox <gi...@apache.org> on 2020/11/27 12:47:22 UTC

[GitHub] [systemds] Baunsgaard commented on a change in pull request #1112: [SYSTEMDS-2738] Federated rdiag instruction

Baunsgaard commented on a change in pull request #1112:
URL: https://github.com/apache/systemds/pull/1112#discussion_r531578960



##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/ReorgFEDInstruction.java
##########
@@ -50,20 +77,196 @@ public static ReorgFEDInstruction parseInstruction ( String str ) {
 	@Override
 	public void processInstruction(ExecutionContext ec) {
 		MatrixObject mo1 = ec.getMatrixObject(input1);
-		
+		ReorgOperator r_op = (ReorgOperator) _optr;
+
 		if( !mo1.isFederated() )
 			throw new DMLRuntimeException("Federated Reorg: "
 				+ "Federated input expected, but invoked w/ "+mo1.isFederated());
 
-		//execute transpose at federated site
-		FederatedRequest fr1 = FederationUtils.callInstruction(instString, output,
-			new CPOperand[]{input1}, new long[]{mo1.getFedMapping().getID()});
-		mo1.getFedMapping().execute(getTID(), true, fr1);
+		if(instOpcode.equals("r'")) {
+			//execute transpose at federated site
+			FederatedRequest fr1 = FederationUtils.callInstruction(instString,
+				output,
+				new CPOperand[] {input1},
+				new long[] {mo1.getFedMapping().getID()});
+			mo1.getFedMapping().execute(getTID(), true, fr1);
+
+			//drive output federated mapping
+			MatrixObject out = ec.getMatrixObject(output);
+			out.getDataCharacteristics().set(mo1.getNumColumns(), mo1.getNumRows(), (int) mo1.getBlocksize(), mo1.getNnz());
+			out.setFedMapping(mo1.getFedMapping().copyWithNewID(fr1.getID()).transpose());
+		}
+		else if (instOpcode.equals("rdiag")) {
+			AbstractMap.SimpleEntry<FederationMap, Map<FederatedRange, int[]>> result;

Review comment:
       I have never seen the AbstractMap.SimpleEntry being used.
   Usually i see Map.Entry<>.
   Is there a difference?
   
   Also since the usage of this entry, is to enable  multi returns on functions calls, i would suggest making a small class inside this file, that you use to setup and return the values.

##########
File path: src/test/java/org/apache/sysds/test/functions/federated/algorithms/FederatedLmPipeline.java
##########
@@ -63,7 +63,6 @@ public void federatedLmPipelineSampled() {
 		federatedLmPipeline(Types.ExecMode.SINGLE_NODE, false, TEST_NAME1);
 	}
 
-	@Test

Review comment:
       should this be added again?
   
   if it is failing and you don't intend to fix it add an "@ Ignore" instead

##########
File path: src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRdiagTest.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedRdiagTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedRdiagTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedRdiagTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+//			{12, 12},
+			{12,1}

Review comment:
       this seems very small, maybe make them alittle bigger, and if you want to test multiple things then i would suggest :
   rowvector, colvector, and matrix.

##########
File path: src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRdiagTest.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedRdiagTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedRdiagTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedRdiagTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+//			{12, 12},
+			{12,1}
+		});
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S"}));
+	}
+
+	@Test
+	public void federatedRdiagCP() { federatedRdiag(Types.ExecMode.SINGLE_NODE); }
+
+	@Test
+	@Ignore
+	public void federatedRdiagSP() { federatedRdiag(Types.ExecMode.SPARK); }
+
+	public void federatedRdiag(Types.ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		int r = rows / 4;
+
+		double[][] X1 = getRandomMatrix(r, cols, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(r, cols, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(r, cols, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(r, cols, 1, 5, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, cols, blocksize, r*cols);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+		Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+		Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
+		Thread t4 = startLocalFedWorkerThread(port4);
+
+		// reference file should not be written to hdfs, so we set platform here
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		// Run reference dml script with normal matrix for Row/Col
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-stats", "100", "-args", 
+			input("X1"), input("X2"), input("X3"), input("X4"), expected("S")};
+		runTest(null);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats", "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+			"rows=" + rows,
+			"cols=" + cols,
+			"out_S=" + output("S")};
+		runTest(null);
+
+		// compare all sums via files
+		compareResults(0.01);
+
+//		Assert.assertTrue(heavyHittersContainsString("fed_rdiag"));

Review comment:
       should this not be reenabled

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/ParameterizedBuiltinFEDInstruction.java
##########
@@ -156,11 +156,11 @@ private void rmempty(ExecutionContext ec) {
 		MatrixObject out = ec.getMatrixObject(output);
 
 		boolean marginRow = params.get("margin").equals("rows");
-		boolean k = ((marginRow && mo.getFedMapping().getType().isColPartitioned()) ||
+		boolean isNotAligned = ((marginRow && mo.getFedMapping().getType().isColPartitioned()) ||

Review comment:
       thanks, Much clearer variable name :+1: 

##########
File path: src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRdiagTest.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.primitives;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedRdiagTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/";
+	private final static String TEST_NAME = "FederatedRdiagTest";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedRdiagTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+
+	@Parameterized.Parameter(1)
+	public int cols;
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+//			{12, 12},
+			{12,1}
+		});
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"S"}));
+	}
+
+	@Test
+	public void federatedRdiagCP() { federatedRdiag(Types.ExecMode.SINGLE_NODE); }
+
+	@Test
+	@Ignore
+	public void federatedRdiagSP() { federatedRdiag(Types.ExecMode.SPARK); }
+
+	public void federatedRdiag(Types.ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		int r = rows / 4;
+
+		double[][] X1 = getRandomMatrix(r, cols, 1, 5, 1, 3);
+		double[][] X2 = getRandomMatrix(r, cols, 1, 5, 1, 7);
+		double[][] X3 = getRandomMatrix(r, cols, 1, 5, 1, 8);
+		double[][] X4 = getRandomMatrix(r, cols, 1, 5, 1, 9);
+
+		MatrixCharacteristics mc = new MatrixCharacteristics(r, cols, blocksize, r*cols);
+		writeInputMatrixWithMTD("X1", X1, false, mc);
+		writeInputMatrixWithMTD("X2", X2, false, mc);
+		writeInputMatrixWithMTD("X3", X3, false, mc);
+		writeInputMatrixWithMTD("X4", X4, false, mc);
+
+		// empty script name because we don't execute any script, just start the worker
+		fullDMLScriptName = "";
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		int port3 = getRandomAvailablePort();
+		int port4 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+		Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+		Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
+		Thread t4 = startLocalFedWorkerThread(port4);
+
+		// reference file should not be written to hdfs, so we set platform here
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+		// Run reference dml script with normal matrix for Row/Col
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-stats", "100", "-args", 
+			input("X1"), input("X2"), input("X3"), input("X4"), expected("S")};
+		runTest(null);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+		programArgs = new String[] {"-stats", "100", "-nvargs",
+			"in_X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"in_X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"in_X3=" + TestUtils.federatedAddress(port3, input("X3")),
+			"in_X4=" + TestUtils.federatedAddress(port4, input("X4")),
+			"rows=" + rows,
+			"cols=" + cols,
+			"out_S=" + output("S")};
+		runTest(null);
+
+		// compare all sums via files
+		compareResults(0.01);
+
+//		Assert.assertTrue(heavyHittersContainsString("fed_rdiag"));
+
+		// check that federated input files are still existing
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X1")));
+		Assert.assertTrue(HDFSTool.existsFileOnHDFS(input("X2")));

Review comment:
       These checks were added in some tests to check if the federated workers would accidentally delete the files, I'm not sure if it is necessary in all tests. (Lets keep it for now, but just for your information.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org