You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2020/07/24 13:36:26 UTC

[systemds] branch master updated: [SYSTEMDS-2576] Fix dop unoptimized functions (parfor-eval/parmserv)

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new b3511d2  [SYSTEMDS-2576] Fix dop unoptimized functions (parfor-eval/parmserv)
b3511d2 is described below

commit b3511d216524a60674daae211806186ffcf371c8
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Fri Jul 24 15:34:50 2020 +0200

    [SYSTEMDS-2576] Fix dop unoptimized functions (parfor-eval/parmserv)
    
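    This patch fixes the degree of parallelism of unoptimized functions
    when they are called from second-order functions such as eval and
    paramserv, in order to avoid excessive CPU over-commitment. For example,
    on a box with 32 threads, paramserv would run 32 local workers, and each
    worker would use 32 threads for individual operations (1024 threads total).
    The fix recompiles unoptimized functions single-threaded when a parfor
    body contains an eval call, and also applies paramserv's recompilation
    to unoptimized functions.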
---
 .../controlprogram/paramserv/ParamservUtils.java   |  7 +++--
 .../parfor/opt/OptTreePlanMappingAbstract.java     |  4 +++
 .../parfor/opt/OptimizerRuleBased.java             | 24 ++++++++++-----
 .../test/functions/misc/FunctionPotpourriTest.java |  7 +++++
 .../misc/FunPotpourriNestedParforEval.dml          | 36 ++++++++++++++++++++++
 5 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index 699b72f..968cb1d 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -218,10 +218,11 @@ public class ParamservUtils {
 	{
 		Program prog = ec.getProgram();
 
-		// 1. Recompile the internal program blocks
+		// 1. Recompile the internal program blocks 
 		recompileProgramBlocks(k, prog.getProgramBlocks());
 		// 2. Recompile the imported function blocks
-		prog.getFunctionProgramBlocks().forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks()));
+		prog.getFunctionProgramBlocks(false)
+			.forEach((fname, fvalue) -> recompileProgramBlocks(k, fvalue.getChildBlocks()));
 
 		// 3. Copy all functions 
 		return ExecutionContextFactory.createContext(
@@ -247,7 +248,7 @@ public class ParamservUtils {
 		return newProg;
 	}
 
-	private static void recompileProgramBlocks(int k, List<ProgramBlock> pbs) {
+	public static void recompileProgramBlocks(int k, List<ProgramBlock> pbs) {
 		// Reset the visit status from root
 		for (ProgramBlock pb : pbs)
 			DMLTranslator.resetHopsDAGVisitStatus(pb.getStatementBlock());
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
index eced04f..956ca12 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreePlanMappingAbstract.java
@@ -84,6 +84,10 @@ public class OptTreePlanMappingAbstract extends OptTreePlanMapping
 		return ret;
 	}
 	
+	public ProgramBlock getMappedProgramBlock(long id) {
+		return (ProgramBlock) _id_rtprog.get(id);
+	}
+	
 	public void replaceMapping( ProgramBlock pb, OptNode n ) {
 		long id = n.getID();
 		_id_rtprog.put(id, pb);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 7a6933e..63ae8af 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.OpOpN;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.AggBinaryOp;
 import org.apache.sysds.hops.AggBinaryOp.MMultMethod;
@@ -66,6 +67,7 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.controlprogram.paramserv.ParamservUtils;
 import org.apache.sysds.runtime.controlprogram.parfor.ResultMergeLocalFile;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.ExcludeType;
 import org.apache.sysds.runtime.controlprogram.parfor.opt.CostEstimator.TestMeasure;
@@ -1247,7 +1249,7 @@ public class OptimizerRuleBased extends Optimizer
 					long id = c.getID();
 					c.setK(tmpK);
 					ParForProgramBlock pfpb = (ParForProgramBlock) 
-						OptTreeConverter.getAbstractPlanMapping().getMappedProg(id)[1];
+						OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(id);
 					pfpb.setDegreeOfParallelism(tmpK);
 					
 					//distribute remaining parallelism
@@ -1275,6 +1277,13 @@ public class OptimizerRuleBased extends Optimizer
 						mhop.setMaxNumThreads(1); //set max constraint in hop
 						c.setK(1); //set optnode k (for explain)
 					}
+					
+					//if parfor contains eval call, make unoptimized functions single-threaded
+					if( HopRewriteUtils.isNary(h, OpOpN.EVAL) ) {
+						ProgramBlock pb = OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
+						pb.getProgram().getFunctionProgramBlocks(false)
+							.forEach((fname, fvalue) -> ParamservUtils.recompileProgramBlocks(1, fvalue.getChildBlocks()));
+					}
 				}
 				else
 					rAssignRemainingParallelism(c, parforK, opsK);
@@ -1284,7 +1293,7 @@ public class OptimizerRuleBased extends Optimizer
 			if( recompileSB ) {
 				try {
 					//guaranteed to be a last-level block (see hop change)
-					ProgramBlock pb = (ProgramBlock) OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+					ProgramBlock pb = OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
 					Recompiler.recompileProgramBlockInstructions(pb);
 				}
 				catch(Exception ex){
@@ -1356,7 +1365,7 @@ public class OptimizerRuleBased extends Optimizer
 		
 		// modify rtprog
 		ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-                                     .getAbstractPlanMapping().getMappedProg(id)[1];
+			.getAbstractPlanMapping().getMappedProgramBlock(id);
 		pfpb.setTaskPartitioner(partitioner);
 		
 		// modify plan
@@ -2403,10 +2412,9 @@ public class OptimizerRuleBased extends Optimizer
 	{
 		boolean ret = false;
 		
-		if( n.getNodeType() == NodeType.PARFOR )
-		{
-			ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-		    						.getAbstractPlanMapping().getMappedProg(n.getID())[1];	
+		if( n.getNodeType() == NodeType.PARFOR ) {
+			ProgramBlock pfpb = OptTreeConverter
+				.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
 			ret = (parfor == pfpb);
 		}
 		
@@ -2425,7 +2433,7 @@ public class OptimizerRuleBased extends Optimizer
 		if( n.getNodeType()==NodeType.PARFOR )
 		{
 			ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-									.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+				.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
 			pbs.add(pfpb);
 		}
 		
diff --git a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
index 9181351..89ec265 100644
--- a/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/misc/FunctionPotpourriTest.java
@@ -48,6 +48,7 @@ public class FunctionPotpourriTest extends AutomatedTestBase
 	private final static String TEST_NAME17 = "FunPotpourriNamedArgsQuotedAssign";
 	private final static String TEST_NAME18 = "FunPotpourriMultiReturnBuiltin1";
 	private final static String TEST_NAME19 = "FunPotpourriMultiReturnBuiltin2";
+	private final static String TEST_NAME20 = "FunPotpourriNestedParforEval";
 	
 	private final static String TEST_DIR = "functions/misc/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + FunctionPotpourriTest.class.getSimpleName() + "/";
@@ -74,6 +75,7 @@ public class FunctionPotpourriTest extends AutomatedTestBase
 		addTestConfiguration( TEST_NAME17, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME17, new String[] { "R" }) );
 		addTestConfiguration( TEST_NAME18, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME18, new String[] { "R" }) );
 		addTestConfiguration( TEST_NAME19, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME19, new String[] { "R" }) );
+		addTestConfiguration( TEST_NAME20, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME20, new String[] { "R" }) );
 	}
 
 	@Test
@@ -181,6 +183,11 @@ public class FunctionPotpourriTest extends AutomatedTestBase
 		runFunctionTest( TEST_NAME19, false );
 	}
 	
+	@Test
+	public void testFunctionNestedParforEval() {
+		runFunctionTest( TEST_NAME20, false );
+	}
+	
 	private void runFunctionTest(String testName, boolean error) {
 		TestConfiguration config = getTestConfiguration(testName);
 		loadTestConfiguration(config);
diff --git a/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml b/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml
new file mode 100644
index 0000000..4dc37cb
--- /dev/null
+++ b/src/test/scripts/functions/misc/FunPotpourriNestedParforEval.dml
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+foo1 = function(Matrix[Double] A, Matrix[Double] B) return (Matrix[Double] C) {
+  while(FALSE){} # no inlining
+  C = A %*% B + 7;
+}
+
+X1 = matrix(1.1, 100, 100)
+X2 = matrix(1.2, 100, 100)
+f = "foo1";
+
+R = matrix(0, 100, 100)
+parfor(i in 1:nrow(R) )
+  parfor(j in 1:ncol(R) )
+    R[i,j] = sum(eval(f, list(A=X1, B=X2)));
+
+print("out: " + sum(R))