You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/08/12 15:45:19 UTC

[systemds] branch master updated: [SYSTEMDS-3089] Fix parfor function calls in unoptimized eval scope

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new feba13d  [SYSTEMDS-3089] Fix parfor function calls in unoptimized eval scope
feba13d is described below

commit feba13de3012c6b24ed9d9ac6bf8ba33f7a9f3f5
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Thu Aug 12 17:44:55 2021 +0200

    [SYSTEMDS-3089] Fix parfor function calls in unoptimized eval scope
    
    Local parallel for loops (parfor) use per-worker copies of functions in
    order to allow for uncontended recompilation. When a function is called
    through eval, its unoptimized version is invoked, i.e., the version that
    has not been optimized according to IPA parameter propagation, in order
    to ensure result correctness.
    
    This patch addresses issues with missing unoptimized functions in the
    specific case where a complex function that is called through eval
    internally contains a parfor loop with normal function calls.
---
 .../apache/sysds/parser/FunctionDictionary.java    | 15 +++++
 .../sysds/runtime/util/ProgramConverter.java       | 42 ++++++-------
 .../test/functions/misc/FunctionPotpourriTest.java |  6 ++
 .../misc/FunPotpourriParforEvalBuiltin.dml         | 73 ++++++++++++++++++++++
 4 files changed, 114 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/FunctionDictionary.java b/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
index 7eabc00..32e2098 100644
--- a/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
+++ b/src/main/java/org/apache/sysds/parser/FunctionDictionary.java
@@ -116,4 +116,19 @@ public class FunctionDictionary<T extends FunctionBlock> {
 				if( !_funsOrig.containsKey(e.getKey()) )
 					_funsOrig.put(e.getKey(), e.getValue());
 	}
+	
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("Function Dictionary:");
+		sb.append("----------------------------------------");
+		int pos = 0;
+		for( Entry<String, T> e : _funs.entrySet() ) {
+			sb.append("-- [");
+			sb.append(pos++);
+			sb.append("]: ");
+			sb.append(e.getKey());
+			sb.append("\n");
+		}
+		return sb.toString();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
index e3213f4..f1adc3a 100644
--- a/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/ProgramConverter.java
@@ -372,7 +372,9 @@ public class ProgramConverter
 	public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain) 
 	{
 		//fpb guaranteed to be non-null (checked inside getFunctionProgramBlock)
-		FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName);
+		FunctionProgramBlock fpb1 = prog.getFunctionProgramBlock(namespace, oldName, true);
+		FunctionProgramBlock fpb2 = prog.containsFunctionProgramBlock(namespace, oldName, false) ?
+			prog.getFunctionProgramBlock(namespace, oldName, false) : null;
 		String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid); 
 		String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew);
 
@@ -380,37 +382,35 @@ public class ProgramConverter
 			return; //prevent redundant deep copy if already existent
 		
 		//create deep copy
-		FunctionProgramBlock copy = null;
-		ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
-		ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
-		if( fpb.getInputParams()!= null )
-			tmp1.addAll(fpb.getInputParams());
-		if( fpb.getOutputParams()!= null )
-			tmp2.addAll(fpb.getOutputParams());
-		
-		
+		FunctionProgramBlock copy1 = null;
 		if( !fnStack.contains(fnameNewKey) ) {
 			fnStack.add(fnameNewKey);
-			copy = new FunctionProgramBlock(prog, tmp1, tmp2);
-			copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
-			copy.setRecompileOnce( fpb.isRecompileOnce() );
-			copy.setThreadID(pid);
+			copy1 = createDeepCopyFunctionProgramBlock(fpb1, fnStack, fnCreated, pid, IDPrefix, plain);
 			fnStack.remove(fnameNewKey);
 		}
 		else //stop deep copy for recursive function calls
-			copy = fpb;
+			copy1 = fpb1;
 		
 		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
 		//note: instructions not used by function program block
 		
 		//put if not existing (recursive processing might have added it)
 		if( !prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) {
-			prog.addFunctionProgramBlock(namespace, fnameNew, copy);
+			prog.addFunctionProgramBlock(namespace, fnameNew, copy1, true);
+			if( fpb2 != null ) {
+				FunctionProgramBlock copy2 = createDeepCopyFunctionProgramBlock(
+					fpb2, fnStack, fnCreated, pid, IDPrefix, plain);
+				prog.addFunctionProgramBlock(namespace, fnameNew, copy2, false);
+			}
 			fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
 		}
 	}
 
-	public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated) 
+	public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated) {
+		return createDeepCopyFunctionProgramBlock(fpb, fnStack, fnCreated, 0, -1, true);
+	}
+	
+	public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated, long pid, int IDPrefix, boolean plain) 
 	{
 		if( fpb == null )
 			throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock.");
@@ -425,15 +425,13 @@ public class ProgramConverter
 			tmp2.addAll(fpb.getOutputParams());
 		
 		copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
-		copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) );
+		copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
 		copy.setStatementBlock( fpb.getStatementBlock() );
 		copy.setRecompileOnce(fpb.isRecompileOnce());
-		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
-		//note: instructions not used by function program block
-	
+		copy.setThreadID(pid);
+		
 		return copy;
 	}
-
 	
 	/**
 	 * Creates a deep copy of an array of instructions and replaces the placeholders of parworker
diff --git a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
index f0e230f..214730b 100644
--- a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
@@ -57,6 +57,7 @@ public class FunctionPotpourriTest extends AutomatedTestBase
 		"FunPotpourriEvalList2Arg",
 		"FunPotpourriEvalNamespace",
 		"FunPotpourriBuiltinPrecedence",
+		"FunPotpourriParforEvalBuiltin",
 	};
 	
 	private final static String TEST_DIR = "functions/misc/";
@@ -199,6 +200,11 @@ public class FunctionPotpourriTest extends AutomatedTestBase
 		runFunctionTest( TEST_NAMES[25], null );
 	}
 	
+	@Test
+	public void testFunctionParforEvalBuiltin() {
+		runFunctionTest( TEST_NAMES[26], null );
+	}
+	
 	private void runFunctionTest(String testName, Class<?> error) {
 		TestConfiguration config = getTestConfiguration(testName);
 		loadTestConfiguration(config);
diff --git a/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml b/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml
new file mode 100644
index 0000000..97be37e
--- /dev/null
+++ b/src/test/scripts/functions/misc/FunPotpourriParforEvalBuiltin.dml
@@ -0,0 +1,73 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+crossV = function(Matrix[double] X, Matrix[double] y, Integer k, Matrix[Double] MLhp, Boolean isWeighted) 
+  return (Matrix[Double] accuracyMatrix)
+{
+  accuracyMatrix = matrix(0, k, 1)
+  dataList = list()
+  testL = list()
+  data = order(target = cbind(y, X),  by = 1, decreasing=FALSE, index.return=FALSE)
+  classes = table(data[, 1], 1)
+  ins_per_fold = classes/k
+  start_fold = matrix(1, rows=nrow(ins_per_fold), cols=1)
+  fold_idxes = cbind(start_fold, ins_per_fold)
+
+  start_i = 0; end_i = 0; idx_fold = 1;
+  for(i in 1:k) {
+    fold_i = matrix(0, 0, ncol(data))
+    start=0; end=0; 
+    for(j in 1:nrow(classes)) {
+      idx = as.scalar(classes[j, 1])
+      start = end + 1;
+      end = end + idx
+      class_j =  data[start:end, ]
+      start_i = as.scalar(fold_idxes[j, 1]);
+      end_i = as.scalar(fold_idxes[j, 2])
+      fold_i = rbind(fold_i, class_j[start_i:end_i, ])
+    }
+    dataList = append(dataList, fold_i)
+    fold_idxes[, 1] = fold_idxes[, 2] + 1
+    fold_idxes[, 2] += ins_per_fold
+  }
+
+  parfor(i in seq(1,k)) {
+    [trainList, hold_out] = remove(dataList, i)
+    trainset = rbind(trainList)
+    testset = as.matrix(hold_out)
+    trainX = trainset[, 2:ncol(trainset)]
+    trainy = trainset[, 1]
+    testX = testset[, 2:ncol(testset)]
+    testy = testset[, 1]
+    beta = multiLogReg(X=trainX, Y=trainy, icpt=as.scalar(MLhp[1,1]), reg=as.scalar(MLhp[1,2]), tol=as.scalar(MLhp[1,3]), 
+    maxi=as.scalar(MLhp[1,4]), maxii=50, verbose=FALSE);
+    [prob, yhat, acc] = multiLogRegPredict(testX, beta, testy, FALSE)
+    accuracy = getAccuracy(testy, yhat, isWeighted)
+    accuracyMatrix[i] = accuracy
+  }
+}
+
+X = rand(rows=100, cols=100)
+Y = sample(2, 100, TRUE)
+hp = matrix("1 1e-4 1e-6 100", rows=1, cols=4)
+
+acc = eval("crossV", list(X=X, y=Y, k=3, MLhp=hp, isWeighted=FALSE))
+print("CV accuracy: "+mean(acc))