You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/06 21:08:26 UTC

[systemds] branch master updated: [SYSTEMDS-3115] Performance topK-cleaning::enumerateLogical

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ae165a  [SYSTEMDS-3115] Performance topK-cleaning::enumerateLogical
6ae165a is described below

commit 6ae165ae491a75da08d879bb806e0e4e248db7ca
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Mon Sep 6 23:08:04 2021 +0200

    [SYSTEMDS-3115] Performance topK-cleaning::enumerateLogical
    
    This patch refactors part of the enumeration of logical cleaning
    pipelines, by unrolling nested parallelism (physical pipelines) per
    logical pipelines, which increases the effective degree of parallelism
    of the related major parfor loop. On a test setup with the Adult
    dataset, this patch improved the logical pipeline enumeration from 1413s
    to 382s (with three physical pipelines per logical pipeline).
    
    Furthermore, this includes a few fixes for frame constructors with
    common schema information, and parfor optimization in eval contexts.
---
 scripts/builtin/bandit.dml                         |  6 +-
 scripts/builtin/executePipeline.dml                | 14 ++--
 scripts/builtin/topk_cleaning.dml                  | 14 ++--
 scripts/pipelines/scripts/enumerateLogical.dml     | 75 ++++++++++------------
 .../runtime/controlprogram/ParForProgramBlock.java |  4 ++
 .../controlprogram/paramserv/ParamservUtils.java   |  3 +
 .../instructions/cp/DataGenCPInstruction.java      |  7 +-
 .../pipelines/BuiltinTopkLogicalTest.java          |  2 -
 .../functions/pipelines/topkLogicalTest.dml        |  9 +--
 9 files changed, 68 insertions(+), 66 deletions(-)

diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index 1aa7fbf..28aa909 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -220,15 +220,15 @@ get_physical_configurations = function(Frame[String] logical, Scalar[int] numCon
   }
   
   physical = transformdecode(target=HP, spec=jspecR, meta=M);
-  print("physical pipeline "+toString(physical))
+  #print("physical pipeline "+toString(physical))
 }
 
 # this method will call the execute pipelines with their hyper-parameters
 run_with_hyperparam = function(Frame[Unknown] lp, Frame[Unknown] ph_pip, Integer r_i, Matrix[Double] X, Matrix[Double] Y,
   Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp,
   Frame[Unknown] param, Frame[Unknown] featureFrameOuter, Boolean cv,  Integer cvk = 2, Boolean verbose)
-  return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Frame[Unknown] featureFrameOuter) {
-  print("run_with_hyperparam started")
+  return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Frame[Unknown] featureFrameOuter)
+{
   output_hp = matrix(0, nrow(ph_pip)*r_i, ncol(lp) * 5 * 3)
   output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
   output_pipelines = matrix(0, nrow(ph_pip)*r_i, 2)
diff --git a/scripts/builtin/executePipeline.dml b/scripts/builtin/executePipeline.dml
index 215917c..304feda 100644
--- a/scripts/builtin/executePipeline.dml
+++ b/scripts/builtin/executePipeline.dml
@@ -36,7 +36,7 @@ s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[Str
   Y = rbind(Y, Ytest)
   testRow = nrow(Xtest)
   t1 = time()
-  print("PIPELINE EXECUTION START ... "+toString(pipeline))
+  #print("PIPELINE EXECUTION START ... "+toString(pipeline))
 
   if(verbose) {
     print("checks   rows in X = "+nrow(X)+" rows in Y = "+nrow(Y)+" cols in X = "+ncol(X)+" col in Y = "+ncol(Y))
@@ -66,7 +66,7 @@ s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[Str
     }
     else {
       Xclone = X 
-      print("not applying "+lgOp+" "+op+" on data test flag: "+test)
+      #print("not applying "+lgOp+" "+op+" on data test flag: "+test)
       Xtest = X[testStIdx:nrow(X), ]
       Ytest = Y[testStIdx:nrow(X), ]
       X = X[1:trainEndIdx, ]
@@ -98,7 +98,7 @@ s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[Str
     stop("executePipeline: test rows altered")
   t2 = floor((time() - t1) / 1e+6)
 
-  print("PIPELINE EXECUTION ENDED: "+t2+" ms")
+  #print("PIPELINE EXECUTION ENDED: "+t2+" ms")
 }
 
 # This function will convert the matrix row-vector into list
@@ -321,7 +321,7 @@ return (Matrix[Double] XY)
   diff = (maxClass - minClass)/sum(classes)
   if(diff > 0.5)
   {
-    print("initiating oversampling")
+    #print("initiating oversampling")
     XY = order(target = cbind(Y, X),  by = 1, decreasing=FALSE, index.return=FALSE)
     synthesized = matrix(0,0,0) # initialize variable
     start_class = 1
@@ -332,7 +332,7 @@ return (Matrix[Double] XY)
     outSet = matrix(0, 0, ncol(XY))
     remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)),
     remainingRatio-(remainingRatio%%100))
-    print("remaining ratio: "+remainingRatio)
+    #print("remaining ratio: "+remainingRatio)
     for(i in 1: nrow(k), check=0) {
       end_class = end_class + as.scalar(classes[i])
       class_t = XY[start_class:end_class, ]
@@ -351,7 +351,7 @@ return (Matrix[Double] XY)
     classes = table(Y, 1)
   }
   else { 
-    print("smote not applicable")
+    #print("smote not applicable")
     XY = cbind(X, Y)
   }
 }
@@ -367,7 +367,7 @@ return(Matrix[Double] X){
   X = replace(target=X, pattern=NaN, replacement=max(X))
   Mask = Mask * defaullt
   X = X + Mask
-  print("fillDefault: no of NaNs "+sum(is.na(X)))
+ # print("fillDefault: no of NaNs "+sum(is.na(X)))
 }
 
 ########################################################
diff --git a/scripts/builtin/topk_cleaning.dml b/scripts/builtin/topk_cleaning.dml
index d9bdc93..75ee184 100644
--- a/scripts/builtin/topk_cleaning.dml
+++ b/scripts/builtin/topk_cleaning.dml
@@ -110,15 +110,15 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
     logical = logicalSeedCI
   else 
     logical = logicalSeedNoCI
-
-  # category = frame(["MVI", "OTLR"], rows=1, cols=2)
   idx = as.integer(as.scalar(logical[1, 1])) + 1
-
   category = logical[1, 2:idx]
-  [bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, cat=category, population=logical[2:nrow(logical)],
-    max_iter=ceil(resource_val/topK), metaList = metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, 
-    primitives=primitives, param=parameters, num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE)
-  t6 = time(); print("-- Cleaning - Enum Logical Pipelines: "+(t6-t5)/1e9+"s");
+  
+  print("-- Cleaning - Enum Logical Pipelines: ");
+  [bestLogical, score] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, 
+    cat=category, population=logical[2:nrow(logical)], max_iter=ceil(resource_val/topK), metaList = metaList,
+    evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, primitives=primitives, param=parameters,
+    num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE, ctx=ctx)
+  t6 = time(); print("---- finalized in: "+(t6-t5)/1e9+"s");
 
   topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL")
   
diff --git a/scripts/pipelines/scripts/enumerateLogical.dml b/scripts/pipelines/scripts/enumerateLogical.dml
index 1133eb7..29ac78c 100644
--- a/scripts/pipelines/scripts/enumerateLogical.dml
+++ b/scripts/pipelines/scripts/enumerateLogical.dml
@@ -50,46 +50,50 @@
 
 
 source("scripts/builtin/bandit.dml") as bandit;
-enumerateLogical = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xtest, Matrix[Double] ytest, Matrix[Double] cmr, Frame[Unknown] cat, Frame[Unknown] population,
-  Integer max_iter=10, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] primitives, Frame[Unknown] param,
-  Integer num_inst, Integer num_exec, Boolean cv=FALSE, Boolean cvk=3, Boolean verbose)
-return (Frame[Unknown] bestLg, Double pre_best, Double T)
-{ 
-  t1 = time()
+
+enumerateLogical = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xtest, Matrix[Double] ytest,
+  Matrix[Double] cmr, Frame[Unknown] cat, Frame[Unknown] population, Integer max_iter=10, List[Unknown] metaList, 
+  String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] primitives, Frame[Unknown] param,
+  Integer num_inst, Integer num_exec, Boolean cv=FALSE, Boolean cvk=3, Boolean verbose, List[Unknown] ctx=list(prefix="----"))
+return (Frame[Unknown] bestLg, Double pre_best)
+{
+  prefix = as.scalar(ctx["prefix"]);  
   bestLg = as.frame("")
-  best_score = 0
-  pre_best = 0
+  best_score = 0.0
+  pre_best = 0.0
   feaFrameOuter = as.frame("NULL")
   iter = 1
   convergedOuter = FALSE
 
   while(iter <= max_iter & !convergedOuter)
   {
-    physicalPipList = list()
-    logicalPipList = list()
+  	print(prefix+" EnumLP iteration "+iter+"/"+as.integer(max_iter)+":" );
+    physicalPipList = list();
+    logicalPipList = list();
     
-    # # # get the physical instances from logical ones
+    # get the physical instances from logical ones
+    # unrolled by physical pipelines
     for(i in 1:nrow(population)) { 
       lv = as.integer(as.scalar(population[i, 1])) + 1
       lp = population[i, 2:lv]
-      physicalConf = bandit::get_physical_configurations(lp, num_inst, primitives)
-      physicalPipList = append(physicalPipList, physicalConf)
-      logicalPipList = append(logicalPipList, lp)
+      pconf = bandit::get_physical_configurations(lp, num_inst, primitives)
+      for(j in 1:nrow(pconf))
+        physicalPipList = append(physicalPipList, pconf[j,]);
+      logicalPipList = append(logicalPipList, lp);
     }
     
     # # # execute the physical pipelines
-    scores = matrix(0, length(physicalPipList), 1)
+    scores = matrix(0, nrow(physicalPipList), 1)
     # TODO better parfor-dep handling of multi-assignments to avoid check=0 
     parfor(i in 1:length(physicalPipList), check=0) {
-      lp2 = as.frame(logicalPipList[i,1])
-      pp2 = as.frame(physicalPipList[i,1])
+      lp2 = as.frame(logicalPipList[((i-1)%/%num_inst)+1,])
+      pp2 = as.frame(physicalPipList[i,])
       # # append configuration keys for extracting the pipeline later on
       id = seq(1, nrow(pp2))
       idpp = cbind(as.frame(id), pp2)
-
       # # execute the physical instances and store the minimum scores, each pipeline is executed num_exec times
       [outPip, outHp, feaFrameOuter] = bandit::run_with_hyperparam(lp2, idpp, num_exec, X, y, Xtest, ytest, metaList,
-        evaluationFunc, evalFunHp, param, as.frame(""), cv, cvk, verbose)
+        evaluationFunc, evalFunHp, param, as.frame(""), cv, cvk, FALSE)
       # # sort the configurations groupwise
       max_perf = bandit::getMaxPerConf(outPip, nrow(pp2)) 
       scores[i,1] = as.matrix(max_perf[1,1])
@@ -97,31 +101,28 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
     
     # # select parents and best score
     selected = order(target = scores, by = 1, decreasing=TRUE, index.return=TRUE)
-    idxR = as.scalar(selected[1, 1])
+    idxR = as.scalar(selected[1,1])
     best_score = as.scalar(scores[idxR])
     converged =  pre_best > best_score
     convergedOuter = converged
-    if(converged & (iter > 1))
-    {
-      print("converged after "+iter+" iteration(s)")
-      print("best score " + pre_best)
-      print("best pipeline " + toString(bestLg))
+    if(converged & (iter > 1)) {
+      print(prefix+"EnumLP: converged after "+iter+" iteration(s)")
+      print(prefix+"EnumLP: best score " + pre_best)
+      print(prefix+"EnumLP: best pipeline " + toString(bestLg))
     }
-    else 
-    {
+    else {
       pre_best = best_score
-      idxC = as.integer(as.scalar(population[idxR, 1])) + 1
-      bestLg = population[idxR, 2:idxC]
+      idxR2 = ((idxR-1)%/%num_inst)+1 #logical pipeline ID
+      idxC = as.integer(as.scalar(population[idxR2, 1])) + 1
+      bestLg = population[idxR2, 2:idxC]
     }
     pipLength = max(as.matrix(population[, 1])) + as.scalar(cmr[1, 1]) + 3
     # # # if new best is not better than pre_best then no need od generating new population
     children = frame(0, rows=ceil(nrow(scores)/2), cols=pipLength)
     i = 1
 
-    while(i <= ceil(nrow(scores)/2) & !converged)
-    {
-      top = population[as.scalar(selected[i]), ]
-
+    while(i <= ceil(nrow(scores)/2) & !converged) {
+      top = population[as.scalar(((selected[i]-1)%/%num_inst)+1), ]
       length_top = as.integer(as.scalar(top[1, 1]))
       top = top[, 2:(length_top+1)]
       
@@ -131,12 +132,10 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
       # perform mutation
       c1 = mutation(c1, as.scalar(cmr[1, 2]))
 
-
       # perform removal if non-zero
       c1 = removal(c1, as.scalar(cmr[1, 3]))
 
       # # # append length of pipeline and pipeline in frame
-      # # 
       children[i, 1] = ncol(c1)
       children[i, 2:(ncol(c1) + 1)] = c1
       i = i + 1
@@ -145,12 +144,8 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
     iter  = iter + 1
   }
   if(pre_best == best_score) {
-    print("LogicalENumerator: did not converge after "+max_iter+" iterations")
-  
+    print(prefix+" EnumLP did not converge after "+max_iter+" iterations")  
   }
-
-  T = floor((time() - t1) / 1e+6)
-  print("time "+T+" ms")
 }
 
 
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index a289218..1fc95ce 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -463,6 +463,10 @@ public class ParForProgramBlock extends ForProgramBlock
 		return _optMode;
 	}
 	
+	public void setOptimizationMode(POptMode mode) {
+		_optMode = mode;
+	}
+	
 	public int getDegreeOfParallelism() {
 		return _numThreads;
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index b25c7df..5b416d7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -42,6 +42,7 @@ import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
 import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.POptMode;
 import org.apache.sysds.runtime.controlprogram.Program;
 import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
@@ -311,6 +312,8 @@ public class ParamservUtils {
 				ParForProgramBlock pfpb = (ParForProgramBlock) pb;
 				if( !pfpb.isDegreeOfParallelismFixed() ) {
 					pfpb.setDegreeOfParallelism(k);
+					if( k == 1 )
+						pfpb.setOptimizationMode(POptMode.NONE);
 					recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
 				}
 			} else if (pb instanceof ForProgramBlock) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
index aa4f9ad..865174d 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
@@ -341,9 +341,10 @@ public class DataGenCPInstruction extends UnaryCPInstruction {
 		else if(method == OpOpDG.FRAMEINIT) {
 			int lrows = (int) ec.getScalarInput(rows).getLongValue();
 			int lcols = (int) ec.getScalarInput(cols).getLongValue();
-			String schemaValues[] = schema.split(DataExpression.DELIM_NA_STRING_SEP);
-			ValueType[] vt = schemaValues[0].equals(DataExpression.DEFAULT_SCHEMAPARAM) ? UtilFunctions.nCopies(lcols,
-				ValueType.STRING) : UtilFunctions.stringToValueType(schemaValues);
+			String sparts[] = schema.split(DataExpression.DELIM_NA_STRING_SEP);
+			ValueType[] vt = sparts[0].equals(DataExpression.DEFAULT_SCHEMAPARAM) ?
+				UtilFunctions.nCopies(lcols, ValueType.STRING) : (sparts.length == 1 && lcols > 1) ?
+				UtilFunctions.nCopies(lcols, ValueType.valueOf(sparts[0])) : UtilFunctions.stringToValueType(sparts);
 			int schemaLength = vt.length;
 			if(schemaLength != lcols)
 				throw new DMLRuntimeException("schema-dimension mismatch");
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java
index f71b767..812870a 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java
@@ -65,8 +65,6 @@ public class BuiltinTopkLogicalTest extends AutomatedTestBase {
 	}
 
 	private void runTestLogical(int max_iter,  int num_inst, int num_exec,  Types.ExecMode et) {
-
-		setOutputBuffering(true);
 		String HOME = SCRIPT_DIR+"functions/pipelines/" ;
 		Types.ExecMode modeOld = setExecMode(et);
 		try {
diff --git a/src/test/scripts/functions/pipelines/topkLogicalTest.dml b/src/test/scripts/functions/pipelines/topkLogicalTest.dml
index a52a40f..481fb66 100644
--- a/src/test/scripts/functions/pipelines/topkLogicalTest.dml
+++ b/src/test/scripts/functions/pipelines/topkLogicalTest.dml
@@ -88,11 +88,12 @@ cmr = matrix("4 0.7 1", rows=1, cols=3)
 # testY = eY[split+1:nrow(eY),]
 
 
-[bestLogical, score, T] = lg::enumerateLogical(X=trainX, y=trainY, Xtest=testX, ytest=testY,  cmr=cmr, cat=categories, population=logical,
-    max_iter=max_iter, metaList = metaList, evaluationFunc="evalML", evalFunHp=matrix("1 1e-3 1e-9 100", rows=1, cols=4), 
-    primitives=primitives, param=param , num_inst=num_inst, num_exec=num_exec, cv=FALSE, verbose=TRUE)
+[bestLogical, score] = lg::enumerateLogical(X=trainX, y=trainY, Xtest=testX, ytest=testY,  cmr=cmr, 
+  cat=categories, population=logical, max_iter=max_iter, metaList = metaList, evaluationFunc="evalML",
+  evalFunHp=matrix("1 1e-3 1e-9 100", rows=1, cols=4), primitives=primitives, param=param,
+	num_inst=num_inst, num_exec=num_exec, cv=FALSE, verbose=TRUE)
 
-print("score of pipeline: "+toString(score)+" in "+(T/60000)+" mins")
+print("score of pipeline: "+toString(score))
 print("bestLogical "+toString(bestLogical))
 result = dirtyScore < score  
 print("result satisfied ------------"+result)