You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/08/12 15:45:19 UTC
[systemds] branch master updated: [SYSTEMDS-3089] Fix parfor
function calls in unoptimized eval scope
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new feba13d [SYSTEMDS-3089] Fix parfor function calls in unoptimized eval scope
feba13d is described below
commit feba13de3012c6b24ed9d9ac6bf8ba33f7a9f3f5
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu Aug 12 17:44:55 2021 +0200
[SYSTEMDS-3089] Fix parfor function calls in unoptimized eval scope
Local parallel for loops (parfor) use per-worker copies of functions in
order to allow for uncontended recompilation. When a function is called
through eval, its unoptimized variant is invoked, i.e., a version that has
not been specialized by IPA (inter-procedural analysis) parameter
propagation, in order to ensure result correctness.
This patch fixes missing unoptimized functions in the specific case of
calling a complex function through eval that internally contains a parfor
loop with regular function calls: the per-worker function copies now also
include the unoptimized variant of each function, if one exists.
---
.../apache/sysds/parser/FunctionDictionary.java | 15 +++++
.../sysds/runtime/util/ProgramConverter.java | 42 ++++++-------
.../test/functions/misc/FunctionPotpourriTest.java | 6 ++
.../misc/FunPotpourriParforEvalBuiltin.dml | 73 ++++++++++++++++++++++
4 files changed, 114 insertions(+), 22 deletions(-)
diff --git a/src/main/java/org/apache/sysds/parser/FunctionDictionary.java b/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
index 7eabc00..32e2098 100644
--- a/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
+++ b/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
@@ -116,4 +116,19 @@ public class FunctionDictionary<T extends FunctionBlock> {
if( !_funsOrig.containsKey(e.getKey()) )
_funsOrig.put(e.getKey(), e.getValue());
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Function Dictionary:\n");
+ sb.append("----------------------------------------\n");
+ int pos = 0;
+ for( Entry<String, T> e : _funs.entrySet() ) {
+ sb.append("-- [");
+ sb.append(pos++);
+ sb.append("]: ");
+ sb.append(e.getKey());
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
}
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index e3213f4..f1adc3a 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -372,7 +372,9 @@ public class ProgramConverter
public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain)
{
//fpb guaranteed to be non-null (checked inside getFunctionProgramBlock)
- FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName);
+ FunctionProgramBlock fpb1 = prog.getFunctionProgramBlock(namespace, oldName, true);
+ FunctionProgramBlock fpb2 = prog.containsFunctionProgramBlock(namespace, oldName, false) ?
+ prog.getFunctionProgramBlock(namespace, oldName, false) : null;
String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid);
String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew);
@@ -380,37 +382,35 @@ public class ProgramConverter
return; //prevent redundant deep copy if already existent
//create deep copy
- FunctionProgramBlock copy = null;
- ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
- ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
- if( fpb.getInputParams()!= null )
- tmp1.addAll(fpb.getInputParams());
- if( fpb.getOutputParams()!= null )
- tmp2.addAll(fpb.getOutputParams());
-
-
+ FunctionProgramBlock copy1 = null;
if( !fnStack.contains(fnameNewKey) ) {
fnStack.add(fnameNewKey);
- copy = new FunctionProgramBlock(prog, tmp1, tmp2);
- copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
- copy.setRecompileOnce( fpb.isRecompileOnce() );
- copy.setThreadID(pid);
+ copy1 = createDeepCopyFunctionProgramBlock(fpb1, fnStack, fnCreated, pid, IDPrefix, plain);
fnStack.remove(fnameNewKey);
}
else //stop deep copy for recursive function calls
- copy = fpb;
+ copy1 = fpb1;
//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
//note: instructions not used by function program block
//put if not existing (recursive processing might have added it)
if( !prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) {
- prog.addFunctionProgramBlock(namespace, fnameNew, copy);
+ prog.addFunctionProgramBlock(namespace, fnameNew, copy1, true);
+ if( fpb2 != null ) {
+ FunctionProgramBlock copy2 = createDeepCopyFunctionProgramBlock(
+ fpb2, fnStack, fnCreated, pid, IDPrefix, plain);
+ prog.addFunctionProgramBlock(namespace, fnameNew, copy2, false);
+ }
fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
}
}
- public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated)
+ public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated) {
+ return createDeepCopyFunctionProgramBlock(fpb, fnStack, fnCreated, 0, -1, true); // same arguments as previously hard-coded: pid=0, IDPrefix=-1, plain=true
+ }
+
+ public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated, long pid, int IDPrefix, boolean plain)
{
if( fpb == null )
throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock.");
@@ -425,15 +425,13 @@ public class ProgramConverter
tmp2.addAll(fpb.getOutputParams());
copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
- copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) );
+ copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
copy.setStatementBlock( fpb.getStatementBlock() );
copy.setRecompileOnce(fpb.isRecompileOnce());
- //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
- //note: instructions not used by function program block
-
+ copy.setThreadID(pid);
+
return copy;
}
-
/**
* Creates a deep copy of an array of instructions and replaces the placeholders of parworker
diff --git a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
index f0e230f..214730b 100644
--- a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
@@ -57,6 +57,7 @@ public class FunctionPotpourriTest extends AutomatedTestBase
"FunPotpourriEvalList2Arg",
"FunPotpourriEvalNamespace",
"FunPotpourriBuiltinPrecedence",
+ "FunPotpourriParforEvalBuiltin",
};
private final static String TEST_DIR = "functions/misc/";
@@ -199,6 +200,11 @@ public class FunctionPotpourriTest extends AutomatedTestBase
runFunctionTest( TEST_NAMES[25], null );
}
+ @Test
+ public void testFunctionParforEvalBuiltin() { // eval of a function whose body contains a parfor loop with builtin function calls
+ runFunctionTest( TEST_NAMES[26], null ); // null: no expected error class
+ }
+
private void runFunctionTest(String testName, Class<?> error) {
TestConfiguration config = getTestConfiguration(testName);
loadTestConfiguration(config);
diff --git a/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml b/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml
new file mode 100644
index 0000000..97be37e
--- /dev/null
+++ b/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml
@@ -0,0 +1,73 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double] MLhp, Boolean isWeighted)
+ return (Matrix[Double] accuracyMatrix)
+{
+ accuracyMatrix = matrix(0, k, 1) # k x 1: one multiLogReg accuracy value per held-out fold
+ dataList = list()
+ testL = list() # NOTE(review): testL is never used below
+ data = order(target = cbind(y, X), by = 1, decreasing=FALSE, index.return=FALSE) # label in column 1; sorting makes each class contiguous
+ classes = table(data[, 1], 1) # presumably per-class row counts (row j = label j)
+ ins_per_fold = classes/k # rows of each class per fold; NOTE(review): fractional if a class count is not divisible by k
+ start_fold = matrix(1, rows=nrow(ins_per_fold), cols=1)
+ fold_idxes = cbind(start_fold, ins_per_fold) # per class: [start, end] row range of the current fold
+
+ start_i = 0; end_i = 0; idx_fold = 1; # NOTE(review): idx_fold is never used below
+ for(i in 1:k) {
+ fold_i = matrix(0, 0, ncol(data))
+ start=0; end=0;
+ for(j in 1:nrow(classes)) { # take a slice of every class, so each fold is stratified by label
+ idx = as.scalar(classes[j, 1])
+ start = end + 1;
+ end = end + idx
+ class_j = data[start:end, ]
+ start_i = as.scalar(fold_idxes[j, 1]);
+ end_i = as.scalar(fold_idxes[j, 2])
+ fold_i = rbind(fold_i, class_j[start_i:end_i, ])
+ }
+ dataList = append(dataList, fold_i)
+ fold_idxes[, 1] = fold_idxes[, 2] + 1 # advance every class range to the next fold
+ fold_idxes[, 2] += ins_per_fold
+ }
+
+ parfor(i in seq(1,k)) { # fold i is held out; the parfor body calls builtins (the SYSTEMDS-3089 scenario)
+ [trainList, hold_out] = remove(dataList, i)
+ trainset = rbind(trainList)
+ testset = as.matrix(hold_out)
+ trainX = trainset[, 2:ncol(trainset)]
+ trainy = trainset[, 1]
+ testX = testset[, 2:ncol(testset)]
+ testy = testset[, 1]
+ beta = multiLogReg(X=trainX, Y=trainy, icpt=as.scalar(MLhp[1,1]), reg=as.scalar(MLhp[1,2]), tol=as.scalar(MLhp[1,3]),
+ maxi=as.scalar(MLhp[1,4]), maxii=50, verbose=FALSE);
+ [prob, yhat, acc] = multiLogRegPredict(testX, beta, testy, FALSE)
+ accuracy = getAccuracy(testy, yhat, isWeighted)
+ accuracyMatrix[i] = accuracy
+ }
+}
+
+X = rand(rows=100, cols=100) # 100 x 100 random features
+Y = sample(2, 100, TRUE) # 100 labels drawn from {1, 2} with replacement
+hp = matrix("1 1e-4 1e-6 100", rows=1, cols=4) # read by crossV as icpt, reg, tol, maxi (MLhp[1,1..4])
+
+acc = eval("crossV", list(X=X, y=Y, k=3, MLhp=hp, isWeighted=FALSE)) # called through eval, so the unoptimized crossV variant runs
+print("CV accuracy: "+mean(acc))