You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ss...@apache.org on 2020/07/23 12:13:09 UTC

[systemds] branch master updated: [SYSTEMDS-611][MINOR] Refactoring structure Rewriting the function for generating all possible combinations of physical pipelines out of each logical pipeline. simplified the generation of top k pipelines.

This is an automated email from the ASF dual-hosted git repository.

ssiddiqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new e93758d  [SYSTEMDS-611][MINOR] Refactoring structure Rewriting the function for generating all possible combinations of physical pipelines out of each logical pipeline. simplified the generation of top k pipelines.
e93758d is described below

commit e93758d639cc22bf8e8193e3164698320664164e
Author: Shafaq Siddiqi <sh...@tugraz.at>
AuthorDate: Thu Jul 23 14:09:40 2020 +0200

    [SYSTEMDS-611][MINOR] Refactoring structure
    Rewriting the function for generating all possible combinations of physical pipelines out of each logical pipeline.
    simplified the generation of top k pipelines.
---
 scripts/staging/pipelines/enumerator.dml           | 186 +++++++--------------
 scripts/staging/pipelines/permutations.dml         |  58 +++++++
 scripts/staging/pipelines/utils.dml                |  37 ++++
 .../functions/pipelines/BuiltinEnumeratorTest.java |  12 +-
 4 files changed, 158 insertions(+), 135 deletions(-)

diff --git a/scripts/staging/pipelines/enumerator.dml b/scripts/staging/pipelines/enumerator.dml
index d70bf9a..3e70559 100644
--- a/scripts/staging/pipelines/enumerator.dml
+++ b/scripts/staging/pipelines/enumerator.dml
@@ -19,11 +19,15 @@
 #
 #-------------------------------------------------------------
 
+# import
+source("./scripts/staging/pipelines/permutations.dml") as perm;
+source("./scripts/staging/pipelines/utils.dml") as utils;
+
 enumerator = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] logical, Frame[String] outlierPrimitives, 
                Frame[String] mviPrimitives, Frame[String] param, Integer k, Boolean verbose = TRUE)
 return(Frame[String] Kpipeline)
 {
-  for(i in 1:1) {#nrow(logical) 
+  for(i in 1:2) {#nrow(logical) 
     operator = as.frame(matrix(0,nrow(outlierPrimitives),1)) #combine all logical primitives
     for(j in 1:ncol(logical))
     {
@@ -33,47 +37,53 @@ return(Frame[String] Kpipeline)
         operator = cbind(operator, mviPrimitives);
     }
     operator = operator[,2:ncol(operator)]
-    intermediates = generatePermutations(operator) # get the all possible combination of pysical primitives
+    intermediates = perm::getPermutations(operator) # get the all possible combination of physical primitives
                                                    # for ith logical pipeline 
     if(verbose)
       print(" pipelines \n"+toString(intermediates))
     [p, h] = executeAll(X, Y,intermediates, param, verbose);
+
     Kpipeline = getFinalTopK(p, h, k)
+    print("top k pipelines of "+i+"th logical pipeline "+toString(Kpipeline))
+    # str = "top k pipelines of iteration "+i
+    # str = append(str, toString(Kpipeline))
   }
   # if(verbose)
-  print("final top k pipelines \n"+toString(Kpipeline))
+  # print("final top k pipelines \n"+toString(Kpipeline))
+  # write(str, "D:/Workspace/Pipelines/output/kpipeline.txt")
 }  
 
-
 # The pipeline execution functions 
 ###################################################
 executeAll = function(Matrix[Double] X, Matrix[Double] Y, Frame[String] intermediates,  Frame[String] param, Boolean verbose)
-return(Frame[String] topP, Matrix[Double] topH)
+return(Frame[String] opt, Matrix[Double] hyper_param)
 {
-  topP = as.frame("")
-  topH = matrix(0,1,1)
-  p = as.frame("")
-  hp = matrix(0,1,1)
-  clone_X = X;
 
+
+  clone_X = X;
+  # initialize output variables
+  opt = as.frame("NA")
+  hyper_param = matrix(0,0,1)
   if(verbose)
     print("total pipelines to be executed "+nrow(intermediates))
   for(i in 1:nrow(intermediates)) {
     paraList = list()
-    paraList = getInstanceParam(intermediates[i,], param)
+    op = intermediates[i,]
+
+    paraList = getInstanceParam(op, param)
     sum = 1
+    print("executing "+toString(op))
     while(sum > 0) #condition to terminate when all hyper parameters are executed
     {
       paramL = list()
-      tmp_hp = matrix(0,1,1)
-      tmp_p = intermediates[i,]
+      hp_temp = matrix(0,1,0)
+      opt_temp = op
       for(j in 1: length(paraList))
       {
         singleHp = as.matrix(paraList[j])
+        hp_temp = cbind(hp_temp, as.matrix(ncol(singleHp)))
+        hp_temp = cbind(hp_temp, singleHp[1,])
         paramL = append(paramL, singleHp[1, ])
-        tmp_hp = cbind(tmp_hp, as.matrix(ncol(p)))
-        tmp_hp = cbind(tmp_hp, p[1,]) 
-      
         if(nrow(singleHp) > 1)
         {
           singleHp = singleHp[2:nrow(singleHp),]
@@ -81,38 +91,20 @@ return(Frame[String] topP, Matrix[Double] topH)
           sum = sum(singleHp)
         }
       }
-      X = executePipeline(intermediates[i,], X, paramL, FALSE)
+      X = executePipeline(op, X, paramL, FALSE)
       data = cbind(Y, X)
       acc = eval("fclassify", data)
+      hp_temp = cbind(hp_temp, acc)
       X = clone_X
-      tmp_hp = cbind(tmp_hp,acc)
-      if(ncol(p) == 1 & sum(hp) == 0){
-        p = tmp_p
-        hp = tmp_hp 
-      } else {
-        p = rbind(p, tmp_p)
-        hp = rbind(hp, tmp_hp)
+      if(as.scalar(opt[1,1]) == "NA" & nrow(hyper_param) == 0)
+      {
+        opt = opt_temp
+        hyper_param = hp_temp
       }
-    }
-
-    if(ncol(topP) == 1 & sum(topH) == 0){
-      topP = p
-      topH = hp 
-    }
-    else { 
-      if(ncol(p) < ncol(topP)){
-        margin = ncol(topP) - ncol(p)
-        toAppend = topP[1,1:margin]
-        toAppend[1,] = "" 
-        p = cbind(p, toAppend)
+      else {
+        opt = rbind(opt, opt_temp)
+        hyper_param = rbind(hyper_param, hp_temp)
       }
-      else if(ncol(hp) < ncol(topH))
-        hp = cbind(matrix(0,nrow(hp),ncol(topH) - ncol(hp)), hp)
-      else if(ncol(hp) > ncol(topH))
-        topH = cbind(matrix(0,nrow(topH),ncol(hp) - ncol(topH)), topH)
-        
-      topP = rbind(topP, p)  
-      topH = rbind(topH, hp)
     }
     X = clone_X
   }
@@ -120,57 +112,6 @@ return(Frame[String] topP, Matrix[Double] topH)
  
 
 # The below functions will generate the all possible 
-# physical pipelines for a  given logical pipeline
-###################################################
-generatePermutations = function(Frame[String] operators)
-return (Frame[String] combinations)
-{
-  if(ncol(operators) == 1)
-    stop("invalid number of columns")
-  
-  if(ncol(operators) > 2 ) {
-    com2 = generatePermutationsOf2(operators[,1:2])
-    operators = operators[,3:ncol(operators)]
-    for(out in 1: ncol(operators)) {
-      temp =  com2[,1]  
-      temp1 = com2[1,1]
-      comTemp = com2[1,]
-      for(i in 1:nrow(operators)) {
-        for(j in 1:nrow(com2))
-          temp[j,1] = operators[i,out] 
-        temp1 = rbind(temp1, temp)
-        comTemp = rbind(comTemp, com2)
-      }
-      comTemp = cbind(comTemp, temp1)
-      com2 = comTemp[2:nrow(comTemp),]
-    }
-    combinations = com2
-  }
-  else
-    combinations = generatePermutationsOf2(operators)  
-}
-
-
-generatePermutationsOf2 = function(Frame[String] operators )
-return(Frame[String] output)
-{
-  jspecR = "{ids:true, recode:[1,2]}";
-  [X, M] = transformencode(target=operators, spec=jspecR);
-  out = matrix(0,0,2)
-  for(i in 1:nrow(X[,2])) {
-    broadcast = matrix(as.scalar(X[i,2]), nrow(X), 1)
-    if(nrow(out) == 0){
-      out = cbind(X[,1], broadcast)
-    }
-    else {
-      output_tmp = cbind(X[,1], broadcast)
-      out = rbind(out, output_tmp)
-    }
-  }
-  output = transformdecode(target=out, spec=jspecR, meta=M);
-}
-
-# The below functions will generate the all possible 
 # combinations for different hyper parameter values
 ###################################################
 getInstanceParam = function(Frame[String] instance, Frame[String] param )
@@ -220,14 +161,14 @@ return(list[Unknown] L)
 
   for(i in 1:ncol(instance))
   {
-    if(as.scalar(hpNum[1,i]) > 0)
+    if(as.scalar(hpNum[1, i]) > 0)
     {
       L =  append(L, parameters[,index:(index+as.scalar(hpNum[1,i]))-1])
       index = index+as.scalar(hpNum[1,i])
     }
     else 
-      L = append(L, matrix(-1,1,1))
-  }
+      L = append(L, matrix(-1, 1, 1))
+  } 
 }
 
 getParaCombinations = function(Matrix[Double] para, Matrix[Double] vec)
@@ -266,35 +207,23 @@ return (Double psum)
 getFinalTopK = function(Frame[String] pipeline, Matrix[Double] hparameter, Integer k)
 return (Frame[String] pipeline)
 {
-
-  s=""
-  for(i in 1: ncol(pipeline), check =0)
-    s = s+i+",";
-    # encoding categorical columns using recode transformation
-  jspecR = "{ids:true, recode:["+s+"]}";
-  [X, M] = transformencode(target=pipeline, spec=jspecR);
-
-  nColPip = ncol(pipeline)
-  allParam = cbind(X, hparameter)
-  clone_Param = allParam
-  emptyR = matrix(0,0,ncol(allParam))
-  while(nrow(emptyR) <= k)
-  {
-    maxFirst = clone_Param[, ncol(clone_Param)] == max(clone_Param[, ncol(clone_Param)])
-    clone_Param = clone_Param * (maxFirst == 0)  
-    emptyR = removeEmpty(target = clone_Param, margin = "rows", select = (clone_Param[, ncol(clone_Param)] == 0) )
-  }
-  top = removeEmpty(target = allParam, margin = "rows", select = (clone_Param[, ncol(clone_Param)] == 0) )
-  X = top[,1:nColPip]
-  hparameter = top[,nColPip+1:ncol(top)]
-  pipeline = transformdecode(target=X, spec=jspecR, meta=M);
-  pipeline = cbind(pipeline, as.frame(hparameter))
+  if(nrow(pipeline) < k)
+    stop("the top k should be less than the total pipelines")
+  # combine all parameter i.e., operation and hyper-parameter values
+  allParam = cbind(pipeline, as.frame(hparameter))
+  # get the indexes of columns for recode transformation
+  idx = seq(1, ncol(pipeline))
+  index = utils::vectorToCsv(idx)
+  # encoding categorical columns using recode transformation
+  jspecR = "{ids:true, recode:["+index+"]}";
+  [X, M] = transformencode(target=allParam, spec=jspecR);  
+  top = order(target=X, by=ncol(X), decreasing=TRUE, index.return=FALSE);
+  pipeline = transformdecode(target=top, spec=jspecR, meta=M);
   # TODO if k+n pipelines have same accuracy then how to return k pipelines  
   pipeline = pipeline[1:k,]
 }
 
-
-# These private function are used to impute values and classification
+# These private functions are used to impute values by mean and by median
 ##################################################################################
 imputeByMean = function(Matrix[Double] X, Boolean verbose = FALSE)
 return(Matrix[Double] X)
@@ -310,8 +239,8 @@ return(Matrix[Double] X)
 {
   cols = ncol(X)
   colMedian = matrix(0, 1, cols)
-  X = replace(target=X, pattern=NaN, replacement=0)
   Mask = is.nan(X)
+  X = replace(target=X, pattern=NaN, replacement=0)
   parfor(i in 1:cols)
     colMedian[, i] = median(X[,i])
   Mask = Mask * colMedian
@@ -319,6 +248,8 @@ return(Matrix[Double] X)
 }
 
 
+# Function to evaluate the pipeline using classification accuracy
+##################################################################################
 fclassify = function(Matrix[Double] X)
 return (Double accuracy)
 {
@@ -352,17 +283,14 @@ return (Double accuracy)
   [prob, yhat, accuracy] = multiLogRegPredict(test_X, betas, test_Y, FALSE)
 }
 
-
-##########################################
-## Call the function Enumerator
-#########################################
+# Enumeration call
+##################################################################################
 X = read($1, data_type="matrix", format="csv", header=TRUE);
 Y = X[,1]+1
 X = X[,2:ncol(X)]
-
 L = read($2, data_type="frame", format="csv");
 OP = read($3, data_type="frame", format="csv");
 MVIP = read($4, data_type="frame", format="csv");
 param = read($5, data_type="frame", format="csv");
 R = enumerator(X, Y, L, OP, MVIP, param, 5, TRUE);
-write(R, $6, format="csv", sep=",")
+write(R, $6, format="csv", sep=",")
\ No newline at end of file
diff --git a/scripts/staging/pipelines/permutations.dml b/scripts/staging/pipelines/permutations.dml
new file mode 100644
index 0000000..3055651
--- /dev/null
+++ b/scripts/staging/pipelines/permutations.dml
@@ -0,0 +1,58 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+source("./scripts/staging/pipelines/utils.dml") as utils;
+
+
+# The functions below generate all possible
+# physical pipelines for a given logical pipeline
+###################################################
+
+getPermutations = function(Frame[String] opt)
+return(Frame[String] output)
+{
+  # Enumerate every combination that takes one row value from each column of opt;
+  # the result has nrow(opt)^ncol(opt) rows and ncol(opt) columns.
+  numRows = nrow(opt)
+  numCols = ncol(opt)
+  # recode all columns so the combinations can be assembled in a numeric matrix
+  colList = utils::vectorToCsv(seq(1, numCols))
+  recodeSpec = "{ids:true, recode:["+colList+"]}";
+  [codes, recodeMeta] = transformencode(target=opt, spec=recodeSpec);
+  combos = matrix(0, numRows^numCols, numCols)
+  parfor(col in 1 : numCols) {
+    # column col repeats each code in a run of numRows^(numCols-col) rows,
+    # cycling through source rows 1..numRows for numRows^col runs in total
+    runLength = numRows^(numCols-col)
+    numRuns = numRows^col
+    column = matrix(0, 0, 1)
+    src = 1
+    for(run in 1:numRuns) {
+      column = rbind(column, matrix(as.scalar(codes[src, col]), runLength, 1))
+      src = src + 1
+      if(src > numRows) # wrap back to the first source row after the last one
+        src = 1
+    }
+    combos[,col] = column
+  }
+  output = transformdecode(target=combos, spec=recodeSpec, meta=recodeMeta);
+}
diff --git a/scripts/staging/pipelines/utils.dml b/scripts/staging/pipelines/utils.dml
new file mode 100644
index 0000000..0d64fee
--- /dev/null
+++ b/scripts/staging/pipelines/utils.dml
@@ -0,0 +1,37 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+
+
+# Utility function to build the csv index list "1,2,...,n" for a vector of length n
+##################################################################################
+vectorToCsv = function(Matrix[Double] vector)
+return (String indexes){
+  # Builds "1,2,...,n" (n = longer dimension of vector); the cell values are not read.
+  if(nrow(vector) >  ncol(vector))
+    vector = t(vector)
+  s = ""
+  if(ncol(vector) > 1) # n == 1 must skip the loop: a DML for over 1:0 counts down
+    for(i in 1:(ncol(vector)-1))
+      s = s+i+","
+  indexes = s+ncol(vector)
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinEnumeratorTest.java b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinEnumeratorTest.java
index 08db73a..b583fdc 100644
--- a/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinEnumeratorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/pipelines/BuiltinEnumeratorTest.java
@@ -32,11 +32,11 @@ public class BuiltinEnumeratorTest extends AutomatedTestBase {
 	private final static String TEST_DIR = "pipelines/";
 	private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinEnumeratorTest.class.getSimpleName() + "/";
 	protected static final String SCRIPT_DIR = "./scripts/staging/";
-	private final static String logicalFile = SCRIPT_DIR+TEST_DIR+"logical.csv";
-	private final static String outlierPrimitives = SCRIPT_DIR+TEST_DIR+"outlierPrimitives.csv";
-	private final static String mviPrimitives = SCRIPT_DIR+TEST_DIR+"mviPrimitives.csv";
-	private final static String parameters = SCRIPT_DIR+TEST_DIR+"properties.csv";
-	private final static String DATASET = SCRIPT_DIR+TEST_DIR+"airbnb.csv";
+	private final static String logicalFile = SCRIPT_DIR+TEST_DIR + "logical.csv";
+	private final static String outlierPrimitives = SCRIPT_DIR+TEST_DIR + "outlierPrimitives.csv";
+	private final static String mviPrimitives = SCRIPT_DIR+TEST_DIR + "mviPrimitives.csv";
+	private final static String parameters = SCRIPT_DIR+TEST_DIR + "properties.csv";
+	private final static String DATASET = SCRIPT_DIR+TEST_DIR + "airbnb.csv";
 
 	@Override
 	public void setUp() {
@@ -57,7 +57,7 @@ public class BuiltinEnumeratorTest extends AutomatedTestBase {
 
 			String HOME = SCRIPT_DIR + TEST_DIR;
 			fullDMLScriptName = HOME + TEST_NAME + ".dml";
-			programArgs = new String[]{"-stats","-args", DATASET, logicalFile, outlierPrimitives, mviPrimitives, parameters, output("A")};
+			programArgs = new String[]{"-stats", "-args", DATASET, logicalFile, outlierPrimitives, mviPrimitives, parameters, output("A")};
 
 			runTest(true, false, null, -1);