You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ss...@apache.org on 2021/05/04 09:18:25 UTC

[systemds] branch master updated: [SYSTEMDS-2962] Initial implementation of Logical Pipelines optimizer Optimizer is based on evolutionary algorithm with constrained crossover and mutation this commit also contains some minor formatting cleanups in pipelines package Closes #1249.

This is an automated email from the ASF dual-hosted git repository.

ssiddiqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 92cda3f  [SYSTEMDS-2962] Initial implementation of Logical Pipelines optimizer   Optimizer is based on evolutionary algorithm with constrained crossover and mutation   this commit also contains some minor formatting cleanups in pipelines package Closes #1249.
92cda3f is described below

commit 92cda3f811ccf0b1cbb326fa0f96fddf26a0d8e5
Author: Shafaq Siddiqi <sh...@tugraz.at>
AuthorDate: Tue May 4 11:16:40 2021 +0200

    [SYSTEMDS-2962] Initial implementation of Logical Pipelines optimizer
      Optimizer is based on evolutionary algorithm with constrained crossover and mutation
      this commit also contains some minor formatting cleanups in pipelines package
    Closes #1249.
---
 scripts/builtin/bandit.dml                         |  69 +++++----
 scripts/pipelines/scripts/enumerateLogical.dml     | 172 +++++++++++++++++++++
 scripts/pipelines/scripts/logicalFunc.dml          |   4 +-
 scripts/pipelines/scripts/utils.dml                |  22 ++-
 .../pipelines/CleaningTestClassification.java      |  19 ++-
 .../functions/pipelines/CleaningTestCompare.java   |   5 +-
 ...ngTestCompare.java => CleaningTestLogical.java} |  51 +++---
 .../scripts/functions/pipelines/testLogical.dml    | 122 +++++++++++++++
 8 files changed, 391 insertions(+), 73 deletions(-)

diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index 687d76b..ec93e7e 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -25,6 +25,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
 {
   print("Starting optimizer")
   NUM_FEATURES = 14
+  HYPERPARAM_LENGTH = 110
   print("null in data "+sum(is.na(X_train)))
   bestPipeline = frame("", rows=1, cols=1)
   bestHyperparams = as.matrix(0)
@@ -36,7 +37,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
   B = (s_max + 1) * R;
   
   # initialize output variables
-  hparam = matrix(0, rows=k*(s_max+1), cols=100)
+  hparam = matrix(0, rows=k*(s_max+1), cols=HYPERPARAM_LENGTH)
   pipeline = frame(0, rows=k*(s_max+1), cols=ncol(lp)+1)
   startOut=0; endOut=0;
   feaFrameOuter = frame(data=["#MissingValues", "MinVla", "MaxVal", "AverageMin", "AverageMax", 
@@ -46,7 +47,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
   for(s in s_max:0) {
     
    # result variables
-    bracket_hp = matrix(0, rows=k*(s+1)+k, cols=100)
+    bracket_hp = matrix(0, rows=k*(s+1)+k, cols=HYPERPARAM_LENGTH)
     bracket_pipel = matrix(0, rows=k*(s+1)+k, cols=3)
     start=1; end=0;
     
@@ -90,8 +91,8 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
       bracket_hp[start:end, 1:ncol(b)] =  b[1:rowIndex,]
       start = end + 1
 
-      # sort the configurations fro successive halving
-      avergae_perf =  getMaxPerConf(outPip) 
+      # sort the configurations for successive halving
+      avergae_perf =  getMaxPerConf(outPip, nrow(configurations)) 
       configurations = frameSort(cbind(avergae_perf, configurations))
       configurations = configurations[, 2:ncol(configurations)]
     }
@@ -201,7 +202,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
    List[Unknown] targetList, Frame[Unknown] param, Frame[Unknown] featureFrameOuter, Boolean verbose)                    
   return (Matrix[Double] output_operator, Matrix[Double] output_hyperparam, Frame[Unknown] featureFrameOuter) {
 
-  output_hp = matrix(0, nrow(ph_pip)*r_i, 60)
+  output_hp = matrix(0, nrow(ph_pip)*r_i, 100)
   output_accuracy = matrix(0, nrow(ph_pip)*r_i, 1)
   output_pipelines = matrix(0, nrow(ph_pip)*r_i, 2)
   
@@ -218,7 +219,8 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
   {
     # execute configurations with r resources
     [hp, no_of_res, no_of_flag_vars] = getHyperparam(ph_pip[i], param, r_i)
-    feaFrame = frame("", rows = no_of_res, cols = ncol(featureFrameOuter))
+    if(ncol(featureFrameOuter) > 1)
+      feaFrame = frame("", rows = no_of_res, cols = ncol(featureFrameOuter))
     pip_toString = pipToString(ph_pip[i])
     for(r in 1:no_of_res)
     { 
@@ -239,20 +241,24 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
       hp_vec = cbind(matrix_width, matrix(hp_matrix, rows=1, cols=nrow(hp_matrix)*ncol(hp_matrix), byrow=TRUE))
       output_accuracy[index, 1] = accuracy
       output_hp[index, 1:ncol(hp_vec)] = hp_vec
-      output_pipelines[index, ] = cbind(as.matrix(i), id[i,1])
+      output_pipelines[index, ] = cbind(as.matrix(index), id[i,1])
       X = clone_X
       Y = clone_Y
-      index = index + 1  
-      feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
-      feaFrame[r, (ncol(feaVec)+1)] = pip_toString
-      feaFrame[r, (ncol(feaVec)+2)] = accuracy
-      feaFrame[r, (ncol(feaVec)+3)] = T
-      feaFrame[r, (ncol(feaVec)+4)] = accT
+      index = index + 1 
+      
+      if(ncol(featureFrameOuter) > 1) {
+        feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
+        feaFrame[r, (ncol(feaVec)+1)] = pip_toString
+        feaFrame[r, (ncol(feaVec)+2)] = accuracy
+        feaFrame[r, (ncol(feaVec)+3)] = T
+        feaFrame[r, (ncol(feaVec)+4)] = accT
+      }
     }
     
     X = clone_X
     Y = clone_Y
-    featureFrameOuter = rbind(featureFrameOuter, feaFrame)
+    if(ncol(featureFrameOuter) > 1) 
+      featureFrameOuter = rbind(featureFrameOuter, feaFrame)
   }
   output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows")
   output_operator = removeEmpty(target=cbind(output_accuracy, output_pipelines) ,margin="rows")
@@ -316,23 +322,26 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown]  hpList, Intege
         maxVal = as.scalar(hpList[index, paramValIndex + 1])
         if(type == "FP") {
           val = rand(rows=no_of_res, cols=1, min=minVal,max=maxVal, pdf="uniform");
-          OpParam[, j] = val;
-        } else if(type == "INT") {
+          OpParam[, j] = val; 
+        }
+        else if(type == "INT") {
           val = sample(maxVal, no_of_res, TRUE);
           less_than_min = val < minVal;
           val = (less_than_min * minVal) + val;
-          OpParam[, j] = val
-        } else if(type == "BOOL") {
+          OpParam[, j] = val; 
+        }
+        else if(type == "BOOL") {
           if(maxVal == 1) {
             s = sample(2, no_of_res, TRUE);
             b = s - 1;
-            OpParam[, j] = b;
-          } else  
+            OpParam[, j] = b; 
+          } 
+          else  
             OpParam[, j] = matrix(0, rows=no_of_res, cols=1)
-        } else {
-          # TODO handle string set something like {,,}
-          print("invalid data type")
         }
+        else 
+          print("invalid data type")  # TODO handle string set something like {,,}
+          
         paramIdx = paramIdx + 2
         typeIdx = typeIdx + 1
       }
@@ -434,11 +443,11 @@ extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperpa
 ###########################################################################
 # The function will return the max performance by each individual pipeline
 ############################################################################
-getMaxPerConf = function(Matrix[Double] pipelines)
+getMaxPerConf = function(Matrix[Double] pipelines, Double size)
 return (Frame[Unknown] maxperconf)
 {
   tab = removeEmpty(target=table(pipelines[, 2], pipelines[, 3], pipelines[, 1]), margin="cols")
-  maxperconf = frame(0, rows=max(pipelines[, 2]), cols=1)
+  maxperconf = frame(0, rows=size, cols=1)
   maxperconf[1:ncol(tab),] = as.frame(t(colMaxs(tab)))
 }
 
@@ -457,9 +466,7 @@ fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Ma
   else { 
     print("STARTING "+cv+" CROSS VALIDATIONS")
     # do the k = 3 cross validations
-    t1 = time()
-    accuracyMatrix = crossV(X, Y, cv, mask, MLhp, isWeighted)
-    T = floor((time() - t1) / 1e+6)
+    [accuracyMatrix, T] = crossV(X, Y, cv, mask, MLhp, isWeighted)
     accuracyMatrix = removeEmpty(target=accuracyMatrix, margin="rows")
     acc = colMeans(accuracyMatrix)
     accuracy = as.scalar(acc[1,1])
@@ -477,8 +484,9 @@ fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Ma
 
 crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double] mask,
   Matrix[Double] MLhp, Boolean isWeighted) 
-return (Matrix[Double] accuracyMatrix)
+return (Matrix[Double] accuracyMatrix, Double T)
 {
+  t1 = time()
   accuracyMatrix = matrix(0, k, 1)
   dataList = list()
   testL = list()
@@ -523,6 +531,7 @@ return (Matrix[Double] accuracyMatrix)
     accuracy = getAccuracy(testy, yhat, isWeighted)
     accuracyMatrix[i] = accuracy
   }
+  T = floor((time() - t1) / 1e+6)
 }
 
 
@@ -603,7 +612,7 @@ return (Double precision, Double T)
   match = (abs(cleanX - fixedX) < 0.001) * correctionsRequired
   print("total matches "+sum(match))
   # print("total matches \n"+toString(match))
-  precision = max(0.001, sum(match) / correctionsMade)
+  precision = max(0.001, sum(match) / max(1, correctionsMade))
   T = floor((time() - t1) / 1e+6)
   print("Precision: "+toString(precision) + " in "+T+" ms")
 
diff --git a/scripts/pipelines/scripts/enumerateLogical.dml b/scripts/pipelines/scripts/enumerateLogical.dml
new file mode 100644
index 0000000..1cb0ca3
--- /dev/null
+++ b/scripts/pipelines/scripts/enumerateLogical.dml
@@ -0,0 +1,172 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Generate the logical pipelines using basic evolutionary algorithm, 
+# population -> logical Pipeline, chromosome -> physical pipeline, gene -> hp
+# input :
+# 1. Dataset X
+# 2. population, different logical seed pipelines format = [number of operators, op1, op2, op3 ..., opn]
+# 3. number of iterations
+# 4. pipLength, length of pipeline, number of operator in each pipeline
+# 5. meta data list i.e., schema, mask, fdmask
+# 6. target list i.e, target application, cv value etc.
+# 7. primitives, physical operator list
+# 8. param, physical operator hyperparameters
+# 9. num_inst value, number of physical instances for each logical to be executed
+# 10. num_exec, how many times each physical pipeline should be executed
+# 11. n_pop, children created in each generation
+# output: best logical pipeline, its score, and evaluation time in ms
+
+
+# The idea is to take the initial set of logical pipelines as the population, then get num_inst physical pipelines for each
+# logical pipeline in the population. These physical pipelines are then executed num_exec times, where in each execution a random set of
+# hyperparameters is used to execute the operators. Then compute a score vector by storing the best score for each logical pipeline in
+# the population. Sort the pipelines by score and take n_pop pipelines as parents for generating the new population.
+# From the selected pipelines, take a pair in each iteration as parents and generate a pair of children by doing crossover and mutation.
+# In crossover, make a child by taking some operators from p1 and some operators from p2, and in mutation randomly swap the
+# operators in the children. These new children will be the population in the next iteration. Repeat the process max_iter times.
+# Stop early if the best_score of the previous generation is better than the best_score of the new generation.
+
+source("scripts/builtin/bandit.dml") as bandit;
+source("scripts/pipelines/scripts/utils.dml") as utils;
+
+enumerateLogical = function(Matrix[Double] X, Matrix[Double] y, Frame[Unknown] population, Integer max_iter=10,
+  Integer pipLength, List[Unknown] metaList, List[Unknown] targetList, Frame[Unknown] primitives, Frame[Unknown] param,
+  Integer num_inst, Integer num_exec, Integer n_pop, Boolean verbose)
+return (Frame[Unknown] bestLg, Double pre_best, Double T)  # best logical pipeline, its score, elapsed time in ms
+{ 
+  # evolutionary search over logical pipelines: score the population, remember the best, breed children, repeat
+  t1 = time()
+  bestLg = as.frame("")
+  best_score = 0
+  pre_best = 0
+  feaFrameOuter = as.frame("NULL")
+  iter = 1
+  convergedOuter = FALSE
+  while(iter <= max_iter & !convergedOuter)
+  {
+    physicalPipList = list()
+    # # # get the physical instances from logical ones (column 1 of each row holds the operator count)
+    for(i in 1:nrow(population))
+    { 
+      lv = as.integer(as.scalar(population[i, 1])) + 1
+      lp = population[i, 2:lv]
+      physicalConf = bandit::get_physical_configurations(lp, num_inst, primitives)
+      physicalPipList = append(physicalPipList, physicalConf)
+    }
+    
+    scores = matrix(0, rows=length(physicalPipList), cols=1)
+    
+    # # # execute the physical pipelines
+    for(i in 1:length(physicalPipList))
+    {
+      physicalConf = as.frame(physicalPipList[i])
+      # # append configuration keys for extracting the pipeline later on
+      id = seq(1, nrow(physicalConf))
+      physicalConf = cbind(as.frame(id), physicalConf)
+      # # execute the physical instances, each pipeline is executed num_exec times
+      [outPip,outHp, feaFrameOuter] = bandit::run_with_hyperparam(physicalConf, num_exec, X, y, metaList,
+        targetList, param, as.frame(""), verbose)
+      # # max performance per configuration; NOTE(review): only row 1 is kept as the score -- confirm intent
+      max_perf =  bandit::getMaxPerConf(outPip, nrow(physicalConf)) 
+      scores[i] = as.matrix(max_perf[1, 1])
+    }
+    
+    # # select parents and best score
+    selected = order(target = scores, by = 1, decreasing=TRUE, index.return=TRUE)
+    idxR = as.scalar(selected[1, 1])
+    best_score = as.scalar(scores[idxR])
+    if(verbose)
+    {
+      print("best score "+best_score)
+      print("previous score "+pre_best)
+    }
+    # # # converged once the new generation scores strictly worse than the previous best
+    converge = pre_best > best_score
+    if(converge) {
+      convergedOuter = TRUE
+      print("----------- converged after "+iter+" iteration-------------")
+      print("best score "+pre_best)
+      print("best pipeline "+toString(bestLg))
+    }
+    else 
+    {
+      pre_best = best_score
+      idxC = as.integer(as.scalar(population[idxR, 1])) + 1
+      bestLg = population[idxR, 2:idxC]
+    }
+    
+    # # # if new best is not better than pre_best then no need of generating new population
+    children = frame(0, rows=n_pop, cols=pipLength+1)
+    CROSS_OVER_RATE = 2
+    i = 1
+    while(i <= n_pop & !converge)
+    {
+      p1 = population[as.scalar(selected[i]), ]
+      p2 = population[as.scalar(selected[i+1]), ]  # NOTE(review): assumes selected has a row i+1 -- confirm for odd or large n_pop
+      lengthp1 = as.integer(as.scalar(p1[1, 1]))
+      lengthp2 = as.integer(as.scalar(p2[1, 1]))
+      p1 = p1[, 2:(lengthp1+1)]
+      p2 = p2[, 2:(lengthp2+1)]
+      # # # constrained crossover: prepend the first operator(s) of one parent to the whole other parent; parents with at
+      # # # least 5 operators donate CROSS_OVER_RATE operators, shorter ones one, so swapping pca and dummycoding stays unlikely
+      if(lengthp1 >= 5 & (lengthp1 + CROSS_OVER_RATE) < pipLength)
+        c1 = cbind(p1[1,1:CROSS_OVER_RATE], p2)
+      else if ((lengthp1 + 1) < pipLength)  # guards are meant to keep the child within pipLength
+        c1 = cbind(p1[1,1], p2)
+      # # # NOTE(review): c1/c2 stay unassigned when neither guard holds, and the guards test the donor length -- confirm
+      if(lengthp2 >= 5 & (lengthp2 + CROSS_OVER_RATE) < pipLength)
+        c2 = cbind(p2[1,1:CROSS_OVER_RATE], p1)
+      else if ((lengthp2 + 1) < pipLength)
+        c2 = cbind(p2[1,1], p1)
+
+      # # # mutation: swap two operators at positions sampled from 1..3 if the child has at least 5 operators
+      if(ncol(c1) >= 5)
+      {
+        r = sample(3, 2)
+        r1 = as.scalar(r[1,1])
+        r2 = as.scalar(r[2,1])
+        temp = c1[1, r1]
+        c1[1, r1] = c1[1, r2]
+        c1[1, r2] = temp
+      }
+      if(ncol(c2) >= 5)
+      {
+        r = sample(3, 2)
+        r1 = as.scalar(r[1,1])
+        r2 = as.scalar(r[2,1])
+        temp = c2[1, r1]
+        c2[1, r1] = c2[1, r2]
+        c2[1, r2] = temp
+      }
+      # # # append length of pipeline and pipeline in frame
+      children[i, 1] = ncol(c1)
+      children[i, 2:(ncol(c1) + 1)] = c1
+      children[i+1, 1] = ncol(c2)
+      children[i+1, 2:(ncol(c2) + 1)] = c2
+      i = i + 2
+    }
+    population = children
+    iter = iter + 1  # advance the generation counter so that max_iter actually bounds the search
+  }
+  T = floor((time() - t1) / 1e+6)
+  print("time "+T+" ms")
+}
+
diff --git a/scripts/pipelines/scripts/logicalFunc.dml b/scripts/pipelines/scripts/logicalFunc.dml
index 0ddc1bb..a984273 100644
--- a/scripts/pipelines/scripts/logicalFunc.dml
+++ b/scripts/pipelines/scripts/logicalFunc.dml
@@ -105,9 +105,9 @@ return(Frame[Unknown] transformLogical) {
   }
   transformLogical[1, 1:ncol(seed)] = seed
   transformLogical = map(transformLogical, "var -> var.replace(\"0\", \"\")")
-  transformLogical = utils::frameRemoveEmpty(target=transformLogical, margin="cols", select=as.matrix(0))
+  transformLogical = utils::frameRemoveEmpty(target=transformLogical, marginParam="cols", select=as.matrix(0))
   if(nrow(transformLogical) > 1)
-    transformLogical = utils::frameRemoveEmpty(target=transformLogical, margin="rows", select=as.matrix(0))
+    transformLogical = utils::frameRemoveEmpty(target=transformLogical, marginParam="rows", select=as.matrix(0))
 
 }
 
diff --git a/scripts/pipelines/scripts/utils.dml b/scripts/pipelines/scripts/utils.dml
index 8b23536..17186ab 100644
--- a/scripts/pipelines/scripts/utils.dml
+++ b/scripts/pipelines/scripts/utils.dml
@@ -22,7 +22,7 @@ source("scripts/builtin/bandit.dml") as bandit;
 
 
 # remove empty wrapper for frames
-frameRemoveEmpty = function(Frame[Unknown] target, String margin, Matrix[Double] select)
+frameRemoveEmpty = function(Frame[Unknown] target, String marginParam, Matrix[Double] select)
 return (Frame[Unknown] frameblock)
 {
   idx = seq(1, ncol(target))
@@ -32,11 +32,19 @@ return (Frame[Unknown] frameblock)
   jspecR = "{ids:true, recode:["+index+"]}";
   [Xd, M] = transformencode(target=target, spec=jspecR);
   X = replace(target=Xd, pattern = NaN, replacement=0)
-  if(nrow(select) > 1)
-    X = removeEmpty(target = X, margin = margin, select = select)
-  else  
-    X = removeEmpty(target = X, margin = margin)
-    
+  if(nrow(select) > 1 ) {
+    # TODO fix removeEmpty Spark instruction to accept margin as a variable for now only support literal 
+    if(marginParam == "rows")
+      X = removeEmpty(target = X, margin = "rows", select = select)
+    else
+      X = removeEmpty(target = X, margin = "cols", select = select)
+  }
+  else { 
+    if(marginParam == "rows")
+      X = removeEmpty(target = X, margin = "rows")
+    else
+      X = removeEmpty(target = X, margin = "cols")
+  }
   frameblock = transformdecode(target = Xd, spec = jspecR, meta = M)
   frameblock = frameblock[1:nrow(X), 1:ncol(X)]
 }
@@ -115,7 +123,7 @@ classifyDirty = function(Matrix[Double] Xtrain, Matrix[Double] ytrain, Matrix[Do
   # # classify without cleaning fill with edfault values 1
   Xtrain = replace(target = Xtrain, pattern = NaN, replacement=0)
   dX_train = dummycoding(Xtrain, mask)
-  accuracy = bandit::crossV(Xtrain, ytrain, cv, mask, opt, isWeighted)
+  [accuracy, T] = bandit::crossV(Xtrain, ytrain, cv, mask, opt, isWeighted)
   accuracy = mean(accuracy)
   print("cross validated dirty accuracy "+accuracy)
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java
index 1a6e4b7..7c4871f 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestClassification.java
@@ -32,15 +32,15 @@ public class CleaningTestClassification extends AutomatedTestBase {
 	private final static String TEST_NAME2 = "compareAccuracy";
 	private final static String TEST_CLASS_DIR = SCRIPT_DIR + CleaningTestClassification.class.getSimpleName() + "/";
 
-	protected static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
-	protected static final String DATA_DIR = DATASET_DIR+ "pipelines/";
+	private static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
+	private static final String DATA_DIR = DATASET_DIR+ "pipelines/";
 
 	private final static String DIRTY = DATA_DIR+ "dirty.csv";
 	private final static String CLEAN = DATA_DIR+ "clean.csv";
 	private final static String META = RESOURCE+ "meta/meta_census.csv";
 	private final static String OUTPUT = RESOURCE+"intermediates/";
 
-	protected static final String PARAM_DIR = "./scripts/pipelines/properties/";
+	private static final String PARAM_DIR = "./scripts/pipelines/properties/";
 	private final static String PARAM = PARAM_DIR + "param.csv";
 	private final static String PRIMITIVES = PARAM_DIR + "primitives.csv";
 
@@ -51,13 +51,13 @@ public class CleaningTestClassification extends AutomatedTestBase {
 	}
 
 	@Ignore
-	public void testCP1() {
+	public void testFindBestPipeline() {
 		runFindPipelineTest(0.1, 5,10, 2,
 			true, "classification", Types.ExecMode.SINGLE_NODE);
 	}
 
 	@Test
-	public void testCP2() {
+	public void testCompareRepairs() {
 		runCleanAndCompareTest( Types.ExecMode.SINGLE_NODE);
 	}
 
@@ -70,11 +70,10 @@ public class CleaningTestClassification extends AutomatedTestBase {
 		try {
 			loadTestConfiguration(getTestConfiguration(TEST_NAME1));
 			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
-			programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY, "metaData="+META,
-				"primitives="+PRIMITIVES, "parameters="+PARAM, "sampleSize="+String.valueOf(sample),
-				"topk="+String.valueOf(topk), "rv="+String.valueOf(resources), "cv="+String.valueOf(crossfold),
-				"weighted="+ String.valueOf(weightedAccuracy), "output="+OUTPUT, "target="+target, "cleanData="+CLEAN,
-				"O="+output("O")};
+			programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY,
+				"metaData="+META, "primitives="+PRIMITIVES, "parameters="+PARAM, "sampleSize="+ sample,
+				"topk="+ topk, "rv="+ resources, "cv="+ crossfold, "weighted="+ weightedAccuracy,
+				"output="+OUTPUT, "target="+target, "cleanData="+CLEAN, "O="+output("O")};
 
 			runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
 
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
index 11868df..d7160be 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
@@ -46,7 +46,6 @@ public class CleaningTestCompare extends AutomatedTestBase {
 		addTestConfiguration(TEST_NAME1,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
 	}
 
-
 	@Test
 	public void testCP1() {
 		runFindPipelineTest(5,10, 2,
@@ -62,8 +61,8 @@ public class CleaningTestCompare extends AutomatedTestBase {
 		try {
 			loadTestConfiguration(getTestConfiguration(TEST_NAME1));
 			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
-			programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY, "metaData="+META,
-				"primitives="+PRIMITIVES, "parameters="+PARAM,  "topk="+String.valueOf(topk), "rv="+String.valueOf(resources),
+			programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY,
+				"metaData="+META, "primitives="+PRIMITIVES, "parameters="+PARAM,  "topk="+ topk, "rv="+ resources,
 				"output="+OUTPUT, "target="+target, "cleanData="+CLEAN, "O="+output("O")};
 
 			runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestLogical.java
similarity index 53%
copy from src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
copy to src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestLogical.java
index 11868df..69909b7 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestCompare.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/CleaningTestLogical.java
@@ -24,49 +24,58 @@ import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class CleaningTestCompare extends AutomatedTestBase {
-	private final static String TEST_NAME1 = "testCompare";
-	private final static String TEST_CLASS_DIR = SCRIPT_DIR + CleaningTestCompare.class.getSimpleName() + "/";
+public class CleaningTestLogical extends AutomatedTestBase {
+	private final static String TEST_NAME = "testLogical";
+	private final static String TEST_CLASS_DIR = SCRIPT_DIR + CleaningTestLogical.class.getSimpleName() + "/";
 
-	protected static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
+	private static final String RESOURCE = SCRIPT_DIR+"functions/pipelines/";
+	private static final String DATA_DIR = DATASET_DIR+ "pipelines/";
 
-	private final static String DIRTY = DATASET_DIR+ "pipelines/dirty.csv";
-	private final static String CLEAN = DATASET_DIR+ "pipelines/clean.csv";
+	private final static String DIRTY = DATA_DIR+ "dirty.csv";
+	private final static String CLEAN = DATA_DIR+ "clean.csv";
 	private final static String META = RESOURCE+ "meta/meta_census.csv";
-	private final static String OUTPUT = RESOURCE+"intermediates/";
 
-	protected static final String PARAM_DIR = "./scripts/pipelines/properties/";
+	private static final String PARAM_DIR = "./scripts/pipelines/properties/";
 	private final static String PARAM = PARAM_DIR + "param.csv";
 	private final static String PRIMITIVES = PARAM_DIR + "primitives.csv";
 
 	@Override
 	public void setUp() {
-		addTestConfiguration(TEST_NAME1,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
+		addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"R"}));
 	}
 
-
 	@Test
-	public void testCP1() {
-		runFindPipelineTest(5,10, 2,
-			true, "compare", Types.ExecMode.SINGLE_NODE);
+	public void testLogical1() {
+		runTestLogical(2, 10, 2, 2, 2, 2,
+			"classification", Types.ExecMode.SINGLE_NODE);
+	}
+
+	@Ignore
+	public void testLogicalSP() {
+		runTestLogical(3, 10, 3, 2, 2, 4,
+			"classification", Types.ExecMode.SPARK);
 	}
 
-	private void runFindPipelineTest(int topk, int resources, int crossfold,
-		boolean weightedAccuracy, String target, Types.ExecMode et) {
+	private void runTestLogical(int max_iter, int pipelineLength, int crossfold,
+		int num_inst, int num_exec, int n_pop, String target, Types.ExecMode et) {
 
-		setOutputBuffering(true);
+		//		setOutputBuffering(true);
 		String HOME = SCRIPT_DIR+"functions/pipelines/" ;
 		Types.ExecMode modeOld = setExecMode(et);
 		try {
-			loadTestConfiguration(getTestConfiguration(TEST_NAME1));
-			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
-			programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY, "metaData="+META,
-				"primitives="+PRIMITIVES, "parameters="+PARAM,  "topk="+String.valueOf(topk), "rv="+String.valueOf(resources),
-				"output="+OUTPUT, "target="+target, "cleanData="+CLEAN, "O="+output("O")};
+			loadTestConfiguration(getTestConfiguration(TEST_NAME));
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[] {"-stats", "-exec", "singlenode", "-nvargs", "dirtyData="+DIRTY,
+				"metaData="+META, "primitives="+PRIMITIVES, "parameters="+PARAM, "max_iter="+ max_iter,
+				 "pipLength="+ pipelineLength, "cv="+ crossfold, "num_inst="+ num_inst, "num_exec="+ num_exec,
+				"n_pop="+ n_pop,"target="+target, "cleanData="+CLEAN, "O="+output("O")};
 
 			runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+
+			//expected loss smaller than default invocation
 			Assert.assertTrue(TestUtils.readDMLBoolean(output("O")));
 		}
 		finally {
diff --git a/src/test/scripts/functions/pipelines/testLogical.dml b/src/test/scripts/functions/pipelines/testLogical.dml
new file mode 100644
index 0000000..d0c7bf5
--- /dev/null
+++ b/src/test/scripts/functions/pipelines/testLogical.dml
@@ -0,0 +1,122 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+# Generate the logical pipelines for data cleaning
+
+source("scripts/pipelines/scripts/utils.dml") as utils;
+source("scripts/pipelines/scripts/logicalFunc.dml") as logical;
+source("scripts/pipelines/scripts/gridsearchMLR.dml") as gs;
+source("scripts/pipelines/scripts/enumerateLogical.dml") as lg;
+
+
+# read the inputs
+X = read($dirtyData, data_type="frame", format="csv", header=TRUE, 
+  naStrings= ["NA", "null","  ","NaN", "nan", "", "?", "99999"]);
+
+metaInfo = read($metaData, data_type="frame", format="csv", header=FALSE);
+primitives = read($primitives, data_type = "frame", format="csv", header= TRUE)
+param = read($parameters, data_type = "frame", format="csv", header= TRUE)
+weightedAccuracy = FALSE # weighted-accuracy flag, forwarded as wAccuracy in the target list
+targetApplicaton = $target # target application, forwarded as target in the target list
+
+max_iter = $max_iter # number of iterations of the logical enumeration
+num_inst = $num_inst # number of physical instances per logical pipeline
+num_exec = $num_exec # how many times each physical pipeline is executed
+n_pop=$n_pop # children created in each generation
+pipLength = $pipLength # length of pipeline, number of operators in each pipeline
+crossValidations = $cv # cv value, forwarded as cv in the target list
+
+
+getSchema = metaInfo[1, 2:ncol(metaInfo)]
+getMask = as.matrix(metaInfo[2, 2:ncol(metaInfo)])
+getFdMask = as.matrix(metaInfo[3, 2:ncol(metaInfo)]) # columns of interest for FD computation
+  
+
+# encode the categorical data
+if(sum(getMask) > 0)
+{
+  # always recode the label
+  index = vectorToCsv(getMask)
+  jspecR = "{ids:true, recode:["+index+"]}"
+  [eX, X_meta] = transformencode(target=X, spec=jspecR);
+  # change the schema to reflect the encoded values
+  getSchema = map(getSchema, "x->x.replace(\"STRING\", \"INT64\")")
+  getSchema = map(getSchema, "x->x.replace(\"BOOLEAN\", \"INT64\")")
+
+} 
+# if no categorical value exist then just cast the frame into matrix
+else
+  eX = as.matrix(X)
+  
+# extract the class label (last column of the encoded data)
+eY = eX[, ncol(eX)]
+eX = eX[, 1:ncol(eX) - 1]
+
+getMask = getMask[, 1:ncol(getMask) - 1] # drop the class-label column from the mask
+getFdMask = getFdMask[, 1:ncol(getFdMask) - 1] # drop the class-label column from the FD mask
+getSchema = getSchema[, 1:ncol(getSchema) - 1] # drop the class-label column from the schema
+# hyperparam for classifier
+opt = matrix("0 100", rows=1, cols=2)
+
+# get the cross validated accuracy on dirty dataset (only on training set)
+d_accuracy = 0
+d_accuracy = utils::classifyDirty(eX, eY, opt, getMask, weightedAccuracy, crossValidations)
+
+# get FD for IC operations
+FD = discoverFD(X=replace(target=eX, pattern=NaN, replacement=1), Mask=getFdMask, threshold=0.8)
+FD = (diag(matrix(1, rows=nrow(FD), cols=1)) ==0) * FD 
+FD = FD > 0
+
+metaList = list(mask=getMask, schema=getSchema, fd=FD)
+targetClassification = list(target=targetApplicaton, cv=crossValidations, wAccuracy=weightedAccuracy, 
+  dirAcc = d_accuracy, mlHp = opt, cleanData = as.matrix(0))
+
+# # initialize output variables
+pip = as.frame("NULL"); hp = matrix(0,0,0); acc = matrix(0,0,0); features = as.frame("NULL")
+
+
+logical1 =  frame(["4", "MVI", "SCALE", "DUMMY", "DIM", "0", "0", "0"], rows=1, cols=8)
+# logical2 =  frame(["2", "MVI", "DUMMY", "0", "0", "0", "0", "0"], rows=1, cols=8)
+logical3 =  frame(["3", "MVI", "SCALE", "DUMMY", "0", "0", "0", "0"], rows=1, cols=8)
+logical4 =  frame(["6", "MVI", "OTLR", "CI", "SCALE", "DUMMY", "DIM", "0"], rows=1, cols=8)
+logical5 =  frame(["7", "MVI", "OTLR", "MVI", "CI", "SCALE", "DUMMY", "DIM"], rows=1, cols=8)
+logical6 =  frame(["6", "OTLR", "MVI", "CI", "SCALE", "DUMMY", "DIM", "0"], rows=1, cols=8)
+
+# log = rbind(logical1, logical2)
+log = rbind(logical1, logical3)
+log = rbind(log, logical4)
+log = rbind(log, logical5)
+log = rbind(log, logical6)
+
+[logicalEnum, score, T] = lg::enumerateLogical(X=eX, y=eY, population=log, max_iter=max_iter, pipLength=pipLength, metaList=metaList,
+  targetList=targetClassification, primitives=primitives, param=param, num_inst=num_inst, num_exec=num_exec, n_pop=n_pop, verbose=FALSE)
+# [logicalEnum, score, T] = lg::enumerateLogical(X=eX, y=eY, population=log, max_iter=3, pipLength=10, metaList=metaList,
+  # targetList=targetClassification, primitives=primitives, param=param, num_inst=4, num_exec=2, n_pop=4, verbose=FALSE)
+
+print("score of pipeline: "+toString(score)+" in "+(T/60000)+" mins")
+print("logicalENum "+toString(logicalEnum))
+
+result = d_accuracy < score  
+print("result satisfied ------------"+result)
+
+write(result , $O)
+
+
+