You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/11 18:36:17 UTC

incubator-systemml git commit: [SYSTEMML-261] Fix memory/error handling parallel parfor result merge

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 572752a9e -> a13ba188b


[SYSTEMML-261] Fix memory/error handling parallel parfor result merge

Parallel parfor result merge runs min(vcores,numvars) concurrent MR
jobs. In local mode, this led to out-of-memory errors because all jobs
run in the same JVM and each job allocates a sort buffer of
mapreduce.task.io.sort.mb=100 (MB), which causes large memory
requirements on platforms with many cores. This fix introduces an
additional memory constraint when selecting the degree of parallelism
for parallel result merge: in local mode, it is capped at
floor(local memory budget / sort buffer size), but is at least 1.
Furthermore, we now explicitly check each result merge worker for
errors after it has finished. Finally, this allows us to re-enable the
previously disabled parallel result merge tests with many variables.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a13ba188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a13ba188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a13ba188

Branch: refs/heads/master
Commit: a13ba188b7d9bafd9165fe1efbe170a0c980165d
Parents: 572752a
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sun Jan 10 19:25:38 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sun Jan 10 19:25:38 2016 -0800

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      | 23 ++++++++++++++++----
 .../ParForParallelRemoteResultMergeTest.java    |  6 +----
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a13ba188/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 39ce856..83fcc0f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -1738,6 +1738,11 @@ public class ParForProgramBlock extends ForProgramBlock
 			//execute result merge in parallel for all result vars
 			int par = Math.min( _resultVars.size(), 
 					            InfrastructureAnalyzer.getLocalParallelism() );
+			if( InfrastructureAnalyzer.isLocalMode() ) {
+				int parmem = (int)Math.floor(OptimizerUtils.getLocalMemBudget() / 
+						InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer());
+				par = Math.min(par, Math.max(parmem, 1)); //reduce k if necessary
+			}
 			
 			try
 			{
@@ -1749,13 +1754,16 @@ public class ParForProgramBlock extends ForProgramBlock
 				q.closeInput();
 				
 				//run result merge workers
-				Thread[] rmWorkers = new Thread[par];
+				ResultMergeWorker[] rmWorkers = new ResultMergeWorker[par];
 				for( int i=0; i<par; i++ )
-					rmWorkers[i] = new Thread(new ResultMergeWorker(q, results, ec));
+					rmWorkers[i] = new ResultMergeWorker(q, results, ec);
 				for( int i=0; i<par; i++ ) //start all
 					rmWorkers[i].start();
-				for( int i=0; i<par; i++ ) //wait for all
+				for( int i=0; i<par; i++ ) { //wait for all
 					rmWorkers[i].join();
+					if( !rmWorkers[i].finishedNoError() )
+						throw new DMLRuntimeException("Error occured in parallel result merge worker.");
+				}
 			}
 			catch(Exception ex)
 			{
@@ -2064,11 +2072,12 @@ public class ParForProgramBlock extends ForProgramBlock
 	/**
 	 * Helper class for parallel invocation of REMOTE_MR result merge for multiple variables.
 	 */
-	private class ResultMergeWorker implements Runnable
+	private class ResultMergeWorker extends Thread
 	{
 		private LocalTaskQueue<String> _q = null;
 		private LocalVariableMap[] _refVars = null;
 		private ExecutionContext _ec = null;
+		private boolean _success = false;
 		
 		public ResultMergeWorker( LocalTaskQueue<String> q, LocalVariableMap[] results, ExecutionContext ec )
 		{
@@ -2077,6 +2086,10 @@ public class ParForProgramBlock extends ForProgramBlock
 			_ec = ec;
 		}
 		
+		public boolean finishedNoError() {
+			return _success;
+		}
+		
 		@Override
 		public void run() 
 		{
@@ -2112,6 +2125,8 @@ public class ParForProgramBlock extends ForProgramBlock
 					//cleanup of intermediate result variables
 					cleanWorkerResultVariables( _ec, out, in );
 				}
+				
+				_success = true;
 			}
 			catch(Exception ex)
 			{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a13ba188/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
index e263cb3..4d2c3c8 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.Ignore;
 
 import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysml.test.integration.AutomatedTestBase;
@@ -32,8 +31,7 @@ import org.apache.sysml.test.utils.TestUtils;
 import org.apache.sysml.utils.Statistics;
 
 public class ParForParallelRemoteResultMergeTest extends AutomatedTestBase 
-{
-	
+{	
 	private final static String TEST_NAME1 = "parfor_pr_resultmerge2";
 	private final static String TEST_NAME2 = "parfor_pr_resultmerge32";
 	private final static String TEST_DIR = "functions/parfor/";
@@ -69,14 +67,12 @@ public class ParForParallelRemoteResultMergeTest extends AutomatedTestBase
 	}
 	
 	@Test
-	@Ignore
 	public void testMultipleResultMergeManyDense() 
 	{
 		runParallelRemoteResultMerge(TEST_NAME2, false);
 	}
 	
 	@Test
-	@Ignore
 	public void testMultipleResultMergeManySparse() 
 	{
 		runParallelRemoteResultMerge(TEST_NAME2, true);