You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/04/02 23:28:17 UTC
[systemds] branch main updated: [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 33b9d41 [SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II
33b9d41 is described below
commit 33b9d41d7689b1ff0ab02fad060593c0bef5a5c1
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sun Apr 3 01:27:56 2022 +0200
[SYSTEMDS-3342] Fix remote parfor for cleaning pipeline enumeration, II
This patch makes additional fixes for cleaning pipeline enumeration in
remote parfor loops.
* Properly export modified frame and list result variables
* Fix bandit builtin to avoid unnecessary list result variables
* Fix special cases of eval function loading (duplicate functions in the runtime program)
* Fix typos in aggregate binary method names
* Fix ultra-sparse matrix multiplication selection
---
scripts/builtin/bandit.dml | 8 +-
.../org/apache/sysds/parser/DMLTranslator.java | 6 +-
.../sysds/parser/FunctionCallIdentifier.java | 3 +-
.../org/apache/sysds/parser/StatementBlock.java | 7 +-
.../apache/sysds/runtime/codegen/CodegenUtils.java | 1 -
.../federated/MatrixLineagePair.java | 1 -
.../controlprogram/parfor/RemoteParForUtils.java | 89 +++++-----------------
.../cp/AggregateBinaryCPInstruction.java | 4 +-
.../instructions/cp/EvalNaryCPInstruction.java | 11 ++-
.../sysds/runtime/matrix/data/LibMatrixMult.java | 10 +--
10 files changed, 49 insertions(+), 91 deletions(-)
diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index dd5b5d8..d017556 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -239,7 +239,8 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
[hp, applyFunctions, no_of_res, no_of_flag_vars] = getHyperparam(op, param, r_i, default, enablePruning)
hpForPruning = matrix(0, rows=1, cols=ncol(op))
changesByOp = matrix(0, rows=1, cols=ncol(op))
- metaList["applyFunc"] = applyFunctions
+ metaList2 = metaList; #ensure metaList is no result var
+ metaList2["applyFunc"] = applyFunctions
for(r in 1:no_of_res)
{
# as the matrix first block of r rows belongs to first operator and r+1 block of rows to second operator
@@ -260,13 +261,13 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
{
pipList = list(ph = op, hp = hp_matrix, flags = no_of_flag_vars)
[accuracy, evalHp, hpForPruning, changesByOp, changesByPip] = crossV(X=X, y=Y, cvk=cvk, evalFunHp=evalFunHp,
- pipList=pipList, metaList=metaList, hpForPruning=hpForPruning,
+ pipList=pipList, metaList=metaList2, hpForPruning=hpForPruning,
changesByOp=changesByOp, evalFunc=evaluationFunc, ref=ref)
}
else
{
[eXtrain, eYtrain, eXtest, eYtest, Tr, hpForPruning, changesByOp, changesByPip] = executePipeline(pipeline=op,
- Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList, hyperParameters=hp_matrix, hpForPruning=hpForPruning,
+ Xtrain=X, Ytrain=Y, Xtest=Xtest, Ytest=Ytest, metaList=metaList2, hyperParameters=hp_matrix, hpForPruning=hpForPruning,
changesByOp=changesByOp, flagsCount=no_of_flag_vars, test=TRUE, verbose=FALSE)
if(max(eYtrain) == min(eYtrain))
print("Y contains only one class")
@@ -299,7 +300,6 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows", select = sel)
output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines), margin="rows", select = sel)
changesByPipMatrix = removeEmpty(target=changesByPipMatrix, margin="rows", select = sel)
-
}
# extract the hyper-parameters for pipelines
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index ef51904..8383318 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -152,6 +152,10 @@ public class DMLTranslator
}
public void validateFunction(DMLProgram dmlp, FunctionStatementBlock fsb) {
+ validateFunction(dmlp, fsb, false);
+ }
+
+ public void validateFunction(DMLProgram dmlp, FunctionStatementBlock fsb, boolean conditional) {
HashMap<String, ConstIdentifier> constVars = new HashMap<>();
VariableSet vs = new VariableSet();
@@ -162,7 +166,7 @@ public class DMLTranslator
currVar.setDimensions(0, 0);
vs.addVariable(currVar.getName(), currVar);
}
- fsb.validate(dmlp, vs, constVars, false);
+ fsb.validate(dmlp, vs, constVars, conditional);
}
public void liveVariableAnalysis(DMLProgram dmlp) {
diff --git a/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java b/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
index 173a17e..f60047f 100644
--- a/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
+++ b/src/main/java/org/apache/sysds/parser/FunctionCallIdentifier.java
@@ -139,7 +139,8 @@ public class FunctionCallIdentifier extends DataIdentifier
fblock = dmlp.getFunctionStatementBlock(_namespace, _name);
if( fblock == null ) {
raiseValidateError("Builtin function '"+_name+ "': script loaded "
- + "but function not found. Is there a typo in the function name?");
+ + "but function not found. Is there a typo in the function name?", conditional);
+ return; //robustness on warnings (conditional)
}
}
diff --git a/src/main/java/org/apache/sysds/parser/StatementBlock.java b/src/main/java/org/apache/sysds/parser/StatementBlock.java
index 1604a28..ca78220 100644
--- a/src/main/java/org/apache/sysds/parser/StatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/StatementBlock.java
@@ -1057,12 +1057,13 @@ public class StatementBlock extends LiveVariableAnalysis implements ParseInfo
DataIdentifier target = targetList.get(j);
// set target properties (based on type info in function call statement return params)
FunctionCallIdentifier fci = (FunctionCallIdentifier)source;
- FunctionStatement fstmt = (FunctionStatement)_dmlProg
- .getFunctionStatementBlock(fci.getNamespace(), fci.getName()).getStatement(0);
- if (fstmt == null){
+ FunctionStatementBlock fblock = _dmlProg.getFunctionStatementBlock(fci.getNamespace(), fci.getName());
+ if (fblock == null){
fci.raiseValidateError(" function " + fci.getName()
+ " is undefined in namespace " + fci.getNamespace(), conditional);
+ return;
}
+ FunctionStatement fstmt = (FunctionStatement)fblock.getStatement(0);
if (!(target instanceof IndexedIdentifier)){
target.setProperties(fstmt.getOutputParams().get(j));
}
diff --git a/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java b/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
index 90f7089..898d01d 100644
--- a/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/codegen/CodegenUtils.java
@@ -194,7 +194,6 @@ public class CodegenUtils
private synchronized static Class<?> compileClassJanino(String name, String src) {
try {
// compile source code
- // (in recent spark versions )
SimpleCompiler compiler = new SimpleCompiler();
if( _mainClassLoader != null )
compiler.setParentClassLoader(_mainClassLoader);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
index a6d73cd..0e1380e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/MatrixLineagePair.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.hops.fedplanner.FTypes.FType;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.meta.DataCharacteristics;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
index 073d4a2..4a5a898 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -26,14 +26,11 @@ import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -46,6 +43,9 @@ import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
import org.apache.sysds.runtime.controlprogram.parfor.stat.Stat;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDHandler;
import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.ListObject;
+import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.ListWriter;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.lineage.LineageParser;
@@ -97,62 +97,6 @@ public class RemoteParForUtils
}
}
- public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars, OutputCollector<Writable, Writable> out ) throws IOException {
- exportResultVariables(workerID, vars, resultVars, null, out);
- }
-
- /**
- * For remote MR parfor workers.
- *
- * @param workerID worker id
- * @param vars local variable map
- * @param resultVars list of result variables
- * @param rvarFnames ?
- * @param out output collectors
- * @throws IOException if IOException occurs
- */
- public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<ResultVar> resultVars,
- HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out ) throws IOException
- {
- //create key and value for reuse
- LongWritable okey = new LongWritable( workerID );
- Text ovalue = new Text();
-
- //foreach result variables probe if export necessary
- for( ResultVar rvar : resultVars )
- {
- Data dat = vars.get( rvar._name );
-
- //export output variable to HDFS (see RunMRJobs)
- if ( dat != null && dat.getDataType() == DataType.MATRIX )
- {
- MatrixObject mo = (MatrixObject) dat;
- if( mo.isDirty() )
- {
- if( rvarFnames!=null ) {
- String fname = rvarFnames.get( rvar._name );
- if( fname!=null )
- mo.setFileName( fname );
-
- //export result var (iff actually modified in parfor)
- mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
- rvarFnames.put(rvar._name, mo.getFileName());
- }
- else {
- //export result var (iff actually modified in parfor)
- mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
- }
-
- //pass output vars (scalars by value, matrix by ref) to result
- //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
- String datStr = ProgramConverter.serializeDataObject(rvar._name, mo);
- ovalue.set( datStr );
- out.collect( okey, ovalue );
- }
- }
- }
- }
-
/**
* For remote Spark parfor workers. This is a simplified version compared to MR.
*
@@ -170,18 +114,23 @@ public class RemoteParForUtils
//foreach result variables probe if export necessary
for( ResultVar rvar : resultVars ) {
Data dat = vars.get( rvar._name );
- //export output variable to HDFS (see RunMRJobs)
- if ( dat != null && dat.getDataType() == DataType.MATRIX ) {
- MatrixObject mo = (MatrixObject) dat;
- if( mo.isDirty() ) {
- //export result var (iff actually modified in parfor)
- mo.exportData();
- //pass output vars (scalars by value, matrix by ref) to result
- //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
- ret.add( ProgramConverter.serializeDataObject(rvar._name, mo) );
+
+ if ( dat != null && dat.getDataType().isMatrixOrFrame() ) {
+ CacheableData<?> cd = (CacheableData<?>) dat;
+ //export result var (iff actually modified in parfor)
+ if( cd.isDirty() ) {
+ cd.exportData();
+ //pass output vars to result (only if actually exported)
+ ret.add( ProgramConverter.serializeDataObject(rvar._name, dat) );
}
//cleanup pinned result variable from buffer pool
- mo.freeEvictedBlob();
+ cd.freeEvictedBlob();
+ }
+ else if (dat instanceof ListObject) {
+ String fname = OptimizerUtils.getUniqueTempFileName();
+ ListWriter.writeListToHDFS((ListObject) dat, fname, "binary",
+ new FileFormatProperties(ConfigurationManager.getBlocksize()));
+ ret.add( ProgramConverter.serializeDataObject(rvar._name, dat) );
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
index 60dfdd1..d421c21 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateBinaryCPInstruction.java
@@ -82,10 +82,10 @@ public class AggregateBinaryCPInstruction extends BinaryCPInstruction {
else if(transposeLeft || transposeRight)
processTransposedFusedAggregateBinary(ec);
else
- precessNormal(ec);
+ processNormal(ec);
}
- private void precessNormal(ExecutionContext ec) {
+ private void processNormal(ExecutionContext ec) {
// get inputs
MatrixBlock matBlock1 = ec.getMatrixInput(input1.getName());
MatrixBlock matBlock2 = ec.getMatrixInput(input2.getName());
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
index 46fbeeb..fc6132f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/EvalNaryCPInstruction.java
@@ -218,6 +218,7 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
fsbs.get(Builtins.getInternalFName(name, dt)).getDMLProg();
//filter already existing functions (e.g., already loaded internally-called functions)
+ //note: in remote parfor the runtime program might contain more functions than the DML program
fsbs = (dmlp.getBuiltinFunctionDictionary() == null) ? fsbs : fsbs.entrySet().stream()
.filter(e -> !dmlp.getBuiltinFunctionDictionary().containsFunction(e.getKey()))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
@@ -237,7 +238,9 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
// validate functions, in two passes for cross references
for( FunctionStatementBlock fsb : fsbs.values() ) {
dmlt.liveVariableAnalysisFunction(dmlp, fsb);
- dmlt.validateFunction(dmlp, fsb);
+ //mark as conditional (warnings instead of errors) because internally
+ //called functions might not be available in dmlp but prog in remote parfor
+ dmlt.validateFunction(dmlp, fsb, true);
}
// compile hop dags, rewrite hop dags and compile lop dags
@@ -260,8 +263,10 @@ public class EvalNaryCPInstruction extends BuiltinNaryCPInstruction {
for( Entry<String,FunctionStatementBlock> fsb : fsbs.entrySet() ) {
FunctionProgramBlock fpb = (FunctionProgramBlock) dmlt
.createRuntimeProgramBlock(prog, fsb.getValue(), ConfigurationManager.getDMLConfig());
- prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, true); // optimized
- prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, false); // unoptimized -> eval
+ if(!prog.containsFunctionProgramBlock(nsName, fsb.getKey(), true))
+ prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, true); // optimized
+ if(!prog.containsFunctionProgramBlock(nsName, fsb.getKey(), false))
+ prog.addFunctionProgramBlock(nsName, fsb.getKey(), fpb, false); // unoptimized -> eval
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
index c619c61..0e003ca 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java
@@ -181,7 +181,7 @@ public class LibMatrixMult
k = 1;
if(k <= 1)
- singleThreadMatrixMult(m1, m2, ret, ultraSparse, sparse, tm2, m1Perm, fixedRet);
+ singleThreadedMatrixMult(m1, m2, ret, ultraSparse, sparse, tm2, m1Perm, fixedRet);
else
parallelMatrixMult(m1, m2, ret, k, ultraSparse, sparse, tm2, m1Perm);
@@ -191,7 +191,7 @@ public class LibMatrixMult
return ret;
}
- private static void singleThreadMatrixMult(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
+ private static void singleThreadedMatrixMult(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret,
boolean ultraSparse, boolean sparse, boolean tm2, boolean m1Perm, boolean fixedRet){
// prepare row-upper for special cases of vector-matrix
final boolean pm2 = !ultraSparse && checkParMatrixMultRightInputRows(m1, m2, Integer.MAX_VALUE);
@@ -3967,7 +3967,8 @@ public class LibMatrixMult
|| (m1Perm && OptimizerUtils.getSparsity(m2.rlen, m2.clen, m2.nonZeros)<1.0)
|| ((m1.isUltraSparse(false) || m2.isUltraSparse(false))
&& outSp < MatrixBlock.ULTRA_SPARSITY_TURN_POINT2)
- || (m1.getSparsity() < MatrixBlock.ULTRA_SPARSITY_TURN_POINT2
+ || (m1.isInSparseFormat() // otherwise no matching branch
+ && m1.getSparsity() < MatrixBlock.ULTRA_SPARSITY_TURN_POINT2
&& m1.getNonZeros() < MatrixBlock.ULTRA_SPARSE_BLOCK_NNZ
&& m1.getLength()+m2.getLength() < (long)m1.rlen*m2.clen
&& outSp < MatrixBlock.SPARSITY_TURN_POINT);
@@ -3994,8 +3995,7 @@ public class LibMatrixMult
//transpose if dense-dense, skinny rhs matrix (not vector), and memory guarded by output
if( tm2 ) {
MatrixBlock tmpBlock = new MatrixBlock(m2.clen, m2.rlen, m2.sparse);
- LibMatrixReorg.reorg(m2, tmpBlock, new ReorgOperator(SwapIndex.getSwapIndexFnObject()));
- ret = tmpBlock;
+ ret = LibMatrixReorg.reorg(m2, tmpBlock, new ReorgOperator(SwapIndex.getSwapIndexFnObject()));
}
return ret;