You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/05/13 19:20:14 UTC

[systemds] branch master updated: [SYSTEMDS-2954] Fix spark frame init in data cleaning pipelines

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e2ebfc  [SYSTEMDS-2954] Fix spark frame init in data cleaning pipelines
9e2ebfc is described below

commit 9e2ebfc7a6fef53c53ad56ba9fa590f6264be21f
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu May 13 21:19:37 2021 +0200

    [SYSTEMDS-2954] Fix spark frame init in data cleaning pipelines
    
    This patch fixes the missing runtime propagation of dimension
    information in spark frame init operations, which can cause a dimension
    mismatch subsequent spark operations (e.g., in indexing expressions).
    Furthermore, we now add respective tests, improve the bandit builtin to
    allow size propagation and thus, avoid unnecessary spark jobs, and fix
    unnecessary warnings of the parfor cost estimation on list operations.
---
 scripts/builtin/bandit.dml                         | 48 +++++++++++-----------
 src/main/java/org/apache/sysds/hops/DataGenOp.java |  2 +-
 .../java/org/apache/sysds/hops/OptimizerUtils.java | 20 +++++++++
 .../apache/sysds/hops/rewrite/HopRewriteUtils.java |  5 +++
 .../parfor/opt/CostEstimatorHops.java              |  3 +-
 .../instructions/spark/RandSPInstruction.java      |  3 +-
 .../test/functions/frame/FrameConstructorTest.java | 18 +++++++-
 .../functions/frame/FrameConstructorTest.dml       |  4 ++
 8 files changed, 74 insertions(+), 29 deletions(-)

diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index ec93e7e..bcf7f94 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -63,11 +63,11 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
     # save the original configuration as a lookup table
     lookup = configurations
     
-    if(verbose) 
+    if(verbose)
       print("n "+ n +"\nR "+ R +"\ns_max "+ s_max +"\nB "+ B +"\nn "+ n +"\nr "+ r)
     
     for( i in 0:s) {
-      # successive halving    
+      # successive halving
       n_i = min(max(as.integer(floor(n * eta^(-i))), 1), nrow(configurations));
       r_i = as.integer(floor(r * eta^i));
       
@@ -77,7 +77,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
         print("iteration  ---------------------"+i+" out of "+s)
       }
       
-      configurations = configurations[1:n_i, ]      
+      configurations = configurations[1:n_i, ]
       [outPip,outHp, feaFrameOuter] = run_with_hyperparam(configurations, r_i, X_train, Y_train, metaList,
         targetList, param, feaFrameOuter, verbose)
       # sort the pipelines by order of accuracy decreasing
@@ -149,13 +149,13 @@ get_physical_configurations = function(Frame[String] logical, Scalar[int] numCon
     else if(as.scalar(logical[1,j]) == "MVI")
       operator[, j] = mvi;
     else if(as.scalar(logical[1,j]) == "NR")
-      operator[, j] = noise;  
+      operator[, j] = noise;
     else if(as.scalar(logical[1,j]) == "CI")
       operator[, j] = ci;
     else if(as.scalar(logical[1,j]) == "DIM")
       operator[, j] =  dim;
     else if(as.scalar(logical[1,j]) == "DUMMY")
-      operator[, j] =  dummy;  
+      operator[, j] =  dummy;
     else if(as.scalar(logical[1,j]) == "SCALE")
       operator[, j] = scale;
     else stop("invalid operation "+as.scalar(logical[1,j]))
@@ -223,7 +223,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
       feaFrame = frame("", rows = no_of_res, cols = ncol(featureFrameOuter))
     pip_toString = pipToString(ph_pip[i])
     for(r in 1:no_of_res)
-    { 
+    {
       # as the matrix first block of r rows belongs to first operator and r+1 block of rows to second operator 
       # we need to extract a row from each block
       indexes = matrix(no_of_res, rows=ncol(ph_pip), cols=1)
@@ -244,7 +244,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
       output_pipelines[index, ] = cbind(as.matrix(index), id[i,1])
       X = clone_X
       Y = clone_Y
-      index = index + 1 
+      index = index + 1
       
       if(ncol(featureFrameOuter) > 1) {
         feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
@@ -257,7 +257,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
     
     X = clone_X
     Y = clone_Y
-    if(ncol(featureFrameOuter) > 1) 
+    if(ncol(featureFrameOuter) > 1)
       featureFrameOuter = rbind(featureFrameOuter, feaFrame)
   }
   output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows")
@@ -280,11 +280,11 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown]  hpList, Intege
   for(k in 1:ncol(pipeline))
   {
     op = as.scalar(pipeline[1,k])
-    hasParam = map(hpList[,1], "x->x.split(\",\")[0].equals(\""+op+"\")")    
+    hasParam = map(hpList[,1], "x->x.split(\",\")[0].equals(\""+op+"\")")
     # convert the boolean vector to 0/1 matrix representation
     m_hasParam = hasParam == frame("true", rows=nrow(hasParam), cols=1)
     m_hasParam = as.matrix(m_hasParam)
-    # compute the relevant index 
+    # compute the relevant index
     index = m_hasParam * seq(1, nrow(m_hasParam))
     index = as.scalar(removeEmpty(target = index, margin = "rows"))
     indexes[k] = index
@@ -322,24 +322,24 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown]  hpList, Intege
         maxVal = as.scalar(hpList[index, paramValIndex + 1])
         if(type == "FP") {
           val = rand(rows=no_of_res, cols=1, min=minVal,max=maxVal, pdf="uniform");
-          OpParam[, j] = val; 
+          OpParam[, j] = val;
         }
         else if(type == "INT") {
           val = sample(maxVal, no_of_res, TRUE);
           less_than_min = val < minVal;
           val = (less_than_min * minVal) + val;
-          OpParam[, j] = val; 
+          OpParam[, j] = val;
         }
         else if(type == "BOOL") {
           if(maxVal == 1) {
             s = sample(2, no_of_res, TRUE);
             b = s - 1;
-            OpParam[, j] = b; 
+            OpParam[, j] = b;
           } 
-          else  
+          else
             OpParam[, j] = matrix(0, rows=no_of_res, cols=1)
         }
-        else 
+        else
           print("invalid data type")  # TODO handle string set something like {,,}
           
         paramIdx = paramIdx + 2
@@ -391,11 +391,11 @@ extractTopK = function(Frame[Unknown] pipeline, Matrix[Double] hyperparam,
     # for each duplicate record just take the one reocrd and strip the others
     deduplicates = indexes %*% forDedup
   
-    # combine the deduplicated tuples and unique tuples again 
+    # combine the deduplicated tuples and unique tuples again
     forDedup = rbind(uniqueTuples, deduplicates)
   }
   
-  # decode the pipelines 
+  # decode the pipelines
   decoded = transformdecode(target=forDedup[, 1:ncol(pipeline)], meta=dM, spec=jspecDC)
   
   # separate the pipelines and hyper-parameters
@@ -425,14 +425,12 @@ extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperpa
   # bestPipeline = frameSort(bestPipeline)
   hyperparam = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=FALSE)
   pipeline = order(target = pipeline, by = 1, decreasing=TRUE, index.return=FALSE)
-  
-  rowIndex = ifelse(nrow(pipeline) > k, k, nrow(pipeline))
+  rowIndex = min(k, nrow(pipeline))
 
   pipeline = pipeline[1:rowIndex,]
   bestHyperparams = hyperparam[1:rowIndex,]
   bestPipeline = frame(data="|", rows=nrow(pipeline), cols=ncol(conf)-1)
-  for(i in 1: nrow(pipeline), check=0)
-  {
+  for(i in 1: nrow(pipeline)) {
     index = as.scalar(pipeline[i, 3])
     bestPipeline[i, 1:ncol(bestPipeline)] = conf[index, 2:ncol(conf)]
   }
@@ -463,7 +461,7 @@ fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Ma
     print("Y contains only one class")
     accuracy = as.double(0)
   }
-  else { 
+  else {
     print("STARTING "+cv+" CROSS VALIDATIONS")
     # do the k = 3 cross validations
     [accuracyMatrix, T] = crossV(X, Y, cv, mask, MLhp, isWeighted)
@@ -556,8 +554,8 @@ return (Matrix[Double] features)
   features[1, 7] = sum(mask == 0) # number of numerical features
   features[1, 8] = mean(num) # mean value
   colSd = colSds(num)
-  count3sdplus = sum(num > (colMeans(num) + 3*colSd )) 
-  count3sdminus = sum(num < (colMeans(num) - 3*colSd )) 
+  count3sdplus = sum(num > (colMeans(num) + 3*colSd ))
+  count3sdminus = sum(num < (colMeans(num) - 3*colSd ))
   outliers = count3sdplus + count3sdminus
   features[1, 9] = outliers
   # OHE features 
@@ -585,7 +583,7 @@ return (Matrix[Double] features)
 
 ######################################################################
 # # Function for cross validation using hold out method
-# # Inputs: The input dataset X, Y and the value of k validation, mask of the 
+# # Inputs: The input dataset X, Y and the value of k validation, mask of the
 # # dataset for OHE of categorical columns, vector of ML hyper-parameters identified 
 # # via grid-search and a boolean value of (un)weighted accuracy.
 # # Output: It return a matrix having the accuracy of each fold.
diff --git a/src/main/java/org/apache/sysds/hops/DataGenOp.java b/src/main/java/org/apache/sysds/hops/DataGenOp.java
index 32b17b9..20bd9d7 100644
--- a/src/main/java/org/apache/sysds/hops/DataGenOp.java
+++ b/src/main/java/org/apache/sysds/hops/DataGenOp.java
@@ -332,7 +332,7 @@ public class DataGenOp extends MultiThreadedHop
 		Hop input2;
 		Hop input3;
 
-		if ( _op == OpOpDG.RAND || _op == OpOpDG.SINIT ) 
+		if ( _op == OpOpDG.RAND || _op == OpOpDG.SINIT || _op == OpOpDG.FRAMEINIT )
 		{
 			if (_dataType != DataType.TENSOR) {
 				input1 = getInput().get(_paramIndexMap.get(DataExpression.RAND_ROWS)); //rows
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index edf8dfc..0f7691a 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -26,6 +26,7 @@ import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.OpOp1;
 import org.apache.sysds.common.Types.OpOp2;
+import org.apache.sysds.common.Types.OpOp3;
 import org.apache.sysds.common.Types.OpOpData;
 import org.apache.sysds.common.Types.ReOrgOp;
 import org.apache.sysds.common.Types.ValueType;
@@ -1310,6 +1311,8 @@ public class OptimizerUtils
 				ret = rEvalSimpleUnaryDoubleExpression(root, valMemo);
 			else if( root instanceof BinaryOp )
 				ret = rEvalSimpleBinaryDoubleExpression(root, valMemo);
+			else if( root instanceof TernaryOp )
+				ret = rEvalSimpleTernaryDoubleExpression(root, valMemo);
 		}
 		
 		valMemo.put(root.getHopID(), ret);
@@ -1455,6 +1458,23 @@ public class OptimizerUtils
 		return ret;
 	}
 
+	protected static double rEvalSimpleTernaryDoubleExpression( Hop root, HashMap<Long, Double> valMemo ) {
+		//memoization (prevent redundant computation of common subexpr)
+		if( valMemo.containsKey(root.getHopID()) )
+			return valMemo.get(root.getHopID());
+		
+		double ret = Double.MAX_VALUE;
+		TernaryOp troot = (TernaryOp) root;
+		if( troot.getOp()==OpOp3.IFELSE ) {
+			if( HopRewriteUtils.isLiteralOfValue(troot.getInput(0), true) )
+				ret = rEvalSimpleDoubleExpression(troot.getInput().get(1), valMemo);
+			else if( HopRewriteUtils.isLiteralOfValue(troot.getInput(0), false) )
+				ret = rEvalSimpleDoubleExpression(troot.getInput().get(2), valMemo);
+		}
+		valMemo.put(root.getHopID(), ret);
+		return ret;
+	}
+	
 	protected static double rEvalSimpleBinaryDoubleExpression( Hop root, HashMap<Long, Double> valMemo, LocalVariableMap vars ) 
 	{
 		//memoization (prevent redundant computation of common subexpr)
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
index 0b00ffd..2c2b018 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
@@ -1590,6 +1590,11 @@ public class HopRewriteUtils
 		return Arrays.stream(mc).allMatch(h -> h.nnzKnown() || (worstcase && h.dimsKnown()));
 	}
 	
+	public static boolean hasListInputs(Hop hop) {
+		return hop.getInput()!= null 
+			&& hop.getInput().stream().anyMatch(h -> h.getDataType().isList());
+	}
+	
 	public static boolean containsSecondOrderBuiltin(ArrayList<Hop> roots) {
 		Hop.resetVisitStatus(roots);
 		return roots.stream().anyMatch(r -> containsSecondOrderBuiltin(r));
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
index 2d68ef2..6dd194d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
@@ -24,6 +24,7 @@ import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.hops.Hop;
 import org.apache.sysds.hops.LeftIndexingOp;
 import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.hops.rewrite.HopRewriteUtils;
 import org.apache.sysds.lops.LopProperties.ExecType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.OptNode.NodeType;
@@ -69,7 +70,7 @@ public class CostEstimatorHops extends CostEstimator
 			}
 			//check for invalid cp memory estimate
 			else if ( h.getExecType()==ExecType.CP && value >= OptimizerUtils.getLocalMemBudget() ) {
-				if( !forcedExec )
+				if( !forcedExec && !HopRewriteUtils.hasListInputs(h) )
 					LOG.warn("Memory estimate larger than budget but CP exec type (op="+h.getOpString()+", name="+h.getName()+", memest="+h.getMemEstimate()+").");
 				value = DEFAULT_MEM_REMOTE;
 			}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
index cfd0087..91511d4 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
@@ -405,8 +405,9 @@ public class RandSPInstruction extends UnarySPInstruction {
 		JavaPairRDD<Long, FrameBlock> out = seedsRDD
 			.mapToPair(new GenerateRandomFrameBlock(lrows, lcols, brlen, vt, data));
 
-		//step 5: output handling
+		//step 5: output handling, incl meta data
 		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.getDataCharacteristics(output.getName()).set(tmp);
 	}
 
 	private void generateRandData(SparkExecutionContext sec) {
diff --git a/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java b/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
index a4a87c8..d2b70b3 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
@@ -28,6 +28,7 @@ import org.apache.sysds.test.TestConfiguration;
 import org.junit.Test;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.UtilFunctions;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestUtils;
@@ -53,7 +54,8 @@ public class FrameConstructorTest extends AutomatedTestBase {
 		NO_SCHEMA,
 		RANDOM_DATA,
 		SINGLE_DATA,
-		MULTI_ROW_DATA
+		MULTI_ROW_DATA,
+		UNKNOWN_DIMS,
 	}
 
 	@Override
@@ -124,7 +126,19 @@ public class FrameConstructorTest extends AutomatedTestBase {
 		FrameBlock exp = createExpectedFrame(schemaStrings1, 5,"multi-row");
 		runFrameTest(TestType.MULTI_ROW_DATA, exp, Types.ExecMode.SPARK);
 	}
+	
+	@Test
+	public void testUnknownDims() {
+		FrameBlock exp = createExpectedFrame(schemaStrings1, rows,"constant");
+		runFrameTest(TestType.UNKNOWN_DIMS, exp, Types.ExecMode.SINGLE_NODE);
+	}
 
+	@Test
+	public void testUnknownDimsSP() {
+		FrameBlock exp = createExpectedFrame(schemaStrings1, rows, "constant");
+		runFrameTest(TestType.UNKNOWN_DIMS, exp, Types.ExecMode.SPARK);
+	}
+	
 	private void runFrameTest(TestType type, FrameBlock expectedOutput, Types.ExecMode et) {
 		Types.ExecMode platformOld = setExecMode(et);
 		boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
@@ -144,6 +158,8 @@ public class FrameConstructorTest extends AutomatedTestBase {
 			String[][] R1 = DataConverter.convertToStringFrame(expectedOutput);
 			String[][] R2 = DataConverter.convertToStringFrame(fB);
 			TestUtils.compareFrames(R1, R2, R1.length, R1[0].length);
+			int nrow = type == TestType.MULTI_ROW_DATA ? 5 : 40;
+			checkDMLMetaDataFile("F2", new MatrixCharacteristics(nrow, cols));
 		}
 		catch(Exception ex) {
 			throw new RuntimeException(ex);
diff --git a/src/test/scripts/functions/frame/FrameConstructorTest.dml b/src/test/scripts/functions/frame/FrameConstructorTest.dml
index 53196a6..5a02b74 100644
--- a/src/test/scripts/functions/frame/FrameConstructorTest.dml
+++ b/src/test/scripts/functions/frame/FrameConstructorTest.dml
@@ -31,6 +31,10 @@ if($1 == "SINGLE_DATA")
 if($1 == "MULTI_ROW_DATA")
   f1 = frame(data=["1", "abc", "2.5", "TRUE", "1", "abc", "2.5", "TRUE", "1", "abc", "2.5", "TRUE", "1", "abc", "2.5", "TRUE",
   "1", "abc", "2.5", "TRUE" ], rows=5, cols=4, schema=["INT64", "STRING", "FP64", "BOOLEAN"]) # initialization by row
+if($1 == "UNKNOWN_DIMS") {
+  nrow = ifelse(sum(rand(rows=1e4,cols=2,seed=1))>1e2, 40, 37)
+	f1 = frame(1, rows=nrow, cols=4, schema=["INT64", "STRING", "FP64", "BOOLEAN"]) 
+}
 
 # f1 = frame(1, 4, 3)  # unnamed parameters not working
 write(f1, $2, format="csv")