You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text rendering.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/04/02 23:28:17 UTC

[systemds] branch main updated: [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 33b9d41  [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II
33b9d41 is described below

commit 33b9d41d7689b1ff0ab02fad060593c0bef5a5c1
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sun Apr 3 01:27:56 2022 +0200

    [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II
    
    This patch makes additional fixes for cleaning pipeline enumeration in
    remote parfor loops.
    
    * Proper export of modified frames/list result variables
    * Fix bandit builtin to avoid unnecessary list result variables
    * Fix special cases of eval function loading (rt program duplicate funs)
    * Fix aggregate binary typos in method names
    * Fix ultra-sparse matrix multiplication selection
---
 scripts/builtin/bandit.dml                         |  8 +-
 .../org/apache/sysds/parser/DMLTranslator.java     |  6 +-
 .../sysds/parser/FunctionCallIdentifier.java       |  3 +-
 .../org/apache/sysds/parser/StatementBlock.java    |  7 +-
 .../apache/sysds/runtime/codegen/CodegenUtils.java |  1 -
 .../federated/MatrixLineagePair.java               |  1 -
 .../controlprogram/parfor/RemoteParForUtils.java   | 89 +++++-----------------
 .../cp/AggregateBinaryCPInstruction.java           |  4 +-
 .../instructions/cp/EvalNaryCPInstruction.java     | 11 ++-
 .../sysds/runtime/matrix/data/LibMatrixMult.java   | 10 +--
 10 files changed, 49 insertions(+), 91 deletions(-)

diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index dd5b5d8..d017556 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -239,7 +239,8 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
     [hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, enablePruning)
     hpForPruning = matrix(0, rows=1, cols=ncol(op))
     changesByOp = matrix(0, rows=1, cols=ncol(op))
-    metaList["applyFunc"] = applyFunctions
+    metaList2 = metaList; #ensure metaList is no result var
+    metaList2["applyFunc"] = applyFunctions
     for(r in 1:no_of_res)
     {
       # as the matrix first block of r rows belongs to first operator and r+1 block of rows to second operator 
@@ -260,13 +261,13 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
         {
           pipList = list(ph = op, hp = hp_matrix, flags = no_of_flag_vars)
           [accuracy, evalHp, hpForPruning, changesByOp, changesByPip] = crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp,
-            pipList=pipList, metaList=metaList, hpForPruning=hpForPruning, 
+            pipList=pipList, metaList=metaList2, hpForPruning=hpForPruning, 
           changesByOp=changesByOp, evalFunc=evaluationFunc, ref=ref)
         }
         else 
         {
           [eXtrain, eYtrain, eXtest, eYtest, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=op, 
-            Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList,  hyperParameters=hp_matrix, hpForPruning=hpForPruning,
+            Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList2,  hyperParameters=hp_matrix, hpForPruning=hpForPruning,
             changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE)
           if(max(eYtrain) == min(eYtrain)) 
             print("Y contains only one class")
@@ -299,7 +300,6 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
   output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows", select = sel)
   output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines), margin="rows", select = sel)
   changesByPipMatrix = removeEmpty(target=changesByPipMatrix, margin="rows", select = sel)
-
 }
 
 # extract the hyper-parameters for pipelines
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index ef51904..8383318 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -152,6 +152,10 @@ public class DMLTranslator
 	}
 	
 	public void validateFunction(DMLProgram dmlp, FunctionStatementBlock fsb) {
+		validateFunction(dmlp, fsb, false);
+	}
+	
+	public void validateFunction(DMLProgram dmlp, FunctionStatementBlock fsb, boolean conditional) {
 		HashMap<String, ConstIdentifier> constVars = new HashMap<>();
 		VariableSet vs = new VariableSet();
 	
@@ -162,7 +166,7 @@ public class DMLTranslator
 				currVar.setDimensions(0, 0);
 			vs.addVariable(currVar.getName(), currVar);
 		}
-		fsb.validate(dmlp, vs, constVars, false);
+		fsb.validate(dmlp, vs, constVars, conditional);
 	}
 
 	public void liveVariableAnalysis(DMLProgram dmlp) {
diff --git a/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java b/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
index 173a17e..f60047f 100644
--- a/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
+++ b/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
@@ -139,7 +139,8 @@ public class FunctionCallIdentifier extends DataIdentifier
 			fblock = dmlp.getFunctionStatementBlock(_namespace, _name);
 			if( fblock == null ) {
 				raiseValidateError("Builtin function '"+_name+ "': script loaded "
-					+ "but function not found. Is there a typo in the function name?");
+					+ "but function not found. Is there a typo in the function name?", conditional);
+				return; //robustness on warnings (conditional)
 			}
 		}
 		
diff --git a/src/main/java/org/apache/sysds/parser/StatementBlock.java b/src/main/java/org/apache/sysds/parser/StatementBlock.java
index 1604a28..ca78220 100644
--- a/src/main/java/org/apache/sysds/parser/StatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/StatementBlock.java
@@ -1057,12 +1057,13 @@ public class StatementBlock extends LiveVariableAnalysis implements ParseInfo
 				DataIdentifier target = targetList.get(j);
 				// set target properties (based on type info in function call statement return params)
 				FunctionCallIdentifier fci = (FunctionCallIdentifier)source;
-				FunctionStatement fstmt = (FunctionStatement)_dmlProg
-					.getFunctionStatementBlock(fci.getNamespace(), fci.getName()).getStatement(0);
-				if (fstmt == null){
+				FunctionStatementBlock fblock = _dmlProg.getFunctionStatementBlock(fci.getNamespace(), fci.getName());
+				if (fblock == null){
 					fci.raiseValidateError(" function " + fci.getName() 
 						+ " is undefined in namespace " + fci.getNamespace(), conditional);
+					return;
 				}
+				FunctionStatement fstmt = (FunctionStatement)fblock.getStatement(0);
 				if (!(target instanceof IndexedIdentifier)){
 					target.setProperties(fstmt.getOutputParams().get(j));
 				}
diff --git a/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java b/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
index 90f7089..898d01d 100644
--- a/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
@@ -194,7 +194,6 @@ public class CodegenUtils
 	private synchronized static Class<?> compileClassJanino(String name, String src) {
 		try {
 			// compile source code
-			// (in recent spark versions )
 			SimpleCompiler compiler = new SimpleCompiler();
 			if( _mainClassLoader != null )
 				compiler.setParentClassLoader(_mainClassLoader);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
index a6d73cd..0e1380e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.hops.fedplanner.FTypes.FType;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
index 073d4a2..4a5a898 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -26,14 +26,11 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -46,6 +43,9 @@ import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Stat;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.ListWriter;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageParser;
@@ -97,62 +97,6 @@ public class RemoteParForUtils
 		}
 	}
 
-	public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, OutputCollector<Writable, Writable> out ) throws IOException {
-		exportResultVariables(workerID, vars, resultVars, null, out);
-	}
-	
-	/**
-	 * For remote MR parfor workers.
-	 * 
-	 * @param workerID worker id
-	 * @param vars local variable map
-	 * @param resultVars list of result variables
-	 * @param rvarFnames ?
-	 * @param out output collectors
-	 * @throws IOException if IOException occurs
-	 */
-	public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, 
-			HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) throws IOException
-	{
-		//create key and value for reuse
-		LongWritable okey = new LongWritable( workerID ); 
-		Text ovalue = new Text();
-		
-		//foreach result variables probe if export necessary
-		for( ResultVar rvar : resultVars )
-		{
-			Data dat = vars.get( rvar._name );
-			
-			//export output variable to HDFS (see RunMRJobs)
-			if ( dat != null && dat.getDataType() == DataType.MATRIX ) 
-			{
-				MatrixObject mo = (MatrixObject) dat;
-				if( mo.isDirty() )
-				{
-					if( rvarFnames!=null ) {
-						String fname = rvarFnames.get( rvar._name );
-						if( fname!=null )
-							mo.setFileName( fname );
-							
-						//export result var (iff actually modified in parfor)
-						mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)		
-						rvarFnames.put(rvar._name, mo.getFileName());
-					}
-					else {
-						//export result var (iff actually modified in parfor)
-						mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
-					}
-					
-					//pass output vars (scalars by value, matrix by ref) to result
-					//(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
-					String datStr = ProgramConverter.serializeDataObject(rvar._name, mo);
-					ovalue.set( datStr );
-					out.collect( okey, ovalue );
-				}
-			}
-		}
-	}
-	
 	/**
 	 * For remote Spark parfor workers. This is a simplified version compared to MR.
 	 * 
@@ -170,18 +114,23 @@ public class RemoteParForUtils
 		//foreach result variables probe if export necessary
 		for( ResultVar rvar : resultVars ) {
 			Data dat = vars.get( rvar._name );
-			//export output variable to HDFS (see RunMRJobs)
-			if ( dat != null && dat.getDataType() == DataType.MATRIX )  {
-				MatrixObject mo = (MatrixObject) dat;
-				if( mo.isDirty() ) {
-					//export result var (iff actually modified in parfor)
-					mo.exportData(); 
-					//pass output vars (scalars by value, matrix by ref) to result
-					//(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
-					ret.add( ProgramConverter.serializeDataObject(rvar._name, mo) );
+			
+			if ( dat != null && dat.getDataType().isMatrixOrFrame() ) {
+				CacheableData<?> cd = (CacheableData<?>) dat;
+				//export result var (iff actually modified in parfor)
+				if( cd.isDirty() ) {
+					cd.exportData();
+					//pass output vars to result (only if actually exported)
+					ret.add( ProgramConverter.serializeDataObject(rvar._name, dat) );
 				}
 				//cleanup pinned result variable from buffer pool
-				mo.freeEvictedBlob();
+				cd.freeEvictedBlob();
+			}
+			else if (dat instanceof ListObject) {
+				String fname = OptimizerUtils.getUniqueTempFileName();
+				ListWriter.writeListToHDFS((ListObject) dat, fname, "binary",
+					new FileFormatProperties(ConfigurationManager.getBlocksize()));
+				ret.add( ProgramConverter.serializeDataObject(rvar._name, dat) );
 			}
 		}
 		
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
index 60dfdd1..d421c21 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
@@ -82,10 +82,10 @@ public class AggregateBinaryCPInstruction extends BinaryCPInstruction {
 		else if(transposeLeft || transposeRight)
 			processTransposedFusedAggregateBinary(ec);
 		else
-			precessNormal(ec);
+			processNormal(ec);
 	}
 
-	private void precessNormal(ExecutionContext ec) {
+	private void processNormal(ExecutionContext ec) {
 		// get inputs
 		MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
 		MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index 46fbeeb..fc6132f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -218,6 +218,7 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 			fsbs.get(Builtins.getInternalFName(name, dt)).getDMLProg();
 		
 		//filter already existing functions (e.g., already loaded internally-called functions)
+		//note: in remote parfor the runtime program might contain more functions than the DML program
 		fsbs = (dmlp.getBuiltinFunctionDictionary() == null) ? fsbs : fsbs.entrySet().stream()
 			.filter(e -> !dmlp.getBuiltinFunctionDictionary().containsFunction(e.getKey()))
 			.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
@@ -237,7 +238,9 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 		// validate functions, in two passes for cross references
 		for( FunctionStatementBlock fsb : fsbs.values() ) {
 			dmlt.liveVariableAnalysisFunction(dmlp, fsb);
-			dmlt.validateFunction(dmlp, fsb);
+			//mark as conditional (warnings instead of errors) because internally
+			//called functions might not be available in dmlp but prog in remote parfor
+			dmlt.validateFunction(dmlp, fsb, true);
 		}
 		
 		// compile hop dags, rewrite hop dags and compile lop dags
@@ -260,8 +263,10 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
 		for( Entry<String,FunctionStatementBlock> fsb : fsbs.entrySet() ) {
 			FunctionProgramBlock fpb = (FunctionProgramBlock) dmlt
 				.createRuntimeProgramBlock(prog, fsb.getValue(), ConfigurationManager.getDMLConfig());
-			prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, true);  // optimized
-			prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, false); // unoptimized -> eval
+			if(!prog.containsFunctionProgramBlock(nsName, fsb.getKey(), true))
+				prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, true);  // optimized
+			if(!prog.containsFunctionProgramBlock(nsName, fsb.getKey(), false))
+				prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, false); // unoptimized -> eval
 		}
 	}
 	
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
index c619c61..0e003ca 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
@@ -181,7 +181,7 @@ public class LibMatrixMult
 			k = 1;
 
 		if(k <= 1)
-			singleThreadMatrixMult(m1, m2, ret, ultraSparse, sparse, tm2, m1Perm, fixedRet);
+			singleThreadedMatrixMult(m1, m2, ret, ultraSparse, sparse, tm2, m1Perm, fixedRet);
 		else
 			parallelMatrixMult(m1, m2, ret, k, ultraSparse, sparse, tm2, m1Perm);
 
@@ -191,7 +191,7 @@ public class LibMatrixMult
 		return ret;
 	}
 
-	private static void singleThreadMatrixMult(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,  
+	private static void singleThreadedMatrixMult(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,  
 		boolean ultraSparse, boolean sparse, boolean tm2, boolean m1Perm, boolean fixedRet){
 		// prepare row-upper for special cases of vector-matrix
 		final boolean pm2 = !ultraSparse && checkParMatrixMultRightInputRows(m1, m2, Integer.MAX_VALUE);
@@ -3967,7 +3967,8 @@ public class LibMatrixMult
 			|| (m1Perm && OptimizerUtils.getSparsity(m2.rlen, m2.clen, m2.nonZeros)<1.0)
 			|| ((m1.isUltraSparse(false) || m2.isUltraSparse(false)) 
 				&& outSp < MatrixBlock.ULTRA_SPARSITY_TURN_POINT2)
-			|| (m1.getSparsity() < MatrixBlock.ULTRA_SPARSITY_TURN_POINT2
+			|| (m1.isInSparseFormat() // otherwise no matching branch
+				&& m1.getSparsity() < MatrixBlock.ULTRA_SPARSITY_TURN_POINT2
 				&& m1.getNonZeros() < MatrixBlock.ULTRA_SPARSE_BLOCK_NNZ
 				&& m1.getLength()+m2.getLength() < (long)m1.rlen*m2.clen
 				&& outSp < MatrixBlock.SPARSITY_TURN_POINT);
@@ -3994,8 +3995,7 @@ public class LibMatrixMult
 		//transpose if dense-dense, skinny rhs matrix (not vector), and memory guarded by output 
 		if( tm2 ) {
 			MatrixBlock tmpBlock = new MatrixBlock(m2.clen, m2.rlen, m2.sparse);
-			LibMatrixReorg.reorg(m2, tmpBlock, new ReorgOperator(SwapIndex.getSwapIndexFnObject()));
-			ret = tmpBlock;
+			ret = LibMatrixReorg.reorg(m2, tmpBlock, new ReorgOperator(SwapIndex.getSwapIndexFnObject()));
 		}
 		
 		return ret;