You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/06 21:08:26 UTC
[systemds] branch master updated: [SYSTEMDS-3115] Performance topK-cleaning::enumerateLogical
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 6ae165a [SYSTEMDS-3115] Performance topK-cleaning::enumerateLogical
6ae165a is described below
commit 6ae165ae491a75da08d879bb806e0e4e248db7ca
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Mon Sep 6 23:08:04 2021 +0200
[SYSTEMDS-3115] Performance topK-cleaning::enumerateLogical
This patch refactors part of the enumeration of logical cleaning
pipelines by unrolling the nested parallelism (physical pipelines) per
logical pipeline, which increases the effective degree of parallelism
of the related major parfor loop. On a test setup with the Adult
dataset, this patch improved the logical pipeline enumeration from 1413s
to 382s (with three physical pipelines per logical pipeline).
Furthermore, this patch includes a few fixes for frame constructors with
common schema information and for parfor optimization in eval contexts.
---
scripts/builtin/bandit.dml | 6 +-
scripts/builtin/executePipeline.dml | 14 ++--
scripts/builtin/topk_cleaning.dml | 14 ++--
scripts/pipelines/scripts/enumerateLogical.dml | 75 ++++++++++------------
.../runtime/controlprogram/ParForProgramBlock.java | 4 ++
.../controlprogram/paramserv/ParamservUtils.java | 3 +
.../instructions/cp/DataGenCPInstruction.java | 7 +-
.../pipelines/BuiltinTopkLogicalTest.java | 2 -
.../functions/pipelines/topkLogicalTest.dml | 9 +--
9 files changed, 68 insertions(+), 66 deletions(-)
diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index 1aa7fbf..28aa909 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -220,15 +220,15 @@ get_physical_configurations = function(Frame[String] logical, Scalar[int] numCon
}
physical = transformdecode(target=HP, spec=jspecR, meta=M);
- print("physical pipeline "+toString(physical))
+ #print("physical pipeline "+toString(physical))
}
# this method will call the execute pipelines with their hyper-parameters
run_with_hyperparam = function(Frame[Unknown] lp, Frame[Unknown] ph_pip, Integer r_i, Matrix[Double] X, Matrix[Double] Y,
Matrix[Double] Xtest, Matrix[Double] Ytest, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp,
Frame[Unknown] param, Frame[Unknown] featureFrameOuter, Boolean cv, Integer cvk = 2, Boolean verbose)
- return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Frame[Unknown] featureFrameOuter) {
- print("run_with_hyperparam started")
+ return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Frame[Unknown] featureFrameOuter)
+{
output_hp = matrix(0, nrow(ph_pip)*r_i, ncol(lp) * 5 * 3)
output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
output_pipelines = matrix(0, nrow(ph_pip)*r_i, 2)
diff --git a/scripts/builtin/executePipeline.dml b/scripts/builtin/executePipeline.dml
index 215917c..304feda 100644
--- a/scripts/builtin/executePipeline.dml
+++ b/scripts/builtin/executePipeline.dml
@@ -36,7 +36,7 @@ s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[Str
Y = rbind(Y, Ytest)
testRow = nrow(Xtest)
t1 = time()
- print("PIPELINE EXECUTION START ... "+toString(pipeline))
+ #print("PIPELINE EXECUTION START ... "+toString(pipeline))
if(verbose) {
print("checks rows in X = "+nrow(X)+" rows in Y = "+nrow(Y)+" cols in X = "+ncol(X)+" col in Y = "+ncol(Y))
@@ -66,7 +66,7 @@ s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[Str
}
else {
Xclone = X
- print("not applying "+lgOp+" "+op+" on data test flag: "+test)
+ #print("not applying "+lgOp+" "+op+" on data test flag: "+test)
Xtest = X[testStIdx:nrow(X), ]
Ytest = Y[testStIdx:nrow(X), ]
X = X[1:trainEndIdx, ]
@@ -98,7 +98,7 @@ s_executePipeline = function(Frame[String] logical = as.frame("NULL"), Frame[Str
stop("executePipeline: test rows altered")
t2 = floor((time() - t1) / 1e+6)
- print("PIPELINE EXECUTION ENDED: "+t2+" ms")
+ #print("PIPELINE EXECUTION ENDED: "+t2+" ms")
}
# This function will convert the matrix row-vector into list
@@ -321,7 +321,7 @@ return (Matrix[Double] XY)
diff = (maxClass - minClass)/sum(classes)
if(diff > 0.5)
{
- print("initiating oversampling")
+ #print("initiating oversampling")
XY = order(target = cbind(Y, X), by = 1, decreasing=FALSE, index.return=FALSE)
synthesized = matrix(0,0,0) # initialize variable
start_class = 1
@@ -332,7 +332,7 @@ return (Matrix[Double] XY)
outSet = matrix(0, 0, ncol(XY))
remainingRatio = ifelse((remainingRatio%%100) >= 50, remainingRatio+(100 - (remainingRatio%%100)),
remainingRatio-(remainingRatio%%100))
- print("remaining ratio: "+remainingRatio)
+ #print("remaining ratio: "+remainingRatio)
for(i in 1: nrow(k), check=0) {
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
@@ -351,7 +351,7 @@ return (Matrix[Double] XY)
classes = table(Y, 1)
}
else {
- print("smote not applicable")
+ #print("smote not applicable")
XY = cbind(X, Y)
}
}
@@ -367,7 +367,7 @@ return(Matrix[Double] X){
X = replace(target=X, pattern=NaN, replacement=max(X))
Mask = Mask * defaullt
X = X + Mask
- print("fillDefault: no of NaNs "+sum(is.na(X)))
+ # print("fillDefault: no of NaNs "+sum(is.na(X)))
}
########################################################
diff --git a/scripts/builtin/topk_cleaning.dml b/scripts/builtin/topk_cleaning.dml
index d9bdc93..75ee184 100644
--- a/scripts/builtin/topk_cleaning.dml
+++ b/scripts/builtin/topk_cleaning.dml
@@ -110,15 +110,15 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
logical = logicalSeedCI
else
logical = logicalSeedNoCI
-
- # category = frame(["MVI", "OTLR"], rows=1, cols=2)
idx = as.integer(as.scalar(logical[1, 1])) + 1
-
category = logical[1, 2:idx]
- [bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, cat=category, population=logical[2:nrow(logical)],
- max_iter=ceil(resource_val/topK), metaList = metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp,
- primitives=primitives, param=parameters, num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE)
- t6 = time(); print("-- Cleaning - Enum Logical Pipelines: "+(t6-t5)/1e9+"s");
+
+ print("-- Cleaning - Enum Logical Pipelines: ");
+ [bestLogical, score] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr,
+ cat=category, population=logical[2:nrow(logical)], max_iter=ceil(resource_val/topK), metaList = metaList,
+ evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, primitives=primitives, param=parameters,
+ num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE, ctx=ctx)
+ t6 = time(); print("---- finalized in: "+(t6-t5)/1e9+"s");
topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL")
diff --git a/scripts/pipelines/scripts/enumerateLogical.dml b/scripts/pipelines/scripts/enumerateLogical.dml
index 1133eb7..29ac78c 100644
--- a/scripts/pipelines/scripts/enumerateLogical.dml
+++ b/scripts/pipelines/scripts/enumerateLogical.dml
@@ -50,46 +50,50 @@
source("scripts/builtin/bandit.dml") as bandit;
-enumerateLogical = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xtest, Matrix[Double] ytest, Matrix[Double] cmr, Frame[Unknown] cat, Frame[Unknown] population,
- Integer max_iter=10, List[Unknown] metaList, String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] primitives, Frame[Unknown] param,
- Integer num_inst, Integer num_exec, Boolean cv=FALSE, Boolean cvk=3, Boolean verbose)
-return (Frame[Unknown] bestLg, Double pre_best, Double T)
-{
- t1 = time()
+
+enumerateLogical = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xtest, Matrix[Double] ytest,
+ Matrix[Double] cmr, Frame[Unknown] cat, Frame[Unknown] population, Integer max_iter=10, List[Unknown] metaList,
+ String evaluationFunc, Matrix[Double] evalFunHp, Frame[Unknown] primitives, Frame[Unknown] param,
+ Integer num_inst, Integer num_exec, Boolean cv=FALSE, Boolean cvk=3, Boolean verbose, List[Unknown] ctx=list(prefix="----"))
+return (Frame[Unknown] bestLg, Double pre_best)
+{
+ prefix = as.scalar(ctx["prefix"]);
bestLg = as.frame("")
- best_score = 0
- pre_best = 0
+ best_score = 0.0
+ pre_best = 0.0
feaFrameOuter = as.frame("NULL")
iter = 1
convergedOuter = FALSE
while(iter <= max_iter & !convergedOuter)
{
- physicalPipList = list()
- logicalPipList = list()
+ print(prefix+" EnumLP iteration "+iter+"/"+as.integer(max_iter)+":" );
+ physicalPipList = list();
+ logicalPipList = list();
- # # # get the physical instances from logical ones
+ # get the physical instances from logical ones
+ # unrolled by physical pipelines
for(i in 1:nrow(population)) {
lv = as.integer(as.scalar(population[i, 1])) + 1
lp = population[i, 2:lv]
- physicalConf = bandit::get_physical_configurations(lp, num_inst, primitives)
- physicalPipList = append(physicalPipList, physicalConf)
- logicalPipList = append(logicalPipList, lp)
+ pconf = bandit::get_physical_configurations(lp, num_inst, primitives)
+ for(j in 1:nrow(pconf))
+ physicalPipList = append(physicalPipList, pconf[j,]);
+ logicalPipList = append(logicalPipList, lp);
}
# # # execute the physical pipelines
- scores = matrix(0, length(physicalPipList), 1)
+ scores = matrix(0, nrow(physicalPipList), 1)
# TODO better parfor-dep handling of multi-assignments to avoid check=0
parfor(i in 1:length(physicalPipList), check=0) {
- lp2 = as.frame(logicalPipList[i,1])
- pp2 = as.frame(physicalPipList[i,1])
+ lp2 = as.frame(logicalPipList[((i-1)%/%num_inst)+1,])
+ pp2 = as.frame(physicalPipList[i,])
# # append configuration keys for extracting the pipeline later on
id = seq(1, nrow(pp2))
idpp = cbind(as.frame(id), pp2)
-
# # execute the physical instances and store the minimum scores, each pipeline is executed num_exec times
[outPip, outHp, feaFrameOuter] = bandit::run_with_hyperparam(lp2, idpp, num_exec, X, y, Xtest, ytest, metaList,
- evaluationFunc, evalFunHp, param, as.frame(""), cv, cvk, verbose)
+ evaluationFunc, evalFunHp, param, as.frame(""), cv, cvk, FALSE)
# # sort the configurations groupwise
max_perf = bandit::getMaxPerConf(outPip, nrow(pp2))
scores[i,1] = as.matrix(max_perf[1,1])
@@ -97,31 +101,28 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
# # select parents and best score
selected = order(target = scores, by = 1, decreasing=TRUE, index.return=TRUE)
- idxR = as.scalar(selected[1, 1])
+ idxR = as.scalar(selected[1,1])
best_score = as.scalar(scores[idxR])
converged = pre_best > best_score
convergedOuter = converged
- if(converged & (iter > 1))
- {
- print("converged after "+iter+" iteration(s)")
- print("best score " + pre_best)
- print("best pipeline " + toString(bestLg))
+ if(converged & (iter > 1)) {
+ print(prefix+"EnumLP: converged after "+iter+" iteration(s)")
+ print(prefix+"EnumLP: best score " + pre_best)
+ print(prefix+"EnumLP: best pipeline " + toString(bestLg))
}
- else
- {
+ else {
pre_best = best_score
- idxC = as.integer(as.scalar(population[idxR, 1])) + 1
- bestLg = population[idxR, 2:idxC]
+ idxR2 = ((idxR-1)%/%num_inst)+1 #logical pipeline ID
+ idxC = as.integer(as.scalar(population[idxR2, 1])) + 1
+ bestLg = population[idxR2, 2:idxC]
}
pipLength = max(as.matrix(population[, 1])) + as.scalar(cmr[1, 1]) + 3
# # # if new best is not better than pre_best then no need od generating new population
children = frame(0, rows=ceil(nrow(scores)/2), cols=pipLength)
i = 1
- while(i <= ceil(nrow(scores)/2) & !converged)
- {
- top = population[as.scalar(selected[i]), ]
-
+ while(i <= ceil(nrow(scores)/2) & !converged) {
+ top = population[as.scalar(((selected[i]-1)%/%num_inst)+1), ]
length_top = as.integer(as.scalar(top[1, 1]))
top = top[, 2:(length_top+1)]
@@ -131,12 +132,10 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
# perform mutation
c1 = mutation(c1, as.scalar(cmr[1, 2]))
-
# perform removal if non-zero
c1 = removal(c1, as.scalar(cmr[1, 3]))
# # # append length of pipeline and pipeline in frame
- # #
children[i, 1] = ncol(c1)
children[i, 2:(ncol(c1) + 1)] = c1
i = i + 1
@@ -145,12 +144,8 @@ return (Frame[Unknown] bestLg, Double pre_best, Double T)
iter = iter + 1
}
if(pre_best == best_score) {
- print("LogicalENumerator: did not converge after "+max_iter+" iterations")
-
+ print(prefix+" EnumLP did not converge after "+max_iter+" iterations")
}
-
- T = floor((time() - t1) / 1e+6)
- print("time "+T+" ms")
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index a289218..1fc95ce 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -463,6 +463,10 @@ public class ParForProgramBlock extends ForProgramBlock
return _optMode;
}
+ public void setOptimizationMode(POptMode mode) {
+ _optMode = mode;
+ }
+
public int getDegreeOfParallelism() {
return _numThreads;
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index b25c7df..5b416d7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -42,6 +42,7 @@ import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.POptMode;
import org.apache.sysds.runtime.controlprogram.Program;
import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
@@ -311,6 +312,8 @@ public class ParamservUtils {
ParForProgramBlock pfpb = (ParForProgramBlock) pb;
if( !pfpb.isDegreeOfParallelismFixed() ) {
pfpb.setDegreeOfParallelism(k);
+ if( k == 1 )
+ pfpb.setOptimizationMode(POptMode.NONE);
recompiled |= rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, forceExecTypeCP);
}
} else if (pb instanceof ForProgramBlock) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
index aa4f9ad..865174d 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/DataGenCPInstruction.java
@@ -341,9 +341,10 @@ public class DataGenCPInstruction extends UnaryCPInstruction {
else if(method == OpOpDG.FRAMEINIT) {
int lrows = (int) ec.getScalarInput(rows).getLongValue();
int lcols = (int) ec.getScalarInput(cols).getLongValue();
- String schemaValues[] = schema.split(DataExpression.DELIM_NA_STRING_SEP);
- ValueType[] vt = schemaValues[0].equals(DataExpression.DEFAULT_SCHEMAPARAM) ? UtilFunctions.nCopies(lcols,
- ValueType.STRING) : UtilFunctions.stringToValueType(schemaValues);
+ String sparts[] = schema.split(DataExpression.DELIM_NA_STRING_SEP);
+ ValueType[] vt = sparts[0].equals(DataExpression.DEFAULT_SCHEMAPARAM) ?
+ UtilFunctions.nCopies(lcols, ValueType.STRING) : (sparts.length == 1 && lcols > 1) ?
+ UtilFunctions.nCopies(lcols, ValueType.valueOf(sparts[0])) : UtilFunctions.stringToValueType(sparts);
int schemaLength = vt.length;
if(schemaLength != lcols)
throw new DMLRuntimeException("schema-dimension mismatch");
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java
index f71b767..812870a 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkLogicalTest.java
@@ -65,8 +65,6 @@ public class BuiltinTopkLogicalTest extends AutomatedTestBase {
}
private void runTestLogical(int max_iter, int num_inst, int num_exec, Types.ExecMode et) {
-
- setOutputBuffering(true);
String HOME = SCRIPT_DIR+"functions/pipelines/" ;
Types.ExecMode modeOld = setExecMode(et);
try {
diff --git a/src/test/scripts/functions/pipelines/topkLogicalTest.dml b/src/test/scripts/functions/pipelines/topkLogicalTest.dml
index a52a40f..481fb66 100644
--- a/src/test/scripts/functions/pipelines/topkLogicalTest.dml
+++ b/src/test/scripts/functions/pipelines/topkLogicalTest.dml
@@ -88,11 +88,12 @@ cmr = matrix("4 0.7 1", rows=1, cols=3)
# testY = eY[split+1:nrow(eY),]
-[bestLogical, score, T] = lg::enumerateLogical(X=trainX, y=trainY, Xtest=testX, ytest=testY, cmr=cmr, cat=categories, population=logical,
- max_iter=max_iter, metaList = metaList, evaluationFunc="evalML", evalFunHp=matrix("1 1e-3 1e-9 100", rows=1, cols=4),
- primitives=primitives, param=param , num_inst=num_inst, num_exec=num_exec, cv=FALSE, verbose=TRUE)
+[bestLogical, score] = lg::enumerateLogical(X=trainX, y=trainY, Xtest=testX, ytest=testY, cmr=cmr,
+ cat=categories, population=logical, max_iter=max_iter, metaList = metaList, evaluationFunc="evalML",
+ evalFunHp=matrix("1 1e-3 1e-9 100", rows=1, cols=4), primitives=primitives, param=param,
+ num_inst=num_inst, num_exec=num_exec, cv=FALSE, verbose=TRUE)
-print("score of pipeline: "+toString(score)+" in "+(T/60000)+" mins")
+print("score of pipeline: "+toString(score))
print("bestLogical "+toString(bestLogical))
result = dirtyScore < score
print("result satisfied ------------"+result)