You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/09/04 21:03:11 UTC

[systemds] 02/02: [SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 57c1643dcb4d94e4c21aba0f87143abdab02e819
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Sat Sep 4 23:01:22 2021 +0200

    [SYSTEMDS-3112] Refactoring top-k cleaning pipelines (context obj), I
---
 scripts/builtin/topk_cleaning.dml                  | 55 ++++++++++--------
 scripts/pipelines/scripts/cleaning.dml             |  4 +-
 scripts/pipelines/scripts/utils.dml                | 65 +++++++++-------------
 .../pipelines/BuiltinTopkEvaluateTest.java         |  4 +-
 4 files changed, 61 insertions(+), 67 deletions(-)

diff --git a/scripts/builtin/topk_cleaning.dml b/scripts/builtin/topk_cleaning.dml
index c4d8cf9..d9bdc93 100644
--- a/scripts/builtin/topk_cleaning.dml
+++ b/scripts/builtin/topk_cleaning.dml
@@ -22,7 +22,6 @@
 source("scripts/pipelines/scripts/utils.dml") as utils;
 source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
 
-
 s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = as.frame("NULL"), Frame[Unknown] metaData = as.frame("NULL"), Frame[Unknown] primitives,
   Frame[Unknown] parameters, Matrix[Double] cmr = matrix("4 0.7 1", rows=1, cols=3), String evaluationFunc, Matrix[Double] evalFunHp, Integer topK = 5, 
   Integer resource_val = 20, Double sample = 0.1, Boolean cv=TRUE, Integer cvk = 2, Boolean isLastLabel = TRUE, Boolean correctTypos=FALSE, String output)
@@ -30,15 +29,18 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
   # return (Frame[Unknown] topKPipelines, Matrix[Double] topKHyperParams, Matrix[Double] topKScores, Frame[Unknown] bestLogical,
   # Frame[Unknown] features, Double dirtyScore, Matrix[Double] evalFunHp)
 {
+  t1 = time(); print("TopK-Cleaning:");
+  
   Xtest = as.frame("0")
   Ytest = as.frame("0")
-  print("starting topk_cleaning")
+  ctx = list(prefix="----"); #TODO include seed
   
-  [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
-
+  # prepare meta data
   # # keeping the meta list format if we decide to add more stuff in metadata
+  [schema, mask, fdMask, maskY] = prepareMeta(dataTrain, metaData)
   metaList = list(mask=mask, schema=schema, fd=fdMask)
-  
+  t2 = time(); print("-- Cleaning - Prepare Metadata: "+(t2-t1)/1e9+"s");
+    
   # separate the label
   [Xtrain, Ytrain] = getLabel(dataTrain, isLastLabel)
   if(!cv)
@@ -49,24 +51,31 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
     [eYtrain, M] = transformencode(target=Ytrain, spec= "{ids:true, recode:[1]}");
     eYtest = transformapply(target=Ytest, spec= "{ids:true, recode:[1]}", meta=M);
   }
-  else
-  {
+  else {
     eYtrain = as.matrix(Ytrain)
     eYtest = as.matrix(Ytest)
   }
+  t3 = time(); print("-- Cleaning - Prepare Labels: "+(t3-t2)/1e9+"s");
 
   # # # when the evaluation function is called first we also compute and keep hyperparams of target application
+  print("-- Cleaning - Get Dirty Score: ");
   [dirtyScore, evalFunHp] = getDirtyScore(X=Xtrain, Y=eYtrain, Xtest=Xtest, Ytest=eYtest, evaluationFunc=evaluationFunc, 
-    metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk)
-  
+    metaList=metaList, evalFunHp=evalFunHp, sample=sample, trainML=1, cv=cv, cvk=cvk, ctx=ctx)
+  t4 = time(); print("---- finalized in: "+(t4-t3)/1e9+"s");
+
   # # do the string processing
-  [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos)
+  print("-- Cleaning - Data Preparation (strings, transform, sample): ");
+  [Xtrain, Xtest] = runStringPipeline(Xtrain, Xtest, schema, mask, cv, correctTypos, ctx)
   
   # # if mask has 1s then there are categorical features
+  print("---- feature transformations to numeric matrix");
   [eXtrain, eXtest] = recodeData(Xtrain, Xtest, mask, cv, "recode")
   
   # apply sampling on training data for pipeline enumeration
+  # TODO why recoding/sampling twice (within getDirtyScore)
+  print("---- class-stratified sampling of feature matrix w/ f="+sample);
   [eXtrain, eYtrain] = utils::doSample(eXtrain, eYtrain, sample, TRUE)
+  t5 = time(); print("---- finalized in: "+(t5-t4)/1e9+"s");
 
   # # # create logical pipeline seeds
   logicalSeedCI =  frame([
@@ -109,7 +118,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
   [bestLogical, score, T] = lg::enumerateLogical(X=eXtrain, y=eYtrain, Xtest=eXtest, ytest=eYtest, cmr=cmr, cat=category, population=logical[2:nrow(logical)],
     max_iter=ceil(resource_val/topK), metaList = metaList, evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, 
     primitives=primitives, param=parameters, num_inst=3 , num_exec=2, cv=cv, cvk=cvk, verbose=TRUE)
-  # # # bestLogical = frame(["MVI", "CI", "SCALE"], rows=1, cols=3)
+  t6 = time(); print("-- Cleaning - Enum Logical Pipelines: "+(t6-t5)/1e9+"s");
 
   topKPipelines = as.frame("NULL"); topKHyperParams = matrix(0,0,0); topKScores = matrix(0,0,0); features = as.frame("NULL")
   
@@ -117,6 +126,7 @@ s_topk_cleaning = function(Frame[Unknown] dataTrain, Frame[Unknown] dataTest = a
   perf = bandit(X_train=eXtrain, Y_train=eYtrain, X_test=eXtest, Y_test=eYtest,  metaList=metaList,
     evaluationFunc=evaluationFunc, evalFunHp=evalFunHp, lp=bestLogical, primitives=primitives, param=parameters, baseLineScore=dirtyScore,
     k=topK, R=resource_val, cv=cv, output=output, verbose=TRUE);  
+  t7 = time(); print("-- Cleaning - Enum Physical Pipelines: "+(t7-t6)/1e9+"s");
 }
 
 prepareMeta = function(Frame[Unknown] data, Frame[Unknown] metaData)
@@ -160,45 +170,46 @@ return(Frame[Unknown] X, Frame[Unknown] Y)
 }
 
 runStringPipeline = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Frame[String] schema,
-  Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE)
+  Matrix[Double] mask, Boolean cv, Boolean correctTypos = FALSE, List[Unknown] ctx)
 return(Frame[Unknown] Xtrain, Frame[Unknown] Xtest)
 {
   if(cv)
-    Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos)
+    Xtrain = utils::stringProcessing(data=Xtrain, mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
   else
   {
     # # # binding train and test to use same dictionary for both
-    XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos)
+    XAll = utils::stringProcessing(data=rbind(Xtrain, Xtest), mask=mask, schema=schema, CorrectTypos=correctTypos, ctx=ctx)
     Xtrain = XAll[1:nrow(Xtrain),]
     Xtest = XAll[nrow(Xtrain)+1:nrow(XAll),]
   }
 }
 
 getDirtyScore = function(Frame[Unknown] X, Matrix[Double] Y, Frame[Unknown] Xtest, Matrix[Double] Ytest, String evaluationFunc, List[Unknown] metaList,
-  Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk)
+  Matrix[Double] evalFunHp, Double sample, Integer trainML, Boolean cv, Integer cvk, List[Unknown] ctx=list() )
 return(Double dirtyScore, Matrix[Double] evalFunHp)
 {
+  prefix = as.scalar(ctx["prefix"]);
   mask = as.matrix(metaList['mask']) 
   [eXtrain, eXtest] = recodeData(X, Xtest, mask, cv, "recode")
   eXtrain = replace(target=eXtrain, pattern=NaN, replacement = 0)
   eXtest = replace(target=eXtest, pattern=NaN, replacement = 0)
   dirtyScore = 100
-  # # # sample data
+  print(prefix+" sample from train data and dummy code");
   [eXtrain, Ytrain] =  utils::doSample(eXtrain, Y, sample, TRUE)
   [eXtrain, eXtest] = recodeData(as.frame(eXtrain), as.frame(eXtest), mask, cv, "dummycode")
   pipList = list(lp = as.frame("NULL"), ph = as.frame("NULL"), hp = as.matrix(0), flags = 0)
-  if(cv)
-  {
-    score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp, pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1)
+
+  print(prefix+" hyper-parameter tuning");
+  if(cv) {
+    score = crossV(X=eXtrain, y=Ytrain, cvk=cvk, evalFunHp=evalFunHp,
+      pipList=pipList, metaList=metaList, evalFunc=evaluationFunc, trainML = 1)
   }
-  else 
-  {
+  else {
     score = eval(evaluationFunc, list(X=eXtrain, Y=Ytrain, Xtest=eXtest, Ytest=Ytest, Xorig=as.matrix(0), evalFunHp=evalFunHp, trainML = 1))
   }
 
   dirtyScore = as.scalar(score[1, 1])
   evalFunHp = score[1, 2:ncol(score)]
-  # evalFunHp = scoreAndHp[1, 2:ncol(scoreAndHp)]
 }
 
 recodeData = function(Frame[Unknown] Xtrain, Frame[Unknown] Xtest, Matrix[Double] mask, Boolean cv, String code)
diff --git a/scripts/pipelines/scripts/cleaning.dml b/scripts/pipelines/scripts/cleaning.dml
index 4557f07..73200bd 100644
--- a/scripts/pipelines/scripts/cleaning.dml
+++ b/scripts/pipelines/scripts/cleaning.dml
@@ -97,7 +97,7 @@ startCleaning = function(Frame[Unknown] F, Frame[Unknown] logical, String target
   paramRanges = list(10^seq(0,-10), seq(10,100, 10));
 
   [opt, loss] = gridSearchMLR(X_train, y_train, X_test, y_test, 
-	 "multiLogReg", "lossFunc", params, paramRanges, FALSE);
+    "multiLogReg", "lossFunc", params, paramRanges, FALSE);
    
   d_accuracy = classifyDirty(X_train, y_train, opt, getMask, isWeighted, cv)
   # [eX, eY] = prioritise(eX, eY, getMask)
@@ -493,7 +493,6 @@ crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double]
   Matrix[Double] MLhp, Boolean isWeighted) 
 return (Matrix[Double] accuracyMatrix)
 {
-
   accuracyMatrix = matrix(0, k, 1)
 
   dataList = list()
@@ -526,7 +525,6 @@ return (Matrix[Double] accuracyMatrix)
     dataList = append(dataList, fold_i)
     fold_idxes[, 1] = fold_idxes[, 2] + 1
     fold_idxes[, 2] += ins_per_fold
-    while(FALSE){}
   }
 
   for(i in seq(1,k))
diff --git a/scripts/pipelines/scripts/utils.dml b/scripts/pipelines/scripts/utils.dml
index f6d3d01..05d22a8 100644
--- a/scripts/pipelines/scripts/utils.dml
+++ b/scripts/pipelines/scripts/utils.dml
@@ -60,24 +60,26 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean
 {
   MIN_SAMPLE = 1000
   sampled = floor(nrow(eX) * ratio)
-  sample = ifelse(sampled > MIN_SAMPLE, TRUE, FALSE)
-  dist = table(eY, 1)
-  dist = nrow(dist)
-  if(sample)
+  sampledX = eX
+  sampledY = eY
+  
+  if(sampled > MIN_SAMPLE)
   {
+    dist = max(eY) # num classes (one-hot encoded eY) 
+    
     if((nrow(eY) > 1) & (dist < 10))  # for classification
     {
       XY = order(target = cbind(eY, eX),  by = 1, decreasing=FALSE, index.return=FALSE)
-      # get the class count 
+      # get the class count
       classes = table(eY, 1)
+      # TODO vectorize the extraction (compute an extraction vector instead of looping over classes)
       start_class = 1
       out_s = 1 
       out_e = 0
       end_class = 0
       out = matrix(0, sampled, ncol(XY))
       classes_ratio = floor(classes*ratio)
-      for(i in 1:nrow(classes))
-      {
+      for(i in 1:nrow(classes)) {
         end_class = end_class + as.scalar(classes[i])
         class_t = XY[start_class:end_class, ]
         out_e = out_e + as.scalar(classes_ratio[i]) 
@@ -89,28 +91,15 @@ doSample = function(Matrix[Double] eX, Matrix[Double] eY, Double ratio, Boolean
       sampledY = out[, 1]
       sampledX = out[, 2:ncol(out)]
     }
-    else if(nrow(eY) > 1 & (dist > 10)) # regression
-    {
+    else if(nrow(eY) > 1 & (dist > 10)) { # regression
       sampledX = eX[1:sampled, ]
       sampledY = eY[1:sampled, ]
     }
-    else if(nrow(eY) == 1)
-    {
+    else if(nrow(eY) == 1) { # TODO ?
       sampledX =  eX[1:sampled, ]
       sampledY = eY 
     }
-    else {
-      sampledX = eX
-      sampledY = eY    
-    }
   }
-  else 
-  { 
-    sampledX = eX
-    sampledY = eY 
-  }
-  if(verbose)
-    print("AFTER SAMPLING: "+nrow(eX))
 }
 
 # #######################################################################
@@ -154,29 +143,26 @@ return(Boolean validForResources)
   validForResources = count > 0
 }
 
-stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, Frame[String] schema, Boolean CorrectTypos)
+stringProcessing = function(Frame[Unknown] data, Matrix[Double] mask, 
+  Frame[String] schema, Boolean CorrectTypos, List[Unknown] ctx = list(prefix="--"))
 return(Frame[Unknown] processedData)
 {
+  prefix = as.scalar(ctx["prefix"]);
 
   # step 1 drop invalid types
+  print(prefix+" drop values with type mismatch");
   data = dropInvalidType(data, schema)
   
   # step 2 do the case transformations
+  print(prefix+" convert strings to lower case");
   for(i in 1:ncol(mask))
-  {
     if(as.scalar(schema[1,i]) == "STRING")
-    {
-      lowerCase = map(data[, i], "x -> x.toLowerCase()")
-      data[, i] = lowerCase
-    }
-
-  }
-
+      data[, i] = map(data[, i], "x -> x.toLowerCase()")
+    
   if(CorrectTypos)
   {
-  # recode data to get null mask
-    if(sum(mask) > 0)
-    {
+    # recode data to get null mask
+    if(sum(mask) > 0) {
       # always recode the label
       index = vectorToCsv(mask)
       jspecR = "{ids:true, recode:["+index+"]}"
@@ -186,18 +172,19 @@ return(Frame[Unknown] processedData)
     else
       eX = as.matrix(data)
     nullMask = is.na(eX)
-    print("starting correctTypos ")
+    print(prefix+" correct typos in strings");
     # fix the typos
     for(i in 1:ncol(schema))
-    {
       if(as.scalar(schema[1,i]) == "STRING")
         data[, i] = correctTypos(data[, i], nullMask[, i], 0.2, 0.9, FALSE, TRUE, FALSE);
-    }
-    # print("after correctTypos "+toString(data, rows=5))
   }
   
+  print(prefix+" porter-stemming on all features");
   data = map(data, "x -> PorterStemmer.stem(x)")
+  
   # TODO add deduplication
+  print(prefix+" deduplication via entity resolution");
+  
   processedData = data
 }
 
@@ -238,7 +225,7 @@ topk_gridSearch = function(Matrix[Double] X, Matrix[Double] y, Matrix[Double] Xt
   # Step 2) materialize hyper-parameter combinations
   # (simplify debugging and compared to compute negligible)
   HP = matrix(0, numConfigs, numParams);
-  for( i in 1:nrow(HP) ) {
+  parfor( i in 1:nrow(HP) ) {
     for( j in 1:numParams )
       HP[i,j] = paramVals[j,as.scalar(((i-1)/cumLens[j,1])%%paramLens[j,1]+1)];
   }
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
index acfd032..a5bb997 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinTopkEvaluateTest.java
@@ -49,9 +49,7 @@ public class BuiltinTopkEvaluateTest extends AutomatedTestBase {
 	}
 
 	private void evalPip(double split, String cv, String path, Types.ExecMode et) {
-
-		setOutputBuffering(true);
-		String HOME = SCRIPT_DIR+"functions/pipelines/" ;
+		String HOME = SCRIPT_DIR+"functions/pipelines/";
 		Types.ExecMode modeOld = setExecMode(et);
 		try {
 			loadTestConfiguration(getTestConfiguration(TEST_NAME1));