You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/11 18:36:17 UTC
incubator-systemml git commit: [SYSTEMML-261] Fix memory/error
handling parallel parfor result merge
Repository: incubator-systemml
Updated Branches:
refs/heads/master 572752a9e -> a13ba188b
[SYSTEMML-261] Fix memory/error handling parallel parfor result merge
Parallel parfor result merge runs min(vcores,numvars) concurrent MR
jobs. In local mode, this led to out-of-memory errors because all jobs
are run in the same JVM and each job allocates
mapreduce.task.io.sort.mb=100, which causes large memory requirements on
platforms with many cores. This fix introduces an additional memory
constraint on selecting the degree of parallelism for parallel result
merge. Furthermore, we now also explicitly check for errors during
result merge. Finally, this allows us to re-enable the (previously disabled)
parallel result merge tests with many variables.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a13ba188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a13ba188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a13ba188
Branch: refs/heads/master
Commit: a13ba188b7d9bafd9165fe1efbe170a0c980165d
Parents: 572752a
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sun Jan 10 19:25:38 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sun Jan 10 19:25:38 2016 -0800
----------------------------------------------------------------------
.../controlprogram/ParForProgramBlock.java | 23 ++++++++++++++++----
.../ParForParallelRemoteResultMergeTest.java | 6 +----
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a13ba188/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 39ce856..83fcc0f 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -1738,6 +1738,11 @@ public class ParForProgramBlock extends ForProgramBlock
//execute result merge in parallel for all result vars
int par = Math.min( _resultVars.size(),
InfrastructureAnalyzer.getLocalParallelism() );
+ if( InfrastructureAnalyzer.isLocalMode() ) {
+ int parmem = (int)Math.floor(OptimizerUtils.getLocalMemBudget() /
+ InfrastructureAnalyzer.getRemoteMaxMemorySortBuffer());
+ par = Math.min(par, Math.max(parmem, 1)); //reduce k if necessary
+ }
try
{
@@ -1749,13 +1754,16 @@ public class ParForProgramBlock extends ForProgramBlock
q.closeInput();
//run result merge workers
- Thread[] rmWorkers = new Thread[par];
+ ResultMergeWorker[] rmWorkers = new ResultMergeWorker[par];
for( int i=0; i<par; i++ )
- rmWorkers[i] = new Thread(new ResultMergeWorker(q, results, ec));
+ rmWorkers[i] = new ResultMergeWorker(q, results, ec);
for( int i=0; i<par; i++ ) //start all
rmWorkers[i].start();
- for( int i=0; i<par; i++ ) //wait for all
+ for( int i=0; i<par; i++ ) { //wait for all
rmWorkers[i].join();
+ if( !rmWorkers[i].finishedNoError() )
+ throw new DMLRuntimeException("Error occured in parallel result merge worker.");
+ }
}
catch(Exception ex)
{
@@ -2064,11 +2072,12 @@ public class ParForProgramBlock extends ForProgramBlock
/**
* Helper class for parallel invocation of REMOTE_MR result merge for multiple variables.
*/
- private class ResultMergeWorker implements Runnable
+ private class ResultMergeWorker extends Thread
{
private LocalTaskQueue<String> _q = null;
private LocalVariableMap[] _refVars = null;
private ExecutionContext _ec = null;
+ private boolean _success = false;
public ResultMergeWorker( LocalTaskQueue<String> q, LocalVariableMap[] results, ExecutionContext ec )
{
@@ -2077,6 +2086,10 @@ public class ParForProgramBlock extends ForProgramBlock
_ec = ec;
}
+ public boolean finishedNoError() {
+ return _success;
+ }
+
@Override
public void run()
{
@@ -2112,6 +2125,8 @@ public class ParForProgramBlock extends ForProgramBlock
//cleanup of intermediate result variables
cleanWorkerResultVariables( _ec, out, in );
}
+
+ _success = true;
}
catch(Exception ex)
{
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a13ba188/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
index e263cb3..4d2c3c8 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForParallelRemoteResultMergeTest.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.Ignore;
import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysml.test.integration.AutomatedTestBase;
@@ -32,8 +31,7 @@ import org.apache.sysml.test.utils.TestUtils;
import org.apache.sysml.utils.Statistics;
public class ParForParallelRemoteResultMergeTest extends AutomatedTestBase
-{
-
+{
private final static String TEST_NAME1 = "parfor_pr_resultmerge2";
private final static String TEST_NAME2 = "parfor_pr_resultmerge32";
private final static String TEST_DIR = "functions/parfor/";
@@ -69,14 +67,12 @@ public class ParForParallelRemoteResultMergeTest extends AutomatedTestBase
}
@Test
- @Ignore
public void testMultipleResultMergeManyDense()
{
runParallelRemoteResultMerge(TEST_NAME2, false);
}
@Test
- @Ignore
public void testMultipleResultMergeManySparse()
{
runParallelRemoteResultMerge(TEST_NAME2, true);