You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/04 21:03:11 UTC
[systemds] 02/02: [SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 57c1643dcb4d94e4c21aba0f87143abdab02e819
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Sep 4 23:01:22 2021 +0200
[SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I
---
scripts/builtin/topk_cleaning.dml | 55 ++++++++++--------
scripts/pipelines/scripts/cleaning.dml | 4 +-
scripts/pipelines/scripts/utils.dml | 65 +++++++++-------------
.../pipelines/BuiltinTopkEvaluateTest.java | 4 +-
4 files changed, 61 insertions(+), 67 deletions(-)
diff --git a/scripts/builtin/topk_cleaning.dml b/scripts/builtin/topk_cleaning.dml
index c4d8cf9..d9bdc93 100644
--- a/scripts/builtin/topk_cleaning.dml
+++ b/scripts/builtin/topk_cleaning.dml
@@ -22,7 +22,6 @@
source("scripts/pipelines/scripts/utils.dml") as utils;
source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
-
s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = as.frame("NULL"), Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives,
Frame[Unknown] parameters, Matrix[Double] cmr = matrix("4 0.7 1", rows=1, cols=3), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5,
Integer resource_val = 20, Double sample = 0.1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, String output)
@@ -30,15 +29,18 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
# return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical,
# Frame[Unknown] features, Double dirtyScore, Matrix[Double] evalFunHp)
{
+ t1 = time(); print("TopK-Cleaning:");
+
Xtest = as.frame("0")
Ytest = as.frame("0")
- print("starting topk_cleaning")
+ ctx = list(prefix="----"); #TODO include seed
- [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
-
+ # prepare meta data
# # keeping the meta list format if we decide to add more stuff in metadata
+ [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
metaList = list(mask=mask, schema=schema, fd=fdMask)
-
+ t2 = time(); print("-- Cleaning - Prepare Metadata: "+(t2-t1)/1e9+"s");
+
# separate the label
[Xtrain, Ytrain] = getLabel(dataTrain, isLastLabel)
if(!cv)
@@ -49,24 +51,31 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
[eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}");
eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
}
- else
- {
+ else {
eYtrain = as.matrix(Ytrain)
eYtest = as.matrix(Ytest)
}
+ t3 = time(); print("-- Cleaning - Prepare Labels: "+(t3-t2)/1e9+"s");
# # # when the evaluation function is called first we also compute and keep hyperparams of target application
+ print("-- Cleaning - Get Dirty Score: ");
[dirtyScore, evalFunHp] = getDirtyScore(X=Xtrain, Y=eYtrain, Xtest=Xtest, Ytest=eYtest, evaluationFunc=evaluationFunc,
- metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk)
-
+ metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk, ctx=ctx)
+ t4 = time(); print("---- finalized in: "+(t4-t3)/1e9+"s");
+
# # do the string processing
- [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos)
+ print("-- Cleaning - Data Preparation (strings, transform, sample): ");
+ [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos, ctx)
# # if mask has 1s then there are categorical features
+ print("---- feature transformations to numeric matrix");
[eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, cv, "recode")
# apply sampling on training data for pipeline enumeration
+ # TODO why recoding/sampling twice (within getDirtyScore)
+ print("---- class-stratified sampling of feature matrix w/ f="+sample);
[eXtrain, eYtrain] = utils::doSample(eXtrain, eYtrain, sample, TRUE)
+ t5 = time(); print("---- finalized in: "+(t5-t4)/1e9+"s");
# # # create logical pipeline seeds
logicalSeedCI = frame([
@@ -109,7 +118,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
[bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, cat=category, population=logical[2:nrow(logical)],
max_iter=ceil(resource_val/topK), metaList = metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp,
primitives=primitives, param=parameters, num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE)
- # # # bestLogical = frame(["MVI", "CI", "SCALE"], rows=1, cols=3)
+ t6 = time(); print("-- Cleaning - Enum Logical Pipelines: "+(t6-t5)/1e9+"s");
topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL")
@@ -117,6 +126,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
perf = bandit(X_train=eXtrain, Y_train=eYtrain, X_test=eXtest, Y_test=eYtest, metaList=metaList,
evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, primitives=primitives, param=parameters, baseLineScore=dirtyScore,
k=topK, R=resource_val, cv=cv, output=output, verbose=TRUE);
+ t7 = time(); print("-- Cleaning - Enum Physical Pipelines: "+(t7-t6)/1e9+"s");
}
prepareMeta = function(Frame[Unknown] data, Frame[Unknown] metaData)
@@ -160,45 +170,46 @@ return(Frame[Unknown] X, Frame[Unknown] Y)
}
runStringPipeline = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Frame[String] schema,
- Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE)
+ Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE, List[Unknown] ctx)
return(Frame[Unknown] Xtrain, Frame[Unknown] Xtest)
{
if(cv)
- Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos)
+ Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
else
{
# # # binding train and test to use same dictionary for both
- XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos)
+ XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
Xtrain = XAll[1:nrow(Xtrain),]
Xtest = XAll[nrow(Xtrain)+1:nrow(XAll),]
}
}
getDirtyScore = function(Frame[Unknown] X, Matrix[Double] Y, Frame[Unknown] Xtest, Matrix[Double] Ytest, String evaluationFunc, List[Unknown] metaList,
- Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk)
+ Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk, List[Unknown] ctx=list() )
return(Double dirtyScore, Matrix[Double] evalFunHp)
{
+ prefix = as.scalar(ctx["prefix"]);
mask = as.matrix(metaList['mask'])
[eXtrain, eXtest] = recodeData(X, Xtest, mask, cv, "recode")
eXtrain = replace(target=eXtrain, pattern=NaN, replacement = 0)
eXtest = replace(target=eXtest, pattern=NaN, replacement = 0)
dirtyScore = 100
- # # # sample data
+ print(prefix+" sample from train data and dummy code");
[eXtrain, Ytrain] = utils::doSample(eXtrain, Y, sample, TRUE)
[eXtrain, eXtest] = recodeData(as.frame(eXtrain), as.frame(eXtest), mask, cv, "dummycode")
pipList = list(lp = as.frame("NULL"), ph = as.frame("NULL"), hp = as.matrix(0), flags = 0)
- if(cv)
- {
- score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp, pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1)
+
+ print(prefix+" hyper-parameter tuning");
+ if(cv) {
+ score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp,
+ pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1)
}
- else
- {
+ else {
score = eval(evaluationFunc, list(X=eXtrain, Y=Ytrain, Xtest=eXtest, Ytest=Ytest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = 1))
}
dirtyScore = as.scalar(score[1, 1])
evalFunHp = score[1, 2:ncol(score)]
- # evalFunHp = scoreAndHp[1, 2:ncol(scoreAndHp)]
}
recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Matrix[Double] mask, Boolean cv, String code)
diff --git a/scripts/pipelines/scripts/cleaning.dml b/scripts/pipelines/scripts/cleaning.dml
index 4557f07..73200bd 100644
--- a/scripts/pipelines/scripts/cleaning.dml
+++ b/scripts/pipelines/scripts/cleaning.dml
@@ -97,7 +97,7 @@ startCleaning = function(Frame[Unknown] F, Frame[Unknown] logical, String target
paramRanges = list(10^seq(0,-10), seq(10,100, 10));
[opt, loss] = gridSearchMLR(X_train, y_train, X_test, y_test,
- "multiLogReg", "lossFunc", params, paramRanges, FALSE);
+ "multiLogReg", "lossFunc", params, paramRanges, FALSE);
d_accuracy = classifyDirty(X_train, y_train, opt, getMask, isWeighted, cv)
# [eX, eY] = prioritise(eX, eY, getMask)
@@ -493,7 +493,6 @@ crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double]
Matrix[Double] MLhp, Boolean isWeighted)
return (Matrix[Double] accuracyMatrix)
{
-
accuracyMatrix = matrix(0, k, 1)
dataList = list()
@@ -526,7 +525,6 @@ return (Matrix[Double] accuracyMatrix)
dataList = append(dataList, fold_i)
fold_idxes[, 1] = fold_idxes[, 2] + 1
fold_idxes[, 2] += ins_per_fold
- while(FALSE){}
}
for(i in seq(1,k))
diff --git a/scripts/pipelines/scripts/utils.dml b/scripts/pipelines/scripts/utils.dml
index f6d3d01..05d22a8 100644
--- a/scripts/pipelines/scripts/utils.dml
+++ b/scripts/pipelines/scripts/utils.dml
@@ -60,24 +60,26 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean
{
MIN_SAMPLE = 1000
sampled = floor(nrow(eX) * ratio)
- sample = ifelse(sampled > MIN_SAMPLE, TRUE, FALSE)
- dist = table(eY, 1)
- dist = nrow(dist)
- if(sample)
+ sampledX = eX
+ sampledY = eY
+
+ if(sampled > MIN_SAMPLE)
{
+ dist = max(eY) # num classes (one-hot encoded eY)
+
if((nrow(eY) > 1) & (dist < 10)) # for classification
{
XY = order(target = cbind(eY, eX), by = 1, decreasing=FALSE, index.return=FALSE)
- # get the class count
+ # get the class count
classes = table(eY, 1)
+ # TODO vectorize extraction compute extraction vector
start_class = 1
out_s = 1
out_e = 0
end_class = 0
out = matrix(0, sampled, ncol(XY))
classes_ratio = floor(classes*ratio)
- for(i in 1:nrow(classes))
- {
+ for(i in 1:nrow(classes)) {
end_class = end_class + as.scalar(classes[i])
class_t = XY[start_class:end_class, ]
out_e = out_e + as.scalar(classes_ratio[i])
@@ -89,28 +91,15 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean
sampledY = out[, 1]
sampledX = out[, 2:ncol(out)]
}
- else if(nrow(eY) > 1 & (dist > 10)) # regression
- {
+ else if(nrow(eY) > 1 & (dist > 10)) { # regression
sampledX = eX[1:sampled, ]
sampledY = eY[1:sampled, ]
}
- else if(nrow(eY) == 1)
- {
+ else if(nrow(eY) == 1) { # TODO ?
sampledX = eX[1:sampled, ]
sampledY = eY
}
- else {
- sampledX = eX
- sampledY = eY
- }
}
- else
- {
- sampledX = eX
- sampledY = eY
- }
- if(verbose)
- print("AFTER SAMPLING: "+nrow(eX))
}
# #######################################################################
@@ -154,29 +143,26 @@ return(Boolean validForResources)
validForResources = count > 0
}
-stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, Frame[String] schema, Boolean CorrectTypos)
+stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask,
+ Frame[String] schema, Boolean CorrectTypos, List[Unknown] ctx = list(prefix="--"))
return(Frame[Unknown] processedData)
{
+ prefix = as.scalar(ctx["prefix"]);
# step 1 drop invalid types
+ print(prefix+" drop values with type mismatch");
data = dropInvalidType(data, schema)
# step 2 do the case transformations
+ print(prefix+" convert strings to lower case");
for(i in 1:ncol(mask))
- {
if(as.scalar(schema[1,i]) == "STRING")
- {
- lowerCase = map(data[, i], "x -> x.toLowerCase()")
- data[, i] = lowerCase
- }
-
- }
-
+ data[, i] = map(data[, i], "x -> x.toLowerCase()")
+
if(CorrectTypos)
{
- # recode data to get null mask
- if(sum(mask) > 0)
- {
+ # recode data to get null mask
+ if(sum(mask) > 0) {
# always recode the label
index = vectorToCsv(mask)
jspecR = "{ids:true, recode:["+index+"]}"
@@ -186,18 +172,19 @@ return(Frame[Unknown] processedData)
else
eX = as.matrix(data)
nullMask = is.na(eX)
- print("starting correctTypos ")
+ print(prefix+" correct typos in strings");
# fix the typos
for(i in 1:ncol(schema))
- {
if(as.scalar(schema[1,i]) == "STRING")
data[, i] = correctTypos(data[, i], nullMask[, i], 0.2, 0.9, FALSE, TRUE, FALSE);
- }
- # print("after correctTypos "+toString(data, rows=5))
}
+ print(prefix+" porter-stemming on all features");
data = map(data, "x -> PorterStemmer.stem(x)")
+
# TODO add deduplication
+ print(prefix+" deduplication via entity resolution");
+
processedData = data
}
@@ -238,7 +225,7 @@ topk_gridSearch = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xt
# Step 2) materialize hyper-parameter combinations
# (simplify debugging and compared to compute negligible)
HP = matrix(0, numConfigs, numParams);
- for( i in 1:nrow(HP) ) {
+ parfor( i in 1:nrow(HP) ) {
for( j in 1:numParams )
HP[i,j] = paramVals[j,as.scalar(((i-1)/cumLens[j,1])%%paramLens[j,1]+1)];
}
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
index acfd032..a5bb997 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
@@ -49,9 +49,7 @@ public class BuiltinTopkEvaluateTest extends AutomatedTestBase {
}
private void evalPip(double split, String cv, String path, Types.ExecMode et) {
-
- setOutputBuffering(true);
- String HOME = SCRIPT_DIR+"functions/pipelines/" ;
+ String HOME = SCRIPT_DIR+"functions/pipelines/";
Types.ExecMode modeOld = setExecMode(et);
try {
loadTestConfiguration(getTestConfiguration(TEST_NAME1));