You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/23 00:13:14 UTC

[2/3] systemml git commit: [MINOR] Avoid unnecessary stats maintenance in spark remote parfor

[MINOR] Avoid unnecessary stats maintenance in spark remote parfor

This patch makes a minor performance improvement to Spark remote parfor
jobs. In contrast to the MR backend, the Spark backend does not propagate
individual statistics back to the driver via counters. Therefore, we now
disable statistics maintenance in remote parfor workers when running on
Spark, which reduces overhead for parfor loops with lots of fine-grained
operations (where stats maintenance can become a bottleneck).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d34d6a62
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d34d6a62
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d34d6a62

Branch: refs/heads/master
Commit: d34d6a6299b56766f3a92a6cfee258fe8272545a
Parents: e8774a7
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sun Apr 22 16:32:49 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun Apr 22 16:32:49 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/parfor/ProgramConverter.java     |  8 ++++++--
 .../parfor/RemoteDPParForSparkWorker.java           | 16 +++++-----------
 .../parfor/RemoteParForSparkWorker.java             | 11 +++--------
 3 files changed, 14 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/d34d6a62/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 1356634..919b357 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -1254,8 +1254,11 @@ public class ProgramConverter
 	////////////////////////////////
 	// PARSING 
 	////////////////////////////////
-
 	public static ParForBody parseParForBody( String in, int id ) {
+		return parseParForBody(in, id, false);
+	}
+	
+	public static ParForBody parseParForBody( String in, int id, boolean inSpark ) {
 		ParForBody body = new ParForBody();
 		
 		//header elimination
@@ -1284,7 +1287,8 @@ public class ProgramConverter
 		
 		//handle additional configs
 		String aconfs = st.nextToken();
-		parseAndSetAdditionalConfigurations( aconfs );
+		if( !inSpark )
+			parseAndSetAdditionalConfigurations( aconfs );
 		
 		//handle program
 		String progStr = st.nextToken();

http://git-wip-us.apache.org/repos/asf/systemml/blob/d34d6a62/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 4ab90ae..885b2b7 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -20,7 +20,6 @@
 package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -98,8 +97,6 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 	public Iterator<Tuple2<Long, String>> call(Iterator<Tuple2<Long, Iterable<Writable>>> arg0)
 		throws Exception 
 	{
-		ArrayList<Tuple2<Long,String>> ret = new ArrayList<>();
-		
 		//lazy parworker initialization
 		configureWorker( TaskContext.get().taskAttemptId() );
 	
@@ -132,12 +129,9 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 			_aIters.add( (int)(getExecutedIterations()-numIter) );
 		}
 		
-		//write output if required (matrix indexed write) 
-		ArrayList<String> tmp = RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars );
-		for( String val : tmp )
-			ret.add(new Tuple2<>(_workerID, val));
-		
-		return ret.iterator();
+		//write output if required (matrix indexed write)
+		return RemoteParForUtils.exportResultVariables(_workerID, _ec.getVariables(), _resultVars)
+			.stream().map(s -> new Tuple2<>(_workerID, s)).iterator();
 	}
 
 	private void configureWorker( long ID ) 
@@ -150,7 +144,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 			CodegenUtils.getClassSync(e.getKey(), e.getValue());
 	
 		//parse and setup parfor body program
-		ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID);
+		ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID, true);
 		_childBlocks = body.getChildBlocks();
 		_ec          = body.getEc();
 		_resultVars  = body.getResultVariables();
@@ -171,7 +165,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
 				//register entire working dir for delete on shutdown
 				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
-			}	
+			}
 		}
 		
 		//ensure that resultvar files are not removed

http://git-wip-us.apache.org/repos/asf/systemml/blob/d34d6a62/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 45e3bc7..7485602 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -20,7 +20,6 @@
 package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -83,12 +82,8 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		
 		//write output if required (matrix indexed write) 
 		//note: this copy is necessary for environments without spark libraries
-		ArrayList<Tuple2<Long,String>> ret = new ArrayList<>();
-		ArrayList<String> tmp = RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars );
-		for( String val : tmp )
-			ret.add(new Tuple2<>(_workerID, val));
-		
-		return ret.iterator();
+		return RemoteParForUtils.exportResultVariables(_workerID, _ec.getVariables(), _resultVars)
+			.stream().map(s -> new Tuple2<>(_workerID, s)).iterator();
 	}
 	
 	private void configureWorker(long taskID) 
@@ -101,7 +96,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 			CodegenUtils.getClassSync(e.getKey(), e.getValue());
 	
 		//parse and setup parfor body program
-		ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID);
+		ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID, true);
 		_childBlocks = body.getChildBlocks();
 		_ec          = body.getEc();
 		_resultVars  = body.getResultVariables();