You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/04 21:03:09 UTC

[systemds] branch master updated (bfa2234 -> 57c1643)

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git.


    from bfa2234  [SYSTEMDS-3121] Fix robustness concurrent parfor optimization (in eval)
     new e03ef96  [SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions
     new 57c1643  [SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 scripts/builtin/topk_cleaning.dml                  | 55 ++++++++++--------
 scripts/pipelines/scripts/cleaning.dml             |  4 +-
 scripts/pipelines/scripts/utils.dml                | 65 +++++++++-------------
 .../runtime/controlprogram/ParForProgramBlock.java | 11 +++-
 .../controlprogram/paramserv/ParamservUtils.java   |  6 +-
 .../parfor/opt/OptimizerRuleBased.java             | 11 +++-
 .../pipelines/BuiltinTopkEvaluateTest.java         |  4 +-
 7 files changed, 84 insertions(+), 72 deletions(-)

[systemds] 01/02: [SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit e03ef9691d111eca82e8c8bdb0373cd40bdc361e
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Sep 4 22:19:24 2021 +0200

    [SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions
    
    Assume the following special-case (but increasingly common) scenario of
    three functions fun1, fun2, fun3, where fun1 might be, for example,
    hyper-parameter tuning with unknown models/functions. There was an issue
    where the parfor optimizer set the degree of parallelism to 112, and
    then tried to set all reachable program blocks and functions to a DOP
    of 1. However, because it encountered an eval with an unknown function
    call, it recompiled all existing functions (including fun1) to a DOP
    of 1 and thus destroyed its own optimization decisions. This patch now
    properly fixes (i.e., freezes) these decisions (for a tree of nested
    parfor blocks) when recompiling eval functions.
    
    function fun1()
      parfor(i in 1:n)
        eval("fun2", X, y)
    
    function fun2()
      fun3()
    
    function fun3()
      X = X + 1
    
    On the topk-cleaning pipeline enumeration until the hyper-parameter
    tuning for dirty baseline accuracy, this patch improved the end-to-end
    runtime from 51s to 11s.
---
 .../sysds/runtime/controlprogram/ParForProgramBlock.java      | 11 ++++++++++-
 .../runtime/controlprogram/paramserv/ParamservUtils.java      |  6 ++++--
 .../runtime/controlprogram/parfor/opt/OptimizerRuleBased.java | 11 +++++++++--
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 42ab8bc..a289218 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -320,6 +320,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	protected final boolean _monitor;
 	protected final Level _optLogLevel;
 	protected int _numThreads = -1;
+	protected boolean _fixedDOP = false; //guard for numThreads
 	protected long _taskSize = -1;
 	protected PTaskPartitioner _taskPartitioner = null;
 	protected PDataPartitioner _dataPartitioner = null;
@@ -471,6 +472,14 @@ public class ParForProgramBlock extends ForProgramBlock
 		_params.put(ParForStatementBlock.PAR, String.valueOf(_numThreads)); //kept up-to-date for copies
 		setLocalParWorkerIDs();
 	}
+	
+	public boolean isDegreeOfParallelismFixed() {
+		return _fixedDOP;
+	}
+	
+	public void setDegreeOfParallelismFixed(boolean flag) {
+		_fixedDOP = flag;
+	}
 
 	public void setCPCaching(boolean flag) {
 		_enableCPCaching = flag;
@@ -1187,7 +1196,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		try
 		{
 			//create deep copies of required elements child blocks
-			ArrayList<ProgramBlock> cpChildBlocks = null;	
+			ArrayList<ProgramBlock> cpChildBlocks = null;
 			HashSet<String> fnNames = new HashSet<>();
 			if( USE_PB_CACHE )
 			{
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index da1e9f7..b25c7df 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -309,8 +309,10 @@ public class ParamservUtils {
 		for (ProgramBlock pb : pbs) {
 			if (pb instanceof ParForProgramBlock) {
 				ParForProgramBlock pfpb = (ParForProgramBlock) pb;
-				pfpb.setDegreeOfParallelism(k);
-				recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
+				if( !pfpb.isDegreeOfParallelismFixed() ) {
+					pfpb.setDegreeOfParallelism(k);
+					recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
+				}
 			} else if (pb instanceof ForProgramBlock) {
 				recompiled |= rAssignParallelismAndRecompile(((ForProgramBlock) pb).getChildBlocks(), k, recompiled, forceExecTypeCP);
 			} else if (pb instanceof WhileProgramBlock) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index cf2c091..b2c82c1 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1178,12 +1178,14 @@ public class OptimizerRuleBased extends Optimizer {
 			
 			//set parfor degree of parallelism
 			pfpb.setDegreeOfParallelism(parforK);
+			pfpb.setDegreeOfParallelismFixed(true);
 			n.setK(parforK);
 			
 			//distribute remaining parallelism 
 			int remainParforK = getRemainingParallelismParFor(kMax, parforK);
 			int remainOpsK = getRemainingParallelismOps(_lkmaxCP, parforK);
 			rAssignRemainingParallelism( n, remainParforK, remainOpsK );
+			pfpb.setDegreeOfParallelismFixed(false);
 		}
 		else // ExecType.MR/ExecType.SPARK
 		{
@@ -1212,7 +1214,9 @@ public class OptimizerRuleBased extends Optimizer {
 				kMax = 1;
 			
 			//distribute remaining parallelism and recompile parallel instructions
+			pfpb.setDegreeOfParallelismFixed(true);
 			rAssignRemainingParallelism( n, kMax, 1 );
+			pfpb.setDegreeOfParallelismFixed(false);
 		}
 		
 		_numEvaluatedPlans++;
@@ -1247,14 +1251,15 @@ public class OptimizerRuleBased extends Optimizer {
 					//set parfor degree of parallelism
 					long id = c.getID();
 					c.setK(tmpK);
-					ParForProgramBlock pfpb = (ParForProgramBlock) 
-						_plan.getMappedProgramBlock(id);
+					ParForProgramBlock pfpb = (ParForProgramBlock) _plan.getMappedProgramBlock(id);
 					pfpb.setDegreeOfParallelism(tmpK);
 					
 					//distribute remaining parallelism
 					int remainParforK = getRemainingParallelismParFor(parforK, tmpK);
 					int remainOpsK = getRemainingParallelismOps(opsK, tmpK);
+					pfpb.setDegreeOfParallelismFixed(true);
 					rAssignRemainingParallelism(c, remainParforK, remainOpsK);
+					pfpb.setDegreeOfParallelismFixed(false);
 				}
 				else if( c.getNodeType() == NodeType.HOP )
 				{
@@ -1278,6 +1283,8 @@ public class OptimizerRuleBased extends Optimizer {
 					}
 					
 					//if parfor contains eval call, make unoptimized functions single-threaded
+					//(parent parfor program blocks have been frozen such that the following
+					//recompilation of all possible functions does not reset the DOP to 1)
 					if( HopRewriteUtils.isNary(h, OpOpN.EVAL) ) {
 						ProgramBlock pb = _plan.getMappedProgramBlock(n.getID());
 						pb.getProgram().getFunctionProgramBlocks(false)

[systemds] 02/02: [SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 57c1643dcb4d94e4c21aba0f87143abdab02e819
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Sep 4 23:01:22 2021 +0200

    [SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I
---
 scripts/builtin/topk_cleaning.dml                  | 55 ++++++++++--------
 scripts/pipelines/scripts/cleaning.dml             |  4 +-
 scripts/pipelines/scripts/utils.dml                | 65 +++++++++-------------
 .../pipelines/BuiltinTopkEvaluateTest.java         |  4 +-
 4 files changed, 61 insertions(+), 67 deletions(-)

diff --git a/scripts/builtin/topk_cleaning.dml b/scripts/builtin/topk_cleaning.dml
index c4d8cf9..d9bdc93 100644
--- a/scripts/builtin/topk_cleaning.dml
+++ b/scripts/builtin/topk_cleaning.dml
@@ -22,7 +22,6 @@
 source("scripts/pipelines/scripts/utils.dml") as utils;
 source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
 
-
 s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = as.frame("NULL"), Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives,
   Frame[Unknown] parameters, Matrix[Double] cmr = matrix("4 0.7 1", rows=1, cols=3), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5, 
   Integer resource_val = 20, Double sample = 0.1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, String output)
@@ -30,15 +29,18 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
   # return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical,
   # Frame[Unknown] features, Double dirtyScore, Matrix[Double] evalFunHp)
 {
+  t1 = time(); print("TopK-Cleaning:");
+  
   Xtest = as.frame("0")
   Ytest = as.frame("0")
-  print("starting topk_cleaning")
+  ctx = list(prefix="----"); #TODO include seed
   
-  [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
-
+  # prepare meta data
   # # keeping the meta list format if we decide to add more stuff in metadata
+  [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
   metaList = list(mask=mask, schema=schema, fd=fdMask)
-  
+  t2 = time(); print("-- Cleaning - Prepare Metadata: "+(t2-t1)/1e9+"s");
+    
   # separate the label
   [Xtrain, Ytrain] = getLabel(dataTrain, isLastLabel)
   if(!cv)
@@ -49,24 +51,31 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
     [eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}");
     eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
   }
-  else
-  {
+  else {
     eYtrain = as.matrix(Ytrain)
     eYtest = as.matrix(Ytest)
   }
+  t3 = time(); print("-- Cleaning - Prepare Labels: "+(t3-t2)/1e9+"s");
 
   # # # when the evaluation function is called first we also compute and keep hyperparams of target application
+  print("-- Cleaning - Get Dirty Score: ");
   [dirtyScore, evalFunHp] = getDirtyScore(X=Xtrain, Y=eYtrain, Xtest=Xtest, Ytest=eYtest, evaluationFunc=evaluationFunc, 
-    metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk)
-  
+    metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk, ctx=ctx)
+  t4 = time(); print("---- finalized in: "+(t4-t3)/1e9+"s");
+
   # # do the string processing
-  [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos)
+  print("-- Cleaning - Data Preparation (strings, transform, sample): ");
+  [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos, ctx)
   
   # # if mask has 1s then there are categorical features
+  print("---- feature transformations to numeric matrix");
   [eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, cv, "recode")
   
   # apply sampling on training data for pipeline enumeration
+  # TODO why recoding/sampling twice (within getDirtyScore)
+  print("---- class-stratified sampling of feature matrix w/ f="+sample);
   [eXtrain, eYtrain] = utils::doSample(eXtrain, eYtrain, sample, TRUE)
+  t5 = time(); print("---- finalized in: "+(t5-t4)/1e9+"s");
 
   # # # create logical pipeline seeds
   logicalSeedCI =  frame([
@@ -109,7 +118,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
   [bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, cat=category, population=logical[2:nrow(logical)],
     max_iter=ceil(resource_val/topK), metaList = metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, 
     primitives=primitives, param=parameters, num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE)
-  # # # bestLogical = frame(["MVI", "CI", "SCALE"], rows=1, cols=3)
+  t6 = time(); print("-- Cleaning - Enum Logical Pipelines: "+(t6-t5)/1e9+"s");
 
   topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL")
   
@@ -117,6 +126,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
   perf = bandit(X_train=eXtrain, Y_train=eYtrain, X_test=eXtest, Y_test=eYtest,  metaList=metaList,
     evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, primitives=primitives, param=parameters, baseLineScore=dirtyScore,
     k=topK, R=resource_val, cv=cv, output=output, verbose=TRUE);  
+  t7 = time(); print("-- Cleaning - Enum Physical Pipelines: "+(t7-t6)/1e9+"s");
 }
 
 prepareMeta = function(Frame[Unknown] data, Frame[Unknown] metaData)
@@ -160,45 +170,46 @@ return(Frame[Unknown] X, Frame[Unknown] Y)
 }
 
 runStringPipeline = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Frame[String] schema,
-  Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE)
+  Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE, List[Unknown] ctx)
 return(Frame[Unknown] Xtrain, Frame[Unknown] Xtest)
 {
   if(cv)
-    Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos)
+    Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
   else
   {
     # # # binding train and test to use same dictionary for both
-    XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos)
+    XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
     Xtrain = XAll[1:nrow(Xtrain),]
     Xtest = XAll[nrow(Xtrain)+1:nrow(XAll),]
   }
 }
 
 getDirtyScore = function(Frame[Unknown] X, Matrix[Double] Y, Frame[Unknown] Xtest, Matrix[Double] Ytest, String evaluationFunc, List[Unknown] metaList,
-  Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk)
+  Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk, List[Unknown] ctx=list() )
 return(Double dirtyScore, Matrix[Double] evalFunHp)
 {
+  prefix = as.scalar(ctx["prefix"]);
   mask = as.matrix(metaList['mask']) 
   [eXtrain, eXtest] = recodeData(X, Xtest, mask, cv, "recode")
   eXtrain = replace(target=eXtrain, pattern=NaN, replacement = 0)
   eXtest = replace(target=eXtest, pattern=NaN, replacement = 0)
   dirtyScore = 100
-  # # # sample data
+  print(prefix+" sample from train data and dummy code");
   [eXtrain, Ytrain] =  utils::doSample(eXtrain, Y, sample, TRUE)
   [eXtrain, eXtest] = recodeData(as.frame(eXtrain), as.frame(eXtest), mask, cv, "dummycode")
   pipList = list(lp = as.frame("NULL"), ph = as.frame("NULL"), hp = as.matrix(0), flags = 0)
-  if(cv)
-  {
-    score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp, pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1)
+
+  print(prefix+" hyper-parameter tuning");
+  if(cv) {
+    score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp,
+      pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1)
   }
-  else 
-  {
+  else {
     score = eval(evaluationFunc, list(X=eXtrain, Y=Ytrain, Xtest=eXtest, Ytest=Ytest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = 1))
   }
 
   dirtyScore = as.scalar(score[1, 1])
   evalFunHp = score[1, 2:ncol(score)]
-  # evalFunHp = scoreAndHp[1, 2:ncol(scoreAndHp)]
 }
 
 recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Matrix[Double] mask, Boolean cv, String code)
diff --git a/scripts/pipelines/scripts/cleaning.dml b/scripts/pipelines/scripts/cleaning.dml
index 4557f07..73200bd 100644
--- a/scripts/pipelines/scripts/cleaning.dml
+++ b/scripts/pipelines/scripts/cleaning.dml
@@ -97,7 +97,7 @@ startCleaning = function(Frame[Unknown] F, Frame[Unknown] logical, String target
   paramRanges = list(10^seq(0,-10), seq(10,100, 10));
 
   [opt, loss] = gridSearchMLR(X_train, y_train, X_test, y_test, 
-	 "multiLogReg", "lossFunc", params, paramRanges, FALSE);
+    "multiLogReg", "lossFunc", params, paramRanges, FALSE);
    
   d_accuracy = classifyDirty(X_train, y_train, opt, getMask, isWeighted, cv)
   # [eX, eY] = prioritise(eX, eY, getMask)
@@ -493,7 +493,6 @@ crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double]
   Matrix[Double] MLhp, Boolean isWeighted) 
 return (Matrix[Double] accuracyMatrix)
 {
-
   accuracyMatrix = matrix(0, k, 1)
 
   dataList = list()
@@ -526,7 +525,6 @@ return (Matrix[Double] accuracyMatrix)
     dataList = append(dataList, fold_i)
     fold_idxes[, 1] = fold_idxes[, 2] + 1
     fold_idxes[, 2] += ins_per_fold
-    while(FALSE){}
   }
 
   for(i in seq(1,k))
diff --git a/scripts/pipelines/scripts/utils.dml b/scripts/pipelines/scripts/utils.dml
index f6d3d01..05d22a8 100644
--- a/scripts/pipelines/scripts/utils.dml
+++ b/scripts/pipelines/scripts/utils.dml
@@ -60,24 +60,26 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean
 {
   MIN_SAMPLE = 1000
   sampled = floor(nrow(eX) * ratio)
-  sample = ifelse(sampled > MIN_SAMPLE, TRUE, FALSE)
-  dist = table(eY, 1)
-  dist = nrow(dist)
-  if(sample)
+  sampledX = eX
+  sampledY = eY
+  
+  if(sampled > MIN_SAMPLE)
   {
+    dist = max(eY) # num classes (one-hot encoded eY) 
+    
     if((nrow(eY) > 1) & (dist < 10))  # for classification
     {
       XY = order(target = cbind(eY, eX),  by = 1, decreasing=FALSE, index.return=FALSE)
-      # get the class count 
+      # get the class count
       classes = table(eY, 1)
+      # TODO vectorize extraction compute extraction vector
       start_class = 1
       out_s = 1 
       out_e = 0
       end_class = 0
       out = matrix(0, sampled, ncol(XY))
       classes_ratio = floor(classes*ratio)
-      for(i in 1:nrow(classes))
-      {
+      for(i in 1:nrow(classes)) {
         end_class = end_class + as.scalar(classes[i])
         class_t = XY[start_class:end_class, ]
         out_e = out_e + as.scalar(classes_ratio[i]) 
@@ -89,28 +91,15 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean
       sampledY = out[, 1]
       sampledX = out[, 2:ncol(out)]
     }
-    else if(nrow(eY) > 1 & (dist > 10)) # regression
-    {
+    else if(nrow(eY) > 1 & (dist > 10)) { # regression
       sampledX = eX[1:sampled, ]
       sampledY = eY[1:sampled, ]
     }
-    else if(nrow(eY) == 1)
-    {
+    else if(nrow(eY) == 1) { # TODO ?
       sampledX =  eX[1:sampled, ]
       sampledY = eY 
     }
-    else {
-      sampledX = eX
-      sampledY = eY    
-    }
   }
-  else 
-  { 
-    sampledX = eX
-    sampledY = eY 
-  }
-  if(verbose)
-    print("AFTER SAMPLING: "+nrow(eX))
 }
 
 # #######################################################################
@@ -154,29 +143,26 @@ return(Boolean validForResources)
   validForResources = count > 0
 }
 
-stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, Frame[String] schema, Boolean CorrectTypos)
+stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, 
+  Frame[String] schema, Boolean CorrectTypos, List[Unknown] ctx = list(prefix="--"))
 return(Frame[Unknown] processedData)
 {
+  prefix = as.scalar(ctx["prefix"]);
 
   # step 1 drop invalid types
+  print(prefix+" drop values with type mismatch");
   data = dropInvalidType(data, schema)
   
   # step 2 do the case transformations
+  print(prefix+" convert strings to lower case");
   for(i in 1:ncol(mask))
-  {
     if(as.scalar(schema[1,i]) == "STRING")
-    {
-      lowerCase = map(data[, i], "x -> x.toLowerCase()")
-      data[, i] = lowerCase
-    }
-
-  }
-
+      data[, i] = map(data[, i], "x -> x.toLowerCase()")
+    
   if(CorrectTypos)
   {
-  # recode data to get null mask
-    if(sum(mask) > 0)
-    {
+    # recode data to get null mask
+    if(sum(mask) > 0) {
       # always recode the label
       index = vectorToCsv(mask)
       jspecR = "{ids:true, recode:["+index+"]}"
@@ -186,18 +172,19 @@ return(Frame[Unknown] processedData)
     else
       eX = as.matrix(data)
     nullMask = is.na(eX)
-    print("starting correctTypos ")
+    print(prefix+" correct typos in strings");
     # fix the typos
     for(i in 1:ncol(schema))
-    {
       if(as.scalar(schema[1,i]) == "STRING")
         data[, i] = correctTypos(data[, i], nullMask[, i], 0.2, 0.9, FALSE, TRUE, FALSE);
-    }
-    # print("after correctTypos "+toString(data, rows=5))
   }
   
+  print(prefix+" porter-stemming on all features");
   data = map(data, "x -> PorterStemmer.stem(x)")
+  
   # TODO add deduplication
+  print(prefix+" deduplication via entity resolution");
+  
   processedData = data
 }
 
@@ -238,7 +225,7 @@ topk_gridSearch = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xt
   # Step 2) materialize hyper-parameter combinations
   # (simplify debugging and compared to compute negligible)
   HP = matrix(0, numConfigs, numParams);
-  for( i in 1:nrow(HP) ) {
+  parfor( i in 1:nrow(HP) ) {
     for( j in 1:numParams )
       HP[i,j] = paramVals[j,as.scalar(((i-1)/cumLens[j,1])%%paramLens[j,1]+1)];
   }
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
index acfd032..a5bb997 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
@@ -49,9 +49,7 @@ public class BuiltinTopkEvaluateTest extends AutomatedTestBase {
 	}
 
 	private void evalPip(double split, String cv, String path, Types.ExecMode et) {
-
-		setOutputBuffering(true);
-		String HOME = SCRIPT_DIR+"functions/pipelines/" ;
+		String HOME = SCRIPT_DIR+"functions/pipelines/";
 		Types.ExecMode modeOld = setExecMode(et);
 		try {
 			loadTestConfiguration(getTestConfiguration(TEST_NAME1));