You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/05/13 19:20:14 UTC
[systemds] branch master updated: [SYSTEMDS-2954] Fix spark frame init in data cleaning pipelines
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 9e2ebfc [SYSTEMDS-2954] Fix spark frame init in data cleaning pipelines
9e2ebfc is described below
commit 9e2ebfc7a6fef53c53ad56ba9fa590f6264be21f
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu May 13 21:19:37 2021 +0200
[SYSTEMDS-2954] Fix spark frame init in data cleaning pipelines
This patch fixes the missing runtime propagation of dimension
information in Spark frame-init operations; the missing dimensions
could cause a dimension mismatch in subsequent Spark operations (e.g.,
in indexing expressions).
Furthermore, this patch adds corresponding tests, improves the bandit
builtin to allow size propagation (and thus avoid unnecessary Spark
jobs), and suppresses unnecessary warnings of the parfor cost estimation
on operations with list inputs.
---
scripts/builtin/bandit.dml | 48 +++++++++++-----------
src/main/java/org/apache/sysds/hops/DataGenOp.java | 2 +-
.../java/org/apache/sysds/hops/OptimizerUtils.java | 20 +++++++++
.../apache/sysds/hops/rewrite/HopRewriteUtils.java | 5 +++
.../parfor/opt/CostEstimatorHops.java | 3 +-
.../instructions/spark/RandSPInstruction.java | 3 +-
.../test/functions/frame/FrameConstructorTest.java | 18 +++++++-
.../functions/frame/FrameConstructorTest.dml | 4 ++
8 files changed, 74 insertions(+), 29 deletions(-)
diff --git a/scripts/builtin/bandit.dml b/scripts/builtin/bandit.dml
index ec93e7e..bcf7f94 100644
--- a/scripts/builtin/bandit.dml
+++ b/scripts/builtin/bandit.dml
@@ -63,11 +63,11 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
# save the original configuration as a lookup table
lookup = configurations
- if(verbose)
+ if(verbose)
print("n "+ n +"\nR "+ R +"\ns_max "+ s_max +"\nB "+ B +"\nn "+ n +"\nr "+ r)
for( i in 0:s) {
- # successive halving
+ # successive halving
n_i = min(max(as.integer(floor(n * eta^(-i))), 1), nrow(configurations));
r_i = as.integer(floor(r * eta^i));
@@ -77,7 +77,7 @@ m_bandit = function(Matrix[Double] X_train, Matrix[Double] Y_train, List[Unknown
print("iteration ---------------------"+i+" out of "+s)
}
- configurations = configurations[1:n_i, ]
+ configurations = configurations[1:n_i, ]
[outPip,outHp, feaFrameOuter] = run_with_hyperparam(configurations, r_i, X_train, Y_train, metaList,
targetList, param, feaFrameOuter, verbose)
# sort the pipelines by order of accuracy decreasing
@@ -149,13 +149,13 @@ get_physical_configurations = function(Frame[String] logical, Scalar[int] numCon
else if(as.scalar(logical[1,j]) == "MVI")
operator[, j] = mvi;
else if(as.scalar(logical[1,j]) == "NR")
- operator[, j] = noise;
+ operator[, j] = noise;
else if(as.scalar(logical[1,j]) == "CI")
operator[, j] = ci;
else if(as.scalar(logical[1,j]) == "DIM")
operator[, j] = dim;
else if(as.scalar(logical[1,j]) == "DUMMY")
- operator[, j] = dummy;
+ operator[, j] = dummy;
else if(as.scalar(logical[1,j]) == "SCALE")
operator[, j] = scale;
else stop("invalid operation "+as.scalar(logical[1,j]))
@@ -223,7 +223,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
feaFrame = frame("", rows = no_of_res, cols = ncol(featureFrameOuter))
pip_toString = pipToString(ph_pip[i])
for(r in 1:no_of_res)
- {
+ {
# as the matrix first block of r rows belongs to first operator and r+1 block of rows to second operator
# we need to extract a row from each block
indexes = matrix(no_of_res, rows=ncol(ph_pip), cols=1)
@@ -244,7 +244,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
output_pipelines[index, ] = cbind(as.matrix(index), id[i,1])
X = clone_X
Y = clone_Y
- index = index + 1
+ index = index + 1
if(ncol(featureFrameOuter) > 1) {
feaFrame[r, 1:ncol(feaVec)] = as.frame(feaVec)
@@ -257,7 +257,7 @@ run_with_hyperparam = function(Frame[Unknown] ph_pip, Integer r_i, Matrix[Double
X = clone_X
Y = clone_Y
- if(ncol(featureFrameOuter) > 1)
+ if(ncol(featureFrameOuter) > 1)
featureFrameOuter = rbind(featureFrameOuter, feaFrame)
}
output_hyperparam = removeEmpty(target=cbind(output_accuracy, output_hp), margin="rows")
@@ -280,11 +280,11 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Intege
for(k in 1:ncol(pipeline))
{
op = as.scalar(pipeline[1,k])
- hasParam = map(hpList[,1], "x->x.split(\",\")[0].equals(\""+op+"\")")
+ hasParam = map(hpList[,1], "x->x.split(\",\")[0].equals(\""+op+"\")")
# convert the boolean vector to 0/1 matrix representation
m_hasParam = hasParam == frame("true", rows=nrow(hasParam), cols=1)
m_hasParam = as.matrix(m_hasParam)
- # compute the relevant index
+ # compute the relevant index
index = m_hasParam * seq(1, nrow(m_hasParam))
index = as.scalar(removeEmpty(target = index, margin = "rows"))
indexes[k] = index
@@ -322,24 +322,24 @@ getHyperparam = function(Frame[Unknown] pipeline, Frame[Unknown] hpList, Intege
maxVal = as.scalar(hpList[index, paramValIndex + 1])
if(type == "FP") {
val = rand(rows=no_of_res, cols=1, min=minVal,max=maxVal, pdf="uniform");
- OpParam[, j] = val;
+ OpParam[, j] = val;
}
else if(type == "INT") {
val = sample(maxVal, no_of_res, TRUE);
less_than_min = val < minVal;
val = (less_than_min * minVal) + val;
- OpParam[, j] = val;
+ OpParam[, j] = val;
}
else if(type == "BOOL") {
if(maxVal == 1) {
s = sample(2, no_of_res, TRUE);
b = s - 1;
- OpParam[, j] = b;
+ OpParam[, j] = b;
}
- else
+ else
OpParam[, j] = matrix(0, rows=no_of_res, cols=1)
}
- else
+ else
print("invalid data type") # TODO handle string set something like {,,}
paramIdx = paramIdx + 2
@@ -391,11 +391,11 @@ extractTopK = function(Frame[Unknown] pipeline, Matrix[Double] hyperparam,
# for each duplicate record just take the one reocrd and strip the others
deduplicates = indexes %*% forDedup
- # combine the deduplicated tuples and unique tuples again
+ # combine the deduplicated tuples and unique tuples again
forDedup = rbind(uniqueTuples, deduplicates)
}
- # decode the pipelines
+ # decode the pipelines
decoded = transformdecode(target=forDedup[, 1:ncol(pipeline)], meta=dM, spec=jspecDC)
# separate the pipelines and hyper-parameters
@@ -425,14 +425,12 @@ extractBracketWinners = function(Matrix[Double] pipeline, Matrix[Double] hyperpa
# bestPipeline = frameSort(bestPipeline)
hyperparam = order(target = hyperparam, by = 1, decreasing=TRUE, index.return=FALSE)
pipeline = order(target = pipeline, by = 1, decreasing=TRUE, index.return=FALSE)
-
- rowIndex = ifelse(nrow(pipeline) > k, k, nrow(pipeline))
+ rowIndex = min(k, nrow(pipeline))
pipeline = pipeline[1:rowIndex,]
bestHyperparams = hyperparam[1:rowIndex,]
bestPipeline = frame(data="|", rows=nrow(pipeline), cols=ncol(conf)-1)
- for(i in 1: nrow(pipeline), check=0)
- {
+ for(i in 1: nrow(pipeline)) {
index = as.scalar(pipeline[i, 3])
bestPipeline[i, 1:ncol(bestPipeline)] = conf[index, 2:ncol(conf)]
}
@@ -463,7 +461,7 @@ fclassify = function(Matrix[Double] X, Matrix[Double] Y, Matrix[Double] mask, Ma
print("Y contains only one class")
accuracy = as.double(0)
}
- else {
+ else {
print("STARTING "+cv+" CROSS VALIDATIONS")
# do the k = 3 cross validations
[accuracyMatrix, T] = crossV(X, Y, cv, mask, MLhp, isWeighted)
@@ -556,8 +554,8 @@ return (Matrix[Double] features)
features[1, 7] = sum(mask == 0) # number of numerical features
features[1, 8] = mean(num) # mean value
colSd = colSds(num)
- count3sdplus = sum(num > (colMeans(num) + 3*colSd ))
- count3sdminus = sum(num < (colMeans(num) - 3*colSd ))
+ count3sdplus = sum(num > (colMeans(num) + 3*colSd ))
+ count3sdminus = sum(num < (colMeans(num) - 3*colSd ))
outliers = count3sdplus + count3sdminus
features[1, 9] = outliers
# OHE features
@@ -585,7 +583,7 @@ return (Matrix[Double] features)
######################################################################
# # Function for cross validation using hold out method
-# # Inputs: The input dataset X, Y and the value of k validation, mask of the
+# # Inputs: The input dataset X, Y and the value of k validation, mask of the
# # dataset for OHE of categorical columns, vector of ML hyper-parameters identified
# # via grid-search and a boolean value of (un)weighted accuracy.
# # Output: It return a matrix having the accuracy of each fold.
diff --git a/src/main/java/org/apache/sysds/hops/DataGenOp.java b/src/main/java/org/apache/sysds/hops/DataGenOp.java
index 32b17b9..20bd9d7 100644
--- a/src/main/java/org/apache/sysds/hops/DataGenOp.java
+++ b/src/main/java/org/apache/sysds/hops/DataGenOp.java
@@ -332,7 +332,7 @@ public class DataGenOp extends MultiThreadedHop
Hop input2;
Hop input3;
- if ( _op == OpOpDG.RAND || _op == OpOpDG.SINIT )
+ if ( _op == OpOpDG.RAND || _op == OpOpDG.SINIT || _op == OpOpDG.FRAMEINIT )
{
if (_dataType != DataType.TENSOR) {
input1 = getInput().get(_paramIndexMap.get(DataExpression.RAND_ROWS)); //rows
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index edf8dfc..0f7691a 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -26,6 +26,7 @@ import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.OpOp1;
import org.apache.sysds.common.Types.OpOp2;
+import org.apache.sysds.common.Types.OpOp3;
import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.common.Types.ReOrgOp;
import org.apache.sysds.common.Types.ValueType;
@@ -1310,6 +1311,8 @@ public class OptimizerUtils
ret = rEvalSimpleUnaryDoubleExpression(root, valMemo);
else if( root instanceof BinaryOp )
ret = rEvalSimpleBinaryDoubleExpression(root, valMemo);
+ else if( root instanceof TernaryOp )
+ ret = rEvalSimpleTernaryDoubleExpression(root, valMemo);
}
valMemo.put(root.getHopID(), ret);
@@ -1455,6 +1458,23 @@ public class OptimizerUtils
return ret;
}
+ protected static double rEvalSimpleTernaryDoubleExpression( Hop root, HashMap<Long, Double> valMemo ) {
+ //memoization (prevent redundant computation of common subexpr)
+ if( valMemo.containsKey(root.getHopID()) )
+ return valMemo.get(root.getHopID());
+
+ double ret = Double.MAX_VALUE;
+ TernaryOp troot = (TernaryOp) root;
+ if( troot.getOp()==OpOp3.IFELSE ) {
+ if( HopRewriteUtils.isLiteralOfValue(troot.getInput(0), true) )
+ ret = rEvalSimpleDoubleExpression(troot.getInput().get(1), valMemo);
+ else if( HopRewriteUtils.isLiteralOfValue(troot.getInput(0), false) )
+ ret = rEvalSimpleDoubleExpression(troot.getInput().get(2), valMemo);
+ }
+ valMemo.put(root.getHopID(), ret);
+ return ret;
+ }
+
protected static double rEvalSimpleBinaryDoubleExpression( Hop root, HashMap<Long, Double> valMemo, LocalVariableMap vars )
{
//memoization (prevent redundant computation of common subexpr)
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
index 0b00ffd..2c2b018 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
@@ -1590,6 +1590,11 @@ public class HopRewriteUtils
return Arrays.stream(mc).allMatch(h -> h.nnzKnown() || (worstcase && h.dimsKnown()));
}
+ public static boolean hasListInputs(Hop hop) {
+ return hop.getInput()!= null
+ && hop.getInput().stream().anyMatch(h -> h.getDataType().isList());
+ }
+
public static boolean containsSecondOrderBuiltin(ArrayList<Hop> roots) {
Hop.resetVisitStatus(roots);
return roots.stream().anyMatch(r -> containsSecondOrderBuiltin(r));
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
index 2d68ef2..6dd194d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/CostEstimatorHops.java
@@ -24,6 +24,7 @@ import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.LeftIndexingOp;
import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.hops.rewrite.HopRewriteUtils;
import org.apache.sysds.lops.LopProperties.ExecType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.opt.OptNode.NodeType;
@@ -69,7 +70,7 @@ public class CostEstimatorHops extends CostEstimator
}
//check for invalid cp memory estimate
else if ( h.getExecType()==ExecType.CP && value >= OptimizerUtils.getLocalMemBudget() ) {
- if( !forcedExec )
+ if( !forcedExec && !HopRewriteUtils.hasListInputs(h) )
LOG.warn("Memory estimate larger than budget but CP exec type (op="+h.getOpString()+", name="+h.getName()+", memest="+h.getMemEstimate()+").");
value = DEFAULT_MEM_REMOTE;
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
index cfd0087..91511d4 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
@@ -405,8 +405,9 @@ public class RandSPInstruction extends UnarySPInstruction {
JavaPairRDD<Long, FrameBlock> out = seedsRDD
.mapToPair(new GenerateRandomFrameBlock(lrows, lcols, brlen, vt, data));
- //step 5: output handling
+ //step 5: output handling, incl meta data
sec.setRDDHandleForVariable(output.getName(), out);
+ sec.getDataCharacteristics(output.getName()).set(tmp);
}
private void generateRandData(SparkExecutionContext sec) {
diff --git a/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java b/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
index a4a87c8..d2b70b3 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameConstructorTest.java
@@ -28,6 +28,7 @@ import org.apache.sysds.test.TestConfiguration;
import org.junit.Test;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestUtils;
@@ -53,7 +54,8 @@ public class FrameConstructorTest extends AutomatedTestBase {
NO_SCHEMA,
RANDOM_DATA,
SINGLE_DATA,
- MULTI_ROW_DATA
+ MULTI_ROW_DATA,
+ UNKNOWN_DIMS,
}
@Override
@@ -124,7 +126,19 @@ public class FrameConstructorTest extends AutomatedTestBase {
FrameBlock exp = createExpectedFrame(schemaStrings1, 5,"multi-row");
runFrameTest(TestType.MULTI_ROW_DATA, exp, Types.ExecMode.SPARK);
}
+
+ @Test
+ public void testUnknownDims() {
+ FrameBlock exp = createExpectedFrame(schemaStrings1, rows,"constant");
+ runFrameTest(TestType.UNKNOWN_DIMS, exp, Types.ExecMode.SINGLE_NODE);
+ }
+ @Test
+ public void testUnknownDimsSP() {
+ FrameBlock exp = createExpectedFrame(schemaStrings1, rows, "constant");
+ runFrameTest(TestType.UNKNOWN_DIMS, exp, Types.ExecMode.SPARK);
+ }
+
private void runFrameTest(TestType type, FrameBlock expectedOutput, Types.ExecMode et) {
Types.ExecMode platformOld = setExecMode(et);
boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
@@ -144,6 +158,8 @@ public class FrameConstructorTest extends AutomatedTestBase {
String[][] R1 = DataConverter.convertToStringFrame(expectedOutput);
String[][] R2 = DataConverter.convertToStringFrame(fB);
TestUtils.compareFrames(R1, R2, R1.length, R1[0].length);
+ int nrow = type == TestType.MULTI_ROW_DATA ? 5 : 40;
+ checkDMLMetaDataFile("F2", new MatrixCharacteristics(nrow, cols));
}
catch(Exception ex) {
throw new RuntimeException(ex);
diff --git a/src/test/scripts/functions/frame/FrameConstructorTest.dml b/src/test/scripts/functions/frame/FrameConstructorTest.dml
index 53196a6..5a02b74 100644
--- a/src/test/scripts/functions/frame/FrameConstructorTest.dml
+++ b/src/test/scripts/functions/frame/FrameConstructorTest.dml
@@ -31,6 +31,10 @@ if($1 == "SINGLE_DATA")
if($1 == "MULTI_ROW_DATA")
f1 = frame(data=["1", "abc", "2.5", "TRUE", "1", "abc", "2.5", "TRUE", "1", "abc", "2.5", "TRUE", "1", "abc", "2.5", "TRUE",
"1", "abc", "2.5", "TRUE" ], rows=5, cols=4, schema=["INT64", "STRING", "FP64", "BOOLEAN"]) # initialization by row
+if($1 == "UNKNOWN_DIMS") {
+ nrow = ifelse(sum(rand(rows=1e4,cols=2,seed=1))>1e2, 40, 37)
+ f1 = frame(1, rows=nrow, cols=4, schema=["INT64", "STRING", "FP64", "BOOLEAN"])
+}
# f1 = frame(1, 4, 3) # unnamed parameters not working
write(f1, $2, format="csv")