You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/25 19:50:51 UTC

[3/4] incubator-systemml git commit: [SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp

[SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b028e6ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b028e6ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b028e6ce

Branch: refs/heads/master
Commit: b028e6cee12d8cc9bb4e1728fffc852cef7282c1
Parents: e82de90
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 19:03:56 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:05 2017 -0800

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      | 33 +++++++++-----------
 1 file changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b028e6ce/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index af3a0d1..4cdfa7b 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -23,6 +23,7 @@ import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1063,8 +1064,8 @@ public class ParForProgramBlock extends ForProgramBlock
 		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
-		//write matrices to HDFS 
-		exportMatricesToHDFS(ec);
+		//write matrices to HDFS, except DP matrix which is the input to the RemoteDPParForSpark job
+		exportMatricesToHDFS(ec, _colocatedDPMatrix);
 				
 		// Step 4) submit MR job (wait for finished work)
 		OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)||
@@ -1258,37 +1259,33 @@ public class ParForProgramBlock extends ForProgramBlock
 			}
 	}
 
-	private void exportMatricesToHDFS( ExecutionContext ec ) 
+	private void exportMatricesToHDFS(ExecutionContext ec, String... blacklistNames) 
 		throws CacheException 
 	{
 		ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
+		HashSet<String> blacklist = new HashSet<String>(Arrays.asList(blacklistNames));
 		
 		if( LIVEVAR_AWARE_EXPORT && sb != null)
 		{
 			//optimization to prevent unnecessary export of matrices
 			//export only variables that are read in the body
 			VariableSet varsRead = sb.variablesRead();
-			for (String key : ec.getVariables().keySet() ) 
-			{
-				Data d = ec.getVariable(key);
-				if (    d.getDataType() == DataType.MATRIX
-					 && varsRead.containsVariable(key)  )
-				{
-					MatrixObject mo = (MatrixObject)d;
-					mo.exportData( _replicationExport );
+			for (String key : ec.getVariables().keySet() ) {
+				if( varsRead.containsVariable(key) && !blacklist.contains(key) ) {
+					Data d = ec.getVariable(key);
+					if( d.getDataType() == DataType.MATRIX )
+						((MatrixObject)d).exportData(_replicationExport);
 				}
 			}
 		}
 		else
 		{
 			//export all matrices in symbol table
-			for (String key : ec.getVariables().keySet() ) 
-			{
-				Data d = ec.getVariable(key);
-				if ( d.getDataType() == DataType.MATRIX )
-				{
-					MatrixObject mo = (MatrixObject)d;
-					mo.exportData( _replicationExport );
+			for (String key : ec.getVariables().keySet() ) {
+				if( !blacklist.contains(key) ) {
+					Data d = ec.getVariable(key);
+					if( d.getDataType() == DataType.MATRIX )
+						((MatrixObject)d).exportData(_replicationExport);
 				}
 			}
 		}