You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/05/23 17:05:08 UTC

[systemds] branch master updated: [SYSTEMDS-2953] Fix parfor spark runtime (frame support, singlenode)

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new c0d8456  [SYSTEMDS-2953] Fix parfor spark runtime (frame support, singlenode)
c0d8456 is described below

commit c0d84560067365ce144713d49559494786637c84
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sun May 23 19:02:11 2021 +0200

    [SYSTEMDS-2953] Fix parfor spark runtime (frame support, singlenode)
    
    This patch fixes a few issues with forced remote spark parfor loops,
    especially basic frame input support, the execution context and
    blocksizes in forced singlenode execution, and format handling.
    
    The prime use cases are data cleaning pipelines which deal with matrices
    and frames and need to run both at logical and physical pipeline level
    in parfor and parfor remote if the dataset is small enough.
---
 src/main/java/org/apache/sysds/common/Types.java   |   3 +
 .../java/org/apache/sysds/parser/DMLProgram.java   |  10 +++
 .../apache/sysds/parser/ParForStatementBlock.java  |   6 ++
 .../runtime/controlprogram/ParForProgramBlock.java |   9 +-
 .../controlprogram/caching/FrameObject.java        |   4 +-
 .../controlprogram/caching/MatrixObject.java       |   3 +-
 .../context/ExecutionContextFactory.java           |   3 +-
 .../sysds/runtime/util/ProgramConverter.java       |  34 ++++++-
 .../misc/ParForFunctionSerializationTest.java      |   5 +-
 .../parfor/misc/ParForRemoteRobustnessTest.java    | 100 +++++++++++++++++++++
 .../scripts/functions/parfor/parfor_remote1.dml    |  33 +++++++
 .../scripts/functions/parfor/parfor_remote2.dml    |  36 ++++++++
 12 files changed, 233 insertions(+), 13 deletions(-)

diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java
index 284ba9c..2dd5ca9 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -55,6 +55,9 @@ public class Types
 		public boolean isFrame() {
 			return this == FRAME;
 		}
+		public boolean isMatrixOrFrame() { //true for both MATRIX and FRAME
+			return isMatrix() || isFrame();
+		}
 		public boolean isScalar() {
 			return this == SCALAR;
 		}
diff --git a/src/main/java/org/apache/sysds/parser/DMLProgram.java b/src/main/java/org/apache/sysds/parser/DMLProgram.java
index ea9f306..498a59d 100644
--- a/src/main/java/org/apache/sysds/parser/DMLProgram.java
+++ b/src/main/java/org/apache/sysds/parser/DMLProgram.java
@@ -35,10 +35,12 @@ public class DMLProgram
 	
 	private ArrayList<StatementBlock> _blocks;
 	private Map<String, FunctionDictionary<FunctionStatementBlock>> _namespaces;
+	private boolean _containsRemoteParfor; //updated via setContainsRemoteParfor, read when creating the execution context
 	
 	public DMLProgram(){
 		_blocks = new ArrayList<>();
 		_namespaces = new HashMap<>();
+		_containsRemoteParfor = false; //no forced remote parfor seen yet
 	}
 	
 	public DMLProgram(String namespace) {
@@ -58,6 +60,14 @@ public class DMLProgram
 		return _blocks.size();
 	}
 	
+	public void setContainsRemoteParfor(boolean flag) { //NOTE(review): plain overwrite; if called once per constrained parfor, a later non-remote one resets an earlier true - confirm
+		_containsRemoteParfor = flag;
+	}
+	//true if ParForStatementBlock recorded a parfor forced to REMOTE_SPARK or REMOTE_SPARK_DP
+	public boolean containsRemoteParfor() {
+		return _containsRemoteParfor;
+	}
+	
 	public static boolean isInternalNamespace(String namespace) {
 		return DEFAULT_NAMESPACE.equals(namespace)
 			|| BUILTIN_NAMESPACE.equals(namespace)
diff --git a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
index 9219ba1..4b47148 100644
--- a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
@@ -223,6 +223,12 @@ public class ParForStatementBlock extends ForStatementBlock
 					else //default case
 						params.put(key, _paramDefaults.get(key));
 				}
+			
+			//keep info if forced into remote exec
+			if( constrained && params.containsKey(EXEC_MODE) )
+				dmlProg.setContainsRemoteParfor(
+					params.get(EXEC_MODE).equals(PExecMode.REMOTE_SPARK.name()) ||
+					params.get(EXEC_MODE).equals(PExecMode.REMOTE_SPARK_DP.name()));
 		}
 		else {
 			//set all defaults
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 7e318ae..cb94c00 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -37,6 +37,7 @@ import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysds.parser.StatementBlock;
 import org.apache.sysds.parser.VariableSet;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -1143,8 +1144,8 @@ public class ParForProgramBlock extends ForProgramBlock
 			for (String key : ec.getVariables().keySet() ) {
 				if( varsRead.containsVariable(key) && !excludeNames.contains(key) ) {
 					Data d = ec.getVariable(key);
-					if( d.getDataType() == DataType.MATRIX )
-						((MatrixObject)d).exportData(_replicationExport);
+					if( d.getDataType().isMatrixOrFrame() )
+						((CacheableData<?>)d).exportData(_replicationExport);
 				}
 			}
 		}
@@ -1153,8 +1154,8 @@ public class ParForProgramBlock extends ForProgramBlock
 			for (String key : ec.getVariables().keySet() ) {
 				if( !excludeNames.contains(key) ) {
 					Data d = ec.getVariable(key);
-					if( d.getDataType() == DataType.MATRIX )
-						((MatrixObject)d).exportData(_replicationExport);
+					if( d.getDataType().isMatrixOrFrame() )
+						((CacheableData<?>)d).exportData(_replicationExport);
 				}
 			}
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 36cd0f0..388238c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -272,7 +272,9 @@ public class FrameObject extends CacheableData<FrameBlock>
 	protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop)
 		throws IOException, DMLRuntimeException 
 	{
-		FileFormat fmt = FileFormat.safeValueOf(ofmt);
+		//explicit output format if given, otherwise the format recorded in the meta data (cast only when needed)
+		FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) :
+			((MetaDataFormat) _metaData).getFileFormat());
 		FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop);
 		writer.writeFrameToHDFS(_data, fname,  getNumRows(), getNumColumns());
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 3001e44..428fe9a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -462,9 +462,10 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 
 		// Read matrix and maintain meta data, 
 		// if the MatrixObject is federated there is nothing extra to read, and therefore only acquire read and release
+		int blen = mc.getBlocksize() <= 0 ? ConfigurationManager.getBlocksize() : mc.getBlocksize();
 		MatrixBlock newData = isFederated() ? acquireReadAndRelease() :
 			DataConverter.readMatrixFromHDFS(fname, iimd.getFileFormat(), rlen,
-			clen, mc.getBlocksize(), mc.getNonZeros(), getFileFormatProperties());
+			clen, blen, mc.getNonZeros(), getFileFormatProperties());
 		
 		if(iimd.getFileFormat() == FileFormat.CSV){
 			_metaData = _metaData instanceof MetaDataFormat ?
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
index 66ff510..6f408b2 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContextFactory.java
@@ -54,7 +54,8 @@ public class ExecutionContextFactory
 			case SINGLE_NODE:
 				//NOTE: even in case of forced singlenode operations, users might still 
 				//want to run remote parfor which requires the correct execution context
-				if( OptimizerUtils.getDefaultExecutionMode()==ExecMode.HYBRID)
+				if( OptimizerUtils.getDefaultExecutionMode()==ExecMode.HYBRID
+					&& !(prog.getDMLProg()!=null && prog.getDMLProg().containsRemoteParfor()))
 					ec = new ExecutionContext(allocateVars, allocateLineage, prog);
 				else
 					ec = new SparkExecutionContext(allocateVars, allocateLineage, prog);
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index 16472c7..f595137 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -64,6 +64,7 @@ import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionForma
 import org.apache.sysds.runtime.controlprogram.Program;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -160,8 +161,6 @@ public class ProgramConverter
 	public static final String PSBODY_END = LEVELOUT + CDATA_END;
 	
 	//exception msgs
-	public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " +
-																	"(ExternalFunctionPRogramBlockCP can be used)";
 	public static final String NOT_SUPPORTED_SPARK_INSTRUCTION   = "Not supported: Instructions of type other than CP instructions";
 	public static final String NOT_SUPPORTED_SPARK_PARFOR        = "Not supported: Nested ParFOR REMOTE_SPARK due to possible deadlocks." +
 																	"(LOCAL can be used for innner ParFOR)";
@@ -920,7 +919,7 @@ public class ProgramConverter
 				//name = so.getName();
 				value = so.getStringValue();
 				break;
-			case MATRIX:
+			case MATRIX: {
 				MatrixObject mo = (MatrixObject) dat;
 				MetaDataFormat md = (MetaDataFormat) dat.getMetaData();
 				DataCharacteristics dc = md.getDataCharacteristics();
@@ -938,6 +937,21 @@ public class ProgramConverter
 				metaData[7] = String.valueOf(mo.isHDFSFileExists());
 				metaData[8] = String.valueOf(mo.isCleanupEnabled());
 				break;
+			}
+			case FRAME: {
+				FrameObject fo = (FrameObject) dat;
+				MetaDataFormat md = (MetaDataFormat) dat.getMetaData();
+				DataCharacteristics dc = md.getDataCharacteristics();
+				value = fo.getFileName();
+				metaData = new String[6];
+				metaData[0] = String.valueOf(dc.getRows());
+				metaData[1] = String.valueOf(dc.getCols());
+				metaData[2] = String.valueOf(dc.getBlocksize());
+				metaData[3] = md.getFileFormat().toString();
+				metaData[4] = String.valueOf(fo.isHDFSFileExists());
+				metaData[5] = String.valueOf(fo.isCleanupEnabled());
+				break;
+			}
 			case LIST:
 				// SCHEMA: <name>|<datatype>|<valuetype>|value|<metadata>|<tab>element1<tab>element2<tab>element3 (this is the list)
 				//         (for the element1) <listName-index>|<datatype>|<valuetype>|value
@@ -1639,6 +1653,20 @@ public class ProgramConverter
 				dat = mo;
 				break;
 			}
+			case FRAME: {
+				FrameObject mo = new FrameObject(valString);
+				long rows = Long.parseLong(st.nextToken());
+				long cols = Long.parseLong(st.nextToken());
+				int blen = Integer.parseInt(st.nextToken());
+				FileFormat fmt = FileFormat.safeValueOf(st.nextToken());
+				MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, blen, -1);
+				MetaDataFormat md = new MetaDataFormat(mc, fmt);
+				mo.setMetaData( md );
+				mo.setHDFSFileExists(Boolean.valueOf(st.nextToken()));
+				mo.enableCleanup(Boolean.valueOf(st.nextToken()));
+				dat = mo;
+				break;
+			}
 			case LIST:
 				int size = Integer.parseInt(st.nextToken());
 				String namesStr = st.nextToken();
diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java
index 6833b33..f75a2b0 100644
--- a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForFunctionSerializationTest.java
@@ -69,9 +69,8 @@ public class ParForFunctionSerializationTest extends AutomatedTestBase
 		
 		fullRScriptName = HOME + TEST_NAME1 + ".R";
 		rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir();
-
-		long seed = System.nanoTime();
-		double[][] V = getRandomMatrix(rows, cols, 0, 1, sparsity, seed);
+		
+		double[][] V = getRandomMatrix(rows, cols, 0, 1, sparsity, 7);
 		writeInputMatrix("V", V, true);
 
 		boolean exceptionExpected = false;
diff --git a/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
new file mode 100644
index 0000000..a1599c6
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/parfor/misc/ParForRemoteRobustnessTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.parfor.misc;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+
+public class ParForRemoteRobustnessTest extends AutomatedTestBase
+{
+	private final static String TEST_NAME1 = "parfor_remote1";
+	private final static String TEST_NAME2 = "parfor_remote2";
+	private final static String TEST_DIR = "functions/parfor/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + ParForRemoteRobustnessTest.class.getSimpleName() + "/";
+	private final static double eps = 1e-10;
+	
+	private final static int rows = 20;
+	private final static int cols = 10;
+	private final static double sparsity = 1.0;
+	
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"Rout"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[]{"Rout"}));
+	}
+	//forced REMOTE_SPARK parfor over matrix (parfor_remote1) and frame (parfor_remote2) input, each in SINGLE_NODE and HYBRID
+	@Test
+	public void testParForRemoteMatrixCP() {
+		runParforRemoteTest(TEST_NAME1, ExecMode.SINGLE_NODE);
+	}
+	
+	@Test
+	public void testParForRemoteMatrixHybrid() {
+		runParforRemoteTest(TEST_NAME1, ExecMode.HYBRID);
+	}
+	
+	@Test
+	public void testParForRemoteFrameCP() {
+		runParforRemoteTest(TEST_NAME2, ExecMode.SINGLE_NODE);
+	}
+	
+	@Test
+	public void testParForRemoteFrameHybrid() {
+		runParforRemoteTest(TEST_NAME2, ExecMode.HYBRID);
+	}
+	//runs TEST_NAME in the given exec mode and checks the 1x1 result R; restores all global state it changes
+	private void runParforRemoteTest( String TEST_NAME, ExecMode type )
+	{
+		TestConfiguration config = getTestConfiguration(TEST_NAME);
+		config.addVariable("rows", rows);
+		config.addVariable("cols", cols);
+		loadTestConfiguration(config);
+		boolean oldLocalSparkConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		ExecMode oldExec = setExecMode(type);
+		if( type == ExecMode.SINGLE_NODE ) //remote parfor still needs a (local) spark context
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		try {
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{"-explain","-args", input("V"),
+				Integer.toString(rows), Integer.toString(cols), output("R") };
+			
+			//constant input (min=max=5), so the expected total is 5*rows*cols
+			double[][] V = getRandomMatrix(rows, cols, 5, 5, sparsity, 3);
+			writeInputMatrix("V", V, true);
+			
+			runTest(true, false, null, -1); //no exception expected
+			
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
+			Assert.assertEquals(5d*rows*cols, dmlfile.get(new CellIndex(1,1)), eps);
+		}
+		finally {
+			resetExecMode(oldExec);
+			DMLScript.USE_LOCAL_SPARK_CONFIG = oldLocalSparkConfig; //undo the global flag change above
+		}
+	}
+}
diff --git a/src/test/scripts/functions/parfor/parfor_remote1.dml b/src/test/scripts/functions/parfor/parfor_remote1.dml
new file mode 100644
index 0000000..6dd1af3
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_remote1.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1, rows=$2, cols=$3, format="text");
+# per-row sums of A, computed inside a parfor forced to REMOTE_SPARK
+R = matrix(0, nrow(A), 1)
+parfor(i in 1:nrow(A), mode=REMOTE_SPARK, opt=CONSTRAINED) {
+  Ai = A[i, ];
+  if( sum(Ai) < 0 ) # compile spark: branch not taken for non-negative input; the huge rand is meant to force spark instructions into the body
+    Ai = rand(rows=1e10, cols=1e4); 
+  R[i,1] = as.matrix(sum(Ai));
+}
+# total over all rows, written as a 1x1 matrix
+R2 = as.matrix(sum(R))
+write(R2, $4);
diff --git a/src/test/scripts/functions/parfor/parfor_remote2.dml b/src/test/scripts/functions/parfor/parfor_remote2.dml
new file mode 100644
index 0000000..c4f376a
--- /dev/null
+++ b/src/test/scripts/functions/parfor/parfor_remote2.dml
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1, rows=$2, cols=$3, data_type="frame", format="text");
+# like parfor_remote1, but the input is read as a frame and sliced per row inside the remote parfor
+R = matrix(0, nrow(A), 1)
+parfor(i in 1:nrow(A), mode=REMOTE_SPARK, opt=CONSTRAINED) {
+  Ai = A[i, ];
+  Ai3 = as.matrix(Ai);
+  if( sum(Ai3) < 0 ) { # compile spark: branch not taken for non-negative input; it also pulls transformencode into the body
+    [Ai2,M] = transformencode(target=Ai, spec="{recode:[1]}");
+    Ai3 = rand(rows=1e10, cols=1e4) + sum(Ai2); 
+  }
+  R[i,1] = as.matrix(sum(Ai3));
+}
+# total over all rows, written as a 1x1 matrix
+R2 = as.matrix(sum(R))
+write(R2, $4);