You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/25 19:50:51 UTC
[3/4] incubator-systemml git commit: [SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp
[SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b028e6ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b028e6ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b028e6ce
Branch: refs/heads/master
Commit: b028e6cee12d8cc9bb4e1728fffc852cef7282c1
Parents: e82de90
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 19:03:56 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:05 2017 -0800
----------------------------------------------------------------------
.../controlprogram/ParForProgramBlock.java | 33 +++++++++-----------
1 file changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b028e6ce/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index af3a0d1..4cdfa7b 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -23,6 +23,7 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -1063,8 +1064,8 @@ public class ParForProgramBlock extends ForProgramBlock
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
- //write matrices to HDFS
- exportMatricesToHDFS(ec);
+ //write matrices to HDFS, except DP matrix which is the input to the RemoteDPParForSpark job
+ exportMatricesToHDFS(ec, _colocatedDPMatrix);
// Step 4) submit MR job (wait for finished work)
OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)||
@@ -1258,37 +1259,33 @@ public class ParForProgramBlock extends ForProgramBlock
}
}
- private void exportMatricesToHDFS( ExecutionContext ec )
+ private void exportMatricesToHDFS(ExecutionContext ec, String... blacklistNames)
throws CacheException
{
ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
+ HashSet<String> blacklist = new HashSet<String>(Arrays.asList(blacklistNames));
if( LIVEVAR_AWARE_EXPORT && sb != null)
{
//optimization to prevent unnecessary export of matrices
//export only variables that are read in the body
VariableSet varsRead = sb.variablesRead();
- for (String key : ec.getVariables().keySet() )
- {
- Data d = ec.getVariable(key);
- if ( d.getDataType() == DataType.MATRIX
- && varsRead.containsVariable(key) )
- {
- MatrixObject mo = (MatrixObject)d;
- mo.exportData( _replicationExport );
+ for (String key : ec.getVariables().keySet() ) {
+ if( varsRead.containsVariable(key) && !blacklist.contains(key) ) {
+ Data d = ec.getVariable(key);
+ if( d.getDataType() == DataType.MATRIX )
+ ((MatrixObject)d).exportData(_replicationExport);
}
}
}
else
{
//export all matrices in symbol table
- for (String key : ec.getVariables().keySet() )
- {
- Data d = ec.getVariable(key);
- if ( d.getDataType() == DataType.MATRIX )
- {
- MatrixObject mo = (MatrixObject)d;
- mo.exportData( _replicationExport );
+ for (String key : ec.getVariables().keySet() ) {
+ if( !blacklist.contains(key) ) {
+ Data d = ec.getVariable(key);
+ if( d.getDataType() == DataType.MATRIX )
+ ((MatrixObject)d).exportData(_replicationExport);
}
}
}