You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ss...@apache.org on 2021/05/04 09:18:25 UTC
[systemds] branch master updated: [SYSTEMDS-2962] Initial
implementation of Logical Pipelines optimizer Optimizer is based on
evolutionary algorithm with constrained crossover and mutation this commit
also contains some minor formatting cleanups in pipelines package Closes
#1249.
This is an automated email from the ASF dual-hosted git repository.
ssiddiqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 92cda3f [SYSTEMDS-2962] Initial implementation of Logical Pipelines optimizer Optimizer is based on evolutionary algorithm with constrained crossover and mutation this commit also contains some minor formatting cleanups in pipelines package Closes #1249.
92cda3f is described below
commit 92cda3f811ccf0b1cbb326fa0f96fddf26a0d8e5
Author: Shafaq Siddiqi <sh...@tugraz.at>
AuthorDate: Tue May 4 11:16:40 2021 +0200
[SYSTEMDS-2962] Initial implementation of Logical Pipelines optimizer
Optimizer is based on evolutionary algorithm with constrained crossover and mutation
this commit also contains some minor formatting cleanups in pipelines package
Closes #1249.
---
scripts/builtin/bandit.dml | 69 +++++----
scripts/pipelines/scripts/enumerateLogical.dml | 172 +++++++++++++++++++++
scripts/pipelines/scripts/logicalFunc.dml | 4 +-
scripts/pipelines/scripts/utils.dml | 22 ++-
.../pipelines/CleaningTestClassification.java | 19 ++-
.../functions/pipelines/CleaningTestCompare.java | 5 +-
...ngTestCompare.java => CleaningTestLogical.java} | 51 +++---
.../scripts/functions/pipelines/testLogical.dml | 122 +++++++++++++++
8 files changed, 391 insertions(+), 73 deletions(-)
diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index 687d76b..ec93e7e 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -25,6 +25,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
{
print("Starting optimizer")
NUM_FEATURES = 14
+ HYPERPARAM_LENGTH = 110
print("null in data "+sum(is.na(X_train)))
bestPipeline = frame("", rows=1, cols=1)
bestHyperparams = as.matrix(0)
@@ -36,7 +37,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
B = (s_max + 1) * R;
# initialize output variables
- hparam = matrix(0, rows=k*(s_max+1), cols=100)
+ hparam = matrix(0, rows=k*(s_max+1), cols=HYPERPARAM_LENGTH)
pipeline = frame(0, rows=k*(s_max+1), cols=ncol(lp)+1)
startOut=0; endOut=0;
feaFrameOuter = frame(data=["#MissingValues", "MinVla", "MaxVal", "AverageMin", "AverageMax",
@@ -46,7 +47,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
for(s in s_max:0) {
# result variables
- bracket_hp = matrix(0, rows=k*(s+1)+k, cols=100)
+ bracket_hp = matrix(0, rows=k*(s+1)+k, cols=HYPERPARAM_LENGTH)
bracket_pipel = matrix(0, rows=k*(s+1)+k, cols=3)
start=1; end=0;
@@ -90,8 +91,8 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
bracket_hp[start:end, 1:ncol(b)] = b[1:rowIndex,]
start = end + 1
- # sort the configurations fro successive halving
- avergae_perf = getMaxPerConf(outPip)
+ # sort the configurations for successive halving
+ avergae_perf = getMaxPerConf(outPip, nrow(configurations))
configurations = frameSort(cbind(avergae_perf, configurations))
configurations = configurations[, 2:ncol(configurations)]
}
@@ -201,7 +202,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
List[Unknown] targetList, Frame[Unknown] param, Frame[Unknown] featureFrameOuter, Boolean verbose)
return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Frame[Unknown] featureFrameOuter) {
- output_hp = matrix(0, nrow(ph_pip)*r_i, 60)
+ output_hp = matrix(0, nrow(ph_pip)*r_i, 100)
output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
output_pipelines = matrix(0, nrow(ph_pip)*r_i, 2)
@@ -218,7 +219,8 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
{
# execute configurations with r resources
[hp, no_of_res, no_of_flag_vars] = getHyperparam(ph_pip[i], param, r_i)
- feaFrame = frame("", rows = no_of_res, cols = ncol(featureFrameOuter))
+ if(ncol(featureFrameOuter) > 1)
+ feaFrame = frame("", rows = no_of_res, cols = ncol(featureFrameOuter))
pip_toString = pipToString(ph_pip[i])
for(r in 1:no_of_res)
{
@@ -239,20 +241,24 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
hp_vec = cbind(matrix_width, matrix(hp_matrix, rows=1, cols=nrow(hp_matrix)*ncol(hp_matrix), byrow=TRUE))
output_accuracy[index, 1] = accuracy
output_hp[index, 1:ncol(hp_vec)] = hp_vec
- output_pipelines[index, ] = cbind(as.matrix(i), id[i,1])
+ output_pipelines[index, ] = cbind(as.matrix(index), id[i,1])
X = clone_X
Y = clone_Y
- index = index + 1
- feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
- feaFrame[r, (ncol(feaVec)+1)] = pip_toString
- feaFrame[r, (ncol(feaVec)+2)] = accuracy
- feaFrame[r, (ncol(feaVec)+3)] = T
- feaFrame[r, (ncol(feaVec)+4)] = accT
+ index = index + 1
+
+ if(ncol(featureFrameOuter) > 1) {
+ feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
+ feaFrame[r, (ncol(feaVec)+1)] = pip_toString
+ feaFrame[r, (ncol(feaVec)+2)] = accuracy
+ feaFrame[r, (ncol(feaVec)+3)] = T
+ feaFrame[r, (ncol(feaVec)+4)] = accT
+ }
}
X = clone_X
Y = clone_Y
- featureFrameOuter = rbind(featureFrameOuter, feaFrame)
+ if(ncol(featureFrameOuter) > 1)
+ featureFrameOuter = rbind(featureFrameOuter, feaFrame)
}
output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows")
output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines) ,margin="rows")
@@ -316,23 +322,26 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Intege
maxVal = as.scalar(hpList[index, paramValIndex + 1])
if(type == "FP") {
val = rand(rows=no_of_res, cols=1, min=minVal,max=maxVal, pdf="uniform");
- OpParam[, j] = val;
- } else if(type == "INT") {
+ OpParam[, j] = val;
+ }
+ else if(type == "INT") {
val = sample(maxVal, no_of_res, TRUE);
less_than_min = val < minVal;
val = (less_than_min * minVal) + val;
- OpParam[, j] = val
- } else if(type == "BOOL") {
+ OpParam[, j] = val;
+ }
+ else if(type == "BOOL") {
if(maxVal == 1) {
s = sample(2, no_of_res, TRUE);
b = s - 1;
- OpParam[, j] = b;
- } else
+ OpParam[, j] = b;
+ }
+ else
OpParam[, j] = matrix(0, rows=no_of_res, cols=1)
- } else {
- # TODO handle string set something like {,,}
- print("invalid data type")
}
+ else
+ print("invalid data type") # TODO handle string set something like {,,}
+
paramIdx = paramIdx + 2
typeIdx = typeIdx + 1
}
@@ -434,11 +443,11 @@ extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperpa
###########################################################################
# The function will return the max performance by each individual pipeline
############################################################################
-getMaxPerConf = function(Matrix[Double] pipelines)
+getMaxPerConf = function(Matrix[Double] pipelines, Double size)
return (Frame[Unknown] maxperconf)
{
tab = removeEmpty(target=table(pipelines[, 2], pipelines[, 3], pipelines[, 1]), margin="cols")
- maxperconf = frame(0, rows=max(pipelines[, 2]), cols=1)
+ maxperconf = frame(0, rows=size, cols=1)
maxperconf[1:ncol(tab),] = as.frame(t(colMaxs(tab)))
}
@@ -457,9 +466,7 @@ fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Ma
else {
print("STARTING "+cv+" CROSS VALIDATIONS")
# do the k = 3 cross validations
- t1 = time()
- accuracyMatrix = crossV(X, Y, cv, mask, MLhp, isWeighted)
- T = floor((time() - t1) / 1e+6)
+ [accuracyMatrix, T] = crossV(X, Y, cv, mask, MLhp, isWeighted)
accuracyMatrix = removeEmpty(target=accuracyMatrix, margin="rows")
acc = colMeans(accuracyMatrix)
accuracy = as.scalar(acc[1,1])
@@ -477,8 +484,9 @@ fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Ma
crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double] mask,
Matrix[Double] MLhp, Boolean isWeighted)
-return (Matrix[Double] accuracyMatrix)
+return (Matrix[Double] accuracyMatrix, Double T)
{
+ t1 = time()
accuracyMatrix = matrix(0, k, 1)
dataList = list()
testL = list()
@@ -523,6 +531,7 @@ return (Matrix[Double] accuracyMatrix)
accuracy = getAccuracy(testy, yhat, isWeighted)
accuracyMatrix[i] = accuracy
}
+ T = floor((time() - t1) / 1e+6)
}
@@ -603,7 +612,7 @@ return (Double precision, Double T)
match = (abs(cleanX - fixedX) < 0.001) * correctionsRequired
print("total matches "+sum(match))
# print("total matches \n"+toString(match))
- precision = max(0.001, sum(match) / correctionsMade)
+ precision = max(0.001, sum(match) / max(1, correctionsMade))
T = floor((time() - t1) / 1e+6)
print("Precision: "+toString(precision) + " in "+T+" ms")
diff --git a/scripts/pipelines/scripts/enumerateLogical.dml b/scripts/pipelines/scripts/enumerateLogical.dml
new file mode 100644
index 0000000..1cb0ca3
--- /dev/null
+++ b/scripts/pipelines/scripts/enumerateLogical.dml
@@ -0,0 +1,172 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Generate the logical pipelines using basic evolutionary algorithm,
+# population -> logical Pipeline, chromosome -> physical pipeline, gene -> hp
+# input :
+# 1. Dataset X
+# 2. population, different logical seed pipelines format = [number of operators, op1, op2, op3 ..., opn]
+# 3. number of iterations
+# 4. pipLength, length of pipeline, number of operator in each pipeline
+# 5. meta data list i.e., schema, mask, fdmask
+# 6. target list i.e, target application, cv value etc.
+# 7. primitives, physical operator list
+# 8. param, physical operator hyperparameters
+# 9. num_inst value, number of physical instances for each logical to be executed
+# 10. num_exec, how many times each physical pipeline should be executed
+# 11. n_pop, children created in each generation
+# output: best logical pipeline and evaluation time in ms
+
+
+# The idea is to take the initial set of logical pipelines as the population, then get num_inst physical pipelines for each
+# logical pipeline in the population. Then execute these physical pipelines num_exec times, where in each execution a random set of
+# hyperparameters is used to execute the operators. Then compute a score vector by storing the best score for each logical pipeline in
+# the population. Sort the pipelines by score and take n_pop pipelines as parents for generating the new population.
+# From the selected pipelines, take a pair in each iteration as parents and generate a pair of children by doing crossover and mutation.
+# In crossover, make a child by taking some operations from p1 and some operations from p2, and in mutation randomly swap the
+# operators in the children. These new children will be the population in the next iteration. Repeat the process max_iter times.
+# Converge early if the best_score of the previous generation is better than the best_score of the new generation.
+
+source("scripts/builtin/bandit.dml") as bandit;
+source("scripts/pipelines/scripts/utils.dml") as utils;
+
+enumerateLogical = function(Matrix[Double] X, Matrix[Double] y, Frame[Unknown] population, Integer max_iter=10,
+ Integer pipLength, List[Unknown] metaList, List[Unknown] targetList, Frame[Unknown] primitives, Frame[Unknown] param,
+ Integer num_inst, Integer num_exec, Integer n_pop, Boolean verbose)
+return (Frame[Unknown] bestLg, Double pre_best, Double T)
+{
+
+ t1 = time()
+ bestLg = as.frame("")
+ best_score = 0
+ pre_best = 0
+ feaFrameOuter = as.frame("NULL")
+ iter = 1
+ convergedOuter = FALSE
+ while(iter <= max_iter & !convergedOuter)
+ {
+ physicalPipList = list()
+ # # # get the physical instances from logical ones
+ for(i in 1:nrow(population))
+ {
+ lv = as.integer(as.scalar(population[i, 1])) + 1
+ lp = population[i, 2:lv]
+ physicalConf = bandit::get_physical_configurations(lp, num_inst, primitives)
+ physicalPipList = append(physicalPipList, physicalConf)
+ }
+
+ scores = matrix(0, rows=length(physicalPipList), cols=1)
+
+ # # # execute the physical pipelines
+ for(i in 1:length(physicalPipList))
+ {
+ physicalConf = as.frame(physicalPipList[i])
+ # # append configuration keys for extracting the pipeline later on
+ id = seq(1, nrow(physicalConf))
+ physicalConf = cbind(as.frame(id), physicalConf)
+      # # execute the physical instances and store the maximum scores, each pipeline is executed num_exec times
+ [outPip,outHp, feaFrameOuter] = bandit::run_with_hyperparam(physicalConf, num_exec, X, y, metaList,
+ targetList, param, as.frame(""), verbose)
+ # # sort the configurations groupwise
+ max_perf = bandit::getMaxPerConf(outPip, nrow(physicalConf))
+ scores[i] = as.matrix(max_perf[1, 1])
+ }
+
+ # # select parents and best score
+ selected = order(target = scores, by = 1, decreasing=TRUE, index.return=TRUE)
+ idxR = as.scalar(selected[1, 1])
+ best_score = as.scalar(scores[idxR])
+ if(verbose)
+ {
+ print("best score "+best_score)
+ print("previous score "+pre_best)
+ }
+
+ converge = ifelse(pre_best > best_score, TRUE, FALSE)
+ if(converge) {
+ convergedOuter = TRUE
+ print("----------- converged after "+iter+" iteration-------------")
+ print("best score "+pre_best)
+ print("best pipeline "+toString(bestLg))
+ }
+ else
+ {
+ pre_best = best_score
+ idxC = as.integer(as.scalar(population[idxR, 1])) + 1
+ bestLg = population[idxR, 2:idxC]
+ }
+
+    # # # if new best is not better than pre_best then no need of generating new population
+ children = frame(0, rows=n_pop, cols=pipLength+1)
+ CROSS_OVER_RATE = 2
+ i = 1
+ while(i <= n_pop & !converge)
+ {
+ p1 = population[as.scalar(selected[i]), ]
+ p2 = population[as.scalar(selected[i+1]), ]
+ lengthp1 = as.integer(as.scalar(p1[1, 1]))
+ lengthp2 = as.integer(as.scalar(p2[1, 1]))
+ p1 = p1[, 2:(lengthp1+1)]
+ p2 = p2[, 2:(lengthp2+1)]
+ # # # cross over, this constrained crossover will only add first operator from each parent to child
+
+ if(lengthp1 >= 5 & (lengthp1 + CROSS_OVER_RATE) < pipLength) #check if pipeline is less than 5 operation only crossover one
+ c1 = cbind(p1[1,1:CROSS_OVER_RATE], p2) # operator so the probability of swapping pca and dummycoding is
+ else if ((lengthp1 + 1) < pipLength) # low and the crossover all should not exceed pipeline total length
+ c1 = cbind(p1[1,1], p2)
+
+ if(lengthp2 >= 5 & (lengthp2 + CROSS_OVER_RATE) < pipLength)
+ c2 = cbind(p2[1,1:CROSS_OVER_RATE], p1)
+ else if ((lengthp2 + 1) < pipLength)
+ c2 = cbind(p2[1,1], p1)
+
+ # # # mutation swap the operators at random positions if the length is greater than 5
+ if(ncol(c1) >= 5)
+ {
+ r = sample(3, 2)
+ r1 = as.scalar(r[1,1])
+ r2 = as.scalar(r[2,1])
+ temp = c1[1, r1]
+ c1[1, r1] = c1[1, r2]
+ c1[1, r2] = temp
+ }
+ if(ncol(c2) >= 5)
+ {
+ r = sample(3, 2)
+ r1 = as.scalar(r[1,1])
+ r2 = as.scalar(r[2,1])
+ temp = c2[1, r1]
+ c2[1, r1] = c2[1, r2]
+ c2[1, r2] = temp
+ }
+ # # # append length of pipeline and pipeline in frame
+ children[i, 1] = ncol(c1)
+ children[i, 2:(ncol(c1) + 1)] = c1
+ children[i+1, 1] = ncol(c2)
+ children[i+1, 2:(ncol(c2) + 1)] = c2
+
+ i = i + 2
+ }
+ population = children
+ }
+ T = floor((time() - t1) / 1e+6)
+ print("time "+T+" ms")
+}
+
diff --git a/scripts/pipelines/scripts/logicalFunc.dml b/scripts/pipelines/scripts/logicalFunc.dml
index 0ddc1bb..a984273 100644
--- a/scripts/pipelines/scripts/logicalFunc.dml
+++ b/scripts/pipelines/scripts/logicalFunc.dml
@@ -105,9 +105,9 @@ return(Frame[Unknown] transformLogical) {
}
transformLogical[1, 1:ncol(seed)] = seed
transformLogical = map(transformLogical, "var -> var.replace(\"0\", \"\")")
- transformLogical = utils::frameRemoveEmpty(target=transformLogical, margin="cols", select=as.matrix(0))
+ transformLogical = utils::frameRemoveEmpty(target=transformLogical, marginParam="cols", select=as.matrix(0))
if(nrow(transformLogical) > 1)
- transformLogical = utils::frameRemoveEmpty(target=transformLogical, margin="rows", select=as.matrix(0))
+ transformLogical = utils::frameRemoveEmpty(target=transformLogical, marginParam="rows", select=as.matrix(0))
}
diff --git a/scripts/pipelines/scripts/utils.dml b/scripts/pipelines/scripts/utils.dml
index 8b23536..17186ab 100644
--- a/scripts/pipelines/scripts/utils.dml
+++ b/scripts/pipelines/scripts/utils.dml
@@ -22,7 +22,7 @@ source("scripts/builtin/bandit.dml") as bandit;
# remove empty wrapper for frames
-frameRemoveEmpty = function(Frame[Unknown] target, String margin, Matrix[Double] select)
+frameRemoveEmpty = function(Frame[Unknown] target, String marginParam, Matrix[Double] select)
return (Frame[Unknown] frameblock)
{
idx = seq(1, ncol(target))
@@ -32,11 +32,19 @@ return (Frame[Unknown] frameblock)
jspecR = "{ids:true, recode:["+index+"]}";
[Xd, M] = transformencode(target=target, spec=jspecR);
X = replace(target=Xd, pattern = NaN, replacement=0)
- if(nrow(select) > 1)
- X = removeEmpty(target = X, margin = margin, select = select)
- else
- X = removeEmpty(target = X, margin = margin)
-
+ if(nrow(select) > 1 ) {
+    # TODO fix removeEmpty Spark instruction to accept margin as a variable; for now only literals are supported
+ if(marginParam == "rows")
+ X = removeEmpty(target = X, margin = "rows", select = select)
+ else
+ X = removeEmpty(target = X, margin = "cols", select = select)
+ }
+ else {
+ if(marginParam == "rows")
+ X = removeEmpty(target = X, margin = "rows")
+ else
+ X = removeEmpty(target = X, margin = "cols")
+ }
frameblock = transformdecode(target = Xd, spec = jspecR, meta = M)
frameblock = frameblock[1:nrow(X), 1:ncol(X)]
}
@@ -115,7 +123,7 @@ classifyDirty = function(Matrix[Double] Xtrain, Matrix[Double] ytrain, Matrix[Do
# # classify without cleaning fill with edfault values 1
Xtrain = replace(target = Xtrain, pattern = NaN, replacement=0)
dX_train = dummycoding(Xtrain, mask)
- accuracy = bandit::crossV(Xtrain, ytrain, cv, mask, opt, isWeighted)
+ [accuracy, T] = bandit::crossV(Xtrain, ytrain, cv, mask, opt, isWeighted)
accuracy = mean(accuracy)
print("cross validated dirty accuracy "+accuracy)
}
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java
index 1a6e4b7..7c4871f 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java
@@ -32,15 +32,15 @@ public class CleaningTestClassification extends AutomatedTestBase {
private final static String TEST_NAME2 = "compareAccuracy";
private final static String TEST_CLASS_DIR = SCRIPT_DIR + CleaningTestClassification.class.getSimpleName() + "/";
- protected static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
- protected static final String DATA_DIR = DATASET_DIR+ "pipelines/";
+ private static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
+ private static final String DATA_DIR = DATASET_DIR+ "pipelines/";
private final static String DIRTY = DATA_DIR+ "dirty.csv";
private final static String CLEAN = DATA_DIR+ "clean.csv";
private final static String META = RESOURCE+ "meta/meta_census.csv";
private final static String OUTPUT = RESOURCE+"intermediates/";
- protected static final String PARAM_DIR = "./scripts/pipelines/properties/";
+ private static final String PARAM_DIR = "./scripts/pipelines/properties/";
private final static String PARAM = PARAM_DIR + "param.csv";
private final static String PRIMITIVES = PARAM_DIR + "primitives.csv";
@@ -51,13 +51,13 @@ public class CleaningTestClassification extends AutomatedTestBase {
}
@Ignore
- public void testCP1() {
+ public void testFindBestPipeline() {
runFindPipelineTest(0.1, 5,10, 2,
true, "classification", Types.ExecMode.SINGLE_NODE);
}
@Test
- public void testCP2() {
+ public void testCompareRepairs() {
runCleanAndCompareTest( Types.ExecMode.SINGLE_NODE);
}
@@ -70,11 +70,10 @@ public class CleaningTestClassification extends AutomatedTestBase {
try {
loadTestConfiguration(getTestConfiguration(TEST_NAME1));
fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
- programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY, "metaData="+META,
- "primitives="+PRIMITIVES, "parameters="+PARAM, "sampleSize="+String.valueOf(sample),
- "topk="+String.valueOf(topk), "rv="+String.valueOf(resources), "cv="+String.valueOf(crossfold),
- "weighted="+ String.valueOf(weightedAccuracy), "output="+OUTPUT, "target="+target, "cleanData="+CLEAN,
- "O="+output("O")};
+ programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY,
+ "metaData="+META, "primitives="+PRIMITIVES, "parameters="+PARAM, "sampleSize="+ sample,
+ "topk="+ topk, "rv="+ resources, "cv="+ crossfold, "weighted="+ weightedAccuracy,
+ "output="+OUTPUT, "target="+target, "cleanData="+CLEAN, "O="+output("O")};
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
index 11868df..d7160be 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
@@ -46,7 +46,6 @@ public class CleaningTestCompare extends AutomatedTestBase {
addTestConfiguration(TEST_NAME1,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
}
-
@Test
public void testCP1() {
runFindPipelineTest(5,10, 2,
@@ -62,8 +61,8 @@ public class CleaningTestCompare extends AutomatedTestBase {
try {
loadTestConfiguration(getTestConfiguration(TEST_NAME1));
fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
- programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY, "metaData="+META,
- "primitives="+PRIMITIVES, "parameters="+PARAM, "topk="+String.valueOf(topk), "rv="+String.valueOf(resources),
+ programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY,
+ "metaData="+META, "primitives="+PRIMITIVES, "parameters="+PARAM, "topk="+ topk, "rv="+ resources,
"output="+OUTPUT, "target="+target, "cleanData="+CLEAN, "O="+output("O")};
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestLogical.java
similarity index 53%
copy from src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
copy to src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestLogical.java
index 11868df..69909b7 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestLogical.java
@@ -24,49 +24,58 @@ import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
-public class CleaningTestCompare extends AutomatedTestBase {
- private final static String TEST_NAME1 = "testCompare";
- private final static String TEST_CLASS_DIR = SCRIPT_DIR + CleaningTestCompare.class.getSimpleName() + "/";
+public class CleaningTestLogical extends AutomatedTestBase {
+ private final static String TEST_NAME = "testLogical";
+ private final static String TEST_CLASS_DIR = SCRIPT_DIR + CleaningTestLogical.class.getSimpleName() + "/";
- protected static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
+ private static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
+ private static final String DATA_DIR = DATASET_DIR+ "pipelines/";
- private final static String DIRTY = DATASET_DIR+ "pipelines/dirty.csv";
- private final static String CLEAN = DATASET_DIR+ "pipelines/clean.csv";
+ private final static String DIRTY = DATA_DIR+ "dirty.csv";
+ private final static String CLEAN = DATA_DIR+ "clean.csv";
private final static String META = RESOURCE+ "meta/meta_census.csv";
- private final static String OUTPUT = RESOURCE+"intermediates/";
- protected static final String PARAM_DIR = "./scripts/pipelines/properties/";
+ private static final String PARAM_DIR = "./scripts/pipelines/properties/";
private final static String PARAM = PARAM_DIR + "param.csv";
private final static String PRIMITIVES = PARAM_DIR + "primitives.csv";
@Override
public void setUp() {
- addTestConfiguration(TEST_NAME1,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
+ addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"R"}));
}
-
@Test
- public void testCP1() {
- runFindPipelineTest(5,10, 2,
- true, "compare", Types.ExecMode.SINGLE_NODE);
+ public void testLogical1() {
+ runTestLogical(2, 10, 2, 2, 2, 2,
+ "classification", Types.ExecMode.SINGLE_NODE);
+ }
+
+ @Ignore
+ public void testLogicalSP() {
+ runTestLogical(3, 10, 3, 2, 2, 4,
+ "classification", Types.ExecMode.SPARK);
}
- private void runFindPipelineTest(int topk, int resources, int crossfold,
- boolean weightedAccuracy, String target, Types.ExecMode et) {
+ private void runTestLogical(int max_iter, int pipelineLength, int crossfold,
+ int num_inst, int num_exec, int n_pop, String target, Types.ExecMode et) {
- setOutputBuffering(true);
+ // setOutputBuffering(true);
String HOME = SCRIPT_DIR+"functions/pipelines/" ;
Types.ExecMode modeOld = setExecMode(et);
try {
- loadTestConfiguration(getTestConfiguration(TEST_NAME1));
- fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
- programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY, "metaData="+META,
- "primitives="+PRIMITIVES, "parameters="+PARAM, "topk="+String.valueOf(topk), "rv="+String.valueOf(resources),
- "output="+OUTPUT, "target="+target, "cleanData="+CLEAN, "O="+output("O")};
+ loadTestConfiguration(getTestConfiguration(TEST_NAME));
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY,
+ "metaData="+META, "primitives="+PRIMITIVES, "parameters="+PARAM, "max_iter="+ max_iter,
+ "pipLength="+ pipelineLength, "cv="+ crossfold, "num_inst="+ num_inst, "num_exec="+ num_exec,
+ "n_pop="+ n_pop,"target="+target, "cleanData="+CLEAN, "O="+output("O")};
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+
+ //expected loss smaller than default invocation
Assert.assertTrue(TestUtils.readDMLBoolean(output("O")));
}
finally {
diff --git a/src/test/scripts/functions/pipelines/testLogical.dml b/src/test/scripts/functions/pipelines/testLogical.dml
new file mode 100644
index 0000000..d0c7bf5
--- /dev/null
+++ b/src/test/scripts/functions/pipelines/testLogical.dml
@@ -0,0 +1,122 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Generate the logical pipelines for data cleaning
+
+source("scripts/pipelines/scripts/utils.dml") as utils;
+source("scripts/pipelines/scripts/logicalFunc.dml") as logical;
+source("scripts/pipelines/scripts/gridsearchMLR.dml") as gs;
+source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
+
+
+# read the inputs
+X = read($dirtyData, data_type="frame", format="csv", header=TRUE,
+ naStrings= ["NA", "null"," ","NaN", "nan", "", "?", "99999"]);
+
+metaInfo = read($metaData, data_type="frame", format="csv", header=FALSE);
+primitives = read($primitives, data_type = "frame", format="csv", header= TRUE)
+param = read($parameters, data_type = "frame", format="csv", header= TRUE)
+weightedAccuracy = FALSE # accuracy flag
+targetApplicaton = $target # target application
+
+max_iter = $max_iter
+num_inst = $num_inst
+num_exec = $num_exec
+n_pop=$n_pop
+pipLength = $pipLength
+crossValidations = $cv
+
+
+getSchema = metaInfo[1, 2:ncol(metaInfo)]
+getMask = as.matrix(metaInfo[2, 2:ncol(metaInfo)])
+getFdMask = as.matrix(metaInfo[3, 2:ncol(metaInfo)]) # columns of interest for FD computation
+
+
+# encode the categorical data
+if(sum(getMask) > 0)
+{
+ # always recode the label
+ index = vectorToCsv(getMask)
+ jspecR = "{ids:true, recode:["+index+"]}"
+ [eX, X_meta] = transformencode(target=X, spec=jspecR);
+ # change the schema to reflect the encoded values
+ getSchema = map(getSchema, "x->x.replace(\"STRING\", \"INT64\")")
+ getSchema = map(getSchema, "x->x.replace(\"BOOLEAN\", \"INT64\")")
+
+}
+# if no categorical value exist then just cast the frame into matrix
+else
+ eX = as.matrix(X)
+
+# extract the class label
+eY = eX[, ncol(eX)]
+eX = eX[, 1:ncol(eX) - 1]
+
+getMask = getMask[, 1:ncol(getMask) - 1] # strip the mask of class label
+getFdMask = getFdMask[, 1:ncol(getFdMask) - 1] # strip the mask of class label
+getSchema = getSchema[, 1:ncol(getSchema) - 1] # strip the schema of class label
+# hyperparam for classifier
+opt = matrix("0 100", rows=1, cols=2)
+
+# get the cross validated accuracy on dirty dataset (only on training set)
+d_accuracy = 0
+d_accuracy = utils::classifyDirty(eX, eY, opt, getMask, weightedAccuracy, crossValidations)
+
+# get FD for IC operations
+FD = discoverFD(X=replace(target=eX, pattern=NaN, replacement=1), Mask=getFdMask, threshold=0.8)
+FD = (diag(matrix(1, rows=nrow(FD), cols=1)) ==0) * FD
+FD = FD > 0
+
+metaList = list(mask=getMask, schema=getSchema, fd=FD)
+targetClassification = list(target=targetApplicaton, cv=crossValidations, wAccuracy=weightedAccuracy,
+ dirAcc = d_accuracy, mlHp = opt, cleanData = as.matrix(0))
+
+# # initialize output variables
+pip = as.frame("NULL"); hp = matrix(0,0,0); acc = matrix(0,0,0); features = as.frame("NULL")
+
+
+logical1 = frame(["4", "MVI", "SCALE", "DUMMY", "DIM", "0", "0", "0"], rows=1, cols=8)
+# logical2 = frame(["2", "MVI", "DUMMY", "0", "0", "0", "0", "0"], rows=1, cols=8)
+logical3 = frame(["3", "MVI", "SCALE", "DUMMY", "0", "0", "0", "0"], rows=1, cols=8)
+logical4 = frame(["6", "MVI", "OTLR", "CI", "SCALE", "DUMMY", "DIM", "0"], rows=1, cols=8)
+logical5 = frame(["7", "MVI", "OTLR", "MVI", "CI", "SCALE", "DUMMY", "DIM"], rows=1, cols=8)
+logical6 = frame(["6", "OTLR", "MVI", "CI", "SCALE", "DUMMY", "DIM", "0"], rows=1, cols=8)
+
+# log = rbind(logical1, logical2)
+log = rbind(logical1, logical3)
+log = rbind(log, logical4)
+log = rbind(log, logical5)
+log = rbind(log, logical6)
+
+[logicalEnum, score, T] = lg::enumerateLogical(X=eX, y=eY, population=log, max_iter=max_iter, pipLength=pipLength, metaList=metaList,
+ targetList=targetClassification, primitives=primitives, param=param, num_inst=num_inst, num_exec=num_exec, n_pop=n_pop, verbose=FALSE)
+# [logicalEnum, score, T] = lg::enumerateLogical(X=eX, y=eY, population=log, max_iter=3, pipLength=10, metaList=metaList,
+ # targetList=targetClassification, primitives=primitives, param=param, num_inst=4, num_exec=2, n_pop=4, verbose=FALSE)
+
+print("score of pipeline: "+toString(score)+" in "+(T/60000)+" mins")
+print("logicalENum "+toString(logicalEnum))
+
+result = d_accuracy < score
+print("result satisfied ------------"+result)
+
+write(result , $O)
+
+
+