You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/03/24 19:15:23 UTC

[systemds] branch master updated: [SYSTEMDS-2909] Improved broadcast handling of spark parfor jobs

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 67d2234  [SYSTEMDS-2909] Improved broadcast handling of spark parfor jobs
67d2234 is described below

commit 67d2234fae82d34fb1048ea81ec045628b923dba
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Wed Mar 24 19:27:09 2021 +0100

    [SYSTEMDS-2909] Improved broadcast handling of spark parfor jobs
    
    This patch enables, by default, the transfer of input matrices/frames to
    Spark parfor tasks via broadcasts instead of exporting them to HDFS, as
    was previously done for MapReduce. This approach is only used for
    applicable inputs (non-partitioned matrices, less than 2GB). Furthermore,
    the code path has been optimized to avoid unnecessarily exporting
    matrices that will be broadcast anyway.
---
 .../runtime/controlprogram/ParForProgramBlock.java | 37 +++++++++++++++-------
 .../controlprogram/parfor/RemoteDPParForSpark.java |  6 ++--
 .../controlprogram/parfor/RemoteParForSpark.java   | 20 +++---------
 3 files changed, 32 insertions(+), 31 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index b0b87eb..c3c6909 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -297,7 +297,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	public static final boolean FORCE_CP_ON_REMOTE_SPARK    = true; // compile body to CP if exec type forced to Spark
 	public static final boolean LIVEVAR_AWARE_EXPORT        = true; // export only read variables according to live variable analysis
 	public static final boolean RESET_RECOMPILATION_FLAGs   = true;
-	public static final boolean ALLOW_BROADCAST_INPUTS      = false; // enables to broadcast inputs for remote_spark
+	public static final boolean ALLOW_BROADCAST_INPUTS      = true; // enables to broadcast inputs for remote_spark
 	
 	public static final String PARFOR_FNAME_PREFIX          = "/parfor/"; 
 	public static final String PARFOR_MR_TASKS_TMP_FNAME    = PARFOR_FNAME_PREFIX + "%ID%_MR_taskfile"; 
@@ -866,13 +866,14 @@ public class ParForProgramBlock extends ForProgramBlock
 		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
-		//write matrices to HDFS 
-		exportMatricesToHDFS(ec);
+		//handle broadcast / export of inputs
+		Set<String> brVars = getBroadcastVariables(ec, _resultVars);
+		exportMatricesToHDFS(ec, brVars);
 		
 		// Step 3) submit Spark parfor job (no lazy evaluation, since collect on result)
 		boolean topLevelPF = OptimizerUtils.isTopLevelParFor();
 		RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program,
-			clsMap, tasks, ec, _resultVars, _enableCPCaching, _numThreads, topLevelPF);
+			clsMap, tasks, ec, brVars, _resultVars, _enableCPCaching, _numThreads, topLevelPF);
 		
 		if( _monitor ) 
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
@@ -930,7 +931,7 @@ public class ParForProgramBlock extends ForProgramBlock
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
 		//write matrices to HDFS, except DP matrix which is the input to the RemoteDPParForSpark job
-		exportMatricesToHDFS(ec, _colocatedDPMatrix);
+		exportMatricesToHDFS(ec, CollectionUtils.asSet(_colocatedDPMatrix)); //incl colocated
 		
 		// Step 4) submit MR job (wait for finished work)
 		RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(
@@ -1115,11 +1116,24 @@ public class ParForProgramBlock extends ForProgramBlock
 					out.put(var, dataObj);
 			}
 	}
+	
+	private static Set<String> getBroadcastVariables(ExecutionContext ec, List<ResultVar> resultVars) {
+		if( !ALLOW_BROADCAST_INPUTS )
+			return new HashSet<>();
+		LocalVariableMap inputs = ec.getVariables();
+		// exclude the result variables
+		Set<String> retVars = resultVars.stream()
+			.map(v -> v._name).collect(Collectors.toSet());
+		Set<String> brVars = inputs.keySet().stream()
+			.filter(v -> !retVars.contains(v))
+			.filter(v -> ec.getVariable(v).getDataType().isMatrix())
+			.filter(v -> OptimizerUtils.estimateSize(ec.getDataCharacteristics(v))< 2.14e9)
+			.collect(Collectors.toSet());
+		return brVars;
+	}
 
-	private void exportMatricesToHDFS(ExecutionContext ec, String... excludeListNames) 
-	{
+	private void exportMatricesToHDFS(ExecutionContext ec, Set<String> excludeNames)  {
 		ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
-		Set<String> excludeList = CollectionUtils.asSet(excludeListNames);
 		
 		if( LIVEVAR_AWARE_EXPORT && sb != null)
 		{
@@ -1127,18 +1141,17 @@ public class ParForProgramBlock extends ForProgramBlock
 			//export only variables that are read in the body
 			VariableSet varsRead = sb.variablesRead();
 			for (String key : ec.getVariables().keySet() ) {
-				if( varsRead.containsVariable(key) && !excludeList.contains(key) ) {
+				if( varsRead.containsVariable(key) && !excludeNames.contains(key) ) {
 					Data d = ec.getVariable(key);
 					if( d.getDataType() == DataType.MATRIX )
 						((MatrixObject)d).exportData(_replicationExport);
 				}
 			}
 		}
-		else
-		{
+		else {
 			//export all matrices in symbol table
 			for (String key : ec.getVariables().keySet() ) {
-				if( !excludeList.contains(key) ) {
+				if( !excludeNames.contains(key) ) {
 					Data d = ec.getVariable(key);
 					if( d.getDataType() == DataType.MATRIX )
 						((MatrixObject)d).exportData(_replicationExport);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 54e5ed3..6f01f8c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -107,9 +107,9 @@ public class RemoteDPParForSpark
 		RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);
 		
 		//maintain statistics
-	    Statistics.incrementNoOfCompiledSPInst();
-	    Statistics.incrementNoOfExecutedSPInst();
-	    if( DMLScript.STATISTICS ){
+		Statistics.incrementNoOfCompiledSPInst();
+		Statistics.incrementNoOfExecutedSPInst();
+		if( DMLScript.STATISTICS ){
 			Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
 		}
 		
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
index bdc528d..87c1a8a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -19,12 +19,10 @@
 
 package org.apache.sysds.runtime.controlprogram.parfor;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,7 +30,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.parser.ParForStatementBlock;
 import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
@@ -70,7 +67,7 @@ public class RemoteParForSpark
 	private static final IDSequence _jobID = new IDSequence();
 	
 	public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap, List<Task> tasks,
-		ExecutionContext ec, ArrayList<ResultVar> resultVars, boolean cpCaching, int numMappers, boolean topLevelPF)
+		ExecutionContext ec, Set<String> brVars, List<ResultVar> resultVars, boolean cpCaching, int numMappers, boolean topLevelPF)
 	{
 		String jobname = "ParFor-ESP";
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -89,9 +86,8 @@ public class RemoteParForSpark
 
 		// broadcast the inputs except the result variables
 		Map<String, Broadcast<CacheBlock>> brInputs = null;
-		if (ParForProgramBlock.ALLOW_BROADCAST_INPUTS) {
-			brInputs = broadcastInputs(sec, resultVars);
-		}
+		if (ParForProgramBlock.ALLOW_BROADCAST_INPUTS)
+			brInputs = broadcastInputs(sec, brVars);
 		
 		//prepare lineage
 		Map<String, String> serialLineage = DMLScript.LINEAGE ? ec.getLineage().serialize() : null;
@@ -124,15 +120,7 @@ public class RemoteParForSpark
 	}
 
 	@SuppressWarnings("unchecked")
-	private static Map<String, Broadcast<CacheBlock>> broadcastInputs(SparkExecutionContext sec, ArrayList<ParForStatementBlock.ResultVar> resultVars) {
-		LocalVariableMap inputs = sec.getVariables();
-		// exclude the result variables
-		// TODO use optimizer-picked list of amenable objects (e.g., size constraints)
-		Set<String> retVars = resultVars.stream()
-			.map(v -> v._name).collect(Collectors.toSet());
-		Set<String> brVars = inputs.keySet().stream()
-			.filter(v -> !retVars.contains(v)).collect(Collectors.toSet());
-		
+	private static Map<String, Broadcast<CacheBlock>> broadcastInputs(SparkExecutionContext sec, Set<String> brVars) {
 		// construct broadcast objects
 		Map<String, Broadcast<CacheBlock>> result = new HashMap<>();
 		for (String key : brVars) {