You are viewing a plain text version of this content. The canonical link for it is available from the mailing-list archive (the hyperlink was removed in this plain-text rendering).
Posted to commits@systemds.apache.org by mb...@apache.org on 2021/03/24 19:15:23 UTC
[systemds] branch master updated: [SYSTEMDS-2909] Improved
broadcast handling of spark parfor jobs
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 67d2234 [SYSTEMDS-2909] Improved broadcast handling of spark parfor jobs
67d2234 is described below
commit 67d2234fae82d34fb1048ea81ec045628b923dba
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Wed Mar 24 19:27:09 2021 +0100
[SYSTEMDS-2909] Improved broadcast handling of spark parfor jobs
This patch enables, by default, the transfer of input matrices/frames to
Spark parfor tasks via broadcasts instead of the export to HDFS that was
previously used in MapReduce. This approach is only used for applicable
inputs (non-partitioned matrices smaller than 2 GB). Furthermore, the code
path has been optimized to avoid unnecessarily exporting matrices that
will be broadcast anyway.
---
.../runtime/controlprogram/ParForProgramBlock.java | 37 +++++++++++++++-------
.../controlprogram/parfor/RemoteDPParForSpark.java | 6 ++--
.../controlprogram/parfor/RemoteParForSpark.java | 20 +++---------
3 files changed, 32 insertions(+), 31 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index b0b87eb..c3c6909 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -297,7 +297,7 @@ public class ParForProgramBlock extends ForProgramBlock
public static final boolean FORCE_CP_ON_REMOTE_SPARK = true; // compile body to CP if exec type forced to Spark
public static final boolean LIVEVAR_AWARE_EXPORT = true; // export only read variables according to live variable analysis
public static final boolean RESET_RECOMPILATION_FLAGs = true;
- public static final boolean ALLOW_BROADCAST_INPUTS = false; // enables to broadcast inputs for remote_spark
+ public static final boolean ALLOW_BROADCAST_INPUTS = true; // enables to broadcast inputs for remote_spark
public static final String PARFOR_FNAME_PREFIX = "/parfor/";
public static final String PARFOR_MR_TASKS_TMP_FNAME = PARFOR_FNAME_PREFIX + "%ID%_MR_taskfile";
@@ -866,13 +866,14 @@ public class ParForProgramBlock extends ForProgramBlock
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
- //write matrices to HDFS
- exportMatricesToHDFS(ec);
+ //handle broadcast / export of inputs
+ Set<String> brVars = getBroadcastVariables(ec, _resultVars);
+ exportMatricesToHDFS(ec, brVars);
// Step 3) submit Spark parfor job (no lazy evaluation, since collect on result)
boolean topLevelPF = OptimizerUtils.isTopLevelParFor();
RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program,
- clsMap, tasks, ec, _resultVars, _enableCPCaching, _numThreads, topLevelPF);
+ clsMap, tasks, ec, brVars, _resultVars, _enableCPCaching, _numThreads, topLevelPF);
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
@@ -930,7 +931,7 @@ public class ParForProgramBlock extends ForProgramBlock
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
//write matrices to HDFS, except DP matrix which is the input to the RemoteDPParForSpark job
- exportMatricesToHDFS(ec, _colocatedDPMatrix);
+ exportMatricesToHDFS(ec, CollectionUtils.asSet(_colocatedDPMatrix)); //incl colocated
// Step 4) submit MR job (wait for finished work)
RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(
@@ -1115,11 +1116,24 @@ public class ParForProgramBlock extends ForProgramBlock
out.put(var, dataObj);
}
}
+
+ private static Set<String> getBroadcastVariables(ExecutionContext ec, List<ResultVar> resultVars) {
+ if( !ALLOW_BROADCAST_INPUTS )
+ return new HashSet<>();
+ LocalVariableMap inputs = ec.getVariables();
+ // exclude the result variables
+ Set<String> retVars = resultVars.stream()
+ .map(v -> v._name).collect(Collectors.toSet());
+ Set<String> brVars = inputs.keySet().stream()
+ .filter(v -> !retVars.contains(v))
+ .filter(v -> ec.getVariable(v).getDataType().isMatrix())
+ .filter(v -> OptimizerUtils.estimateSize(ec.getDataCharacteristics(v))< 2.14e9)
+ .collect(Collectors.toSet());
+ return brVars;
+ }
- private void exportMatricesToHDFS(ExecutionContext ec, String... excludeListNames)
- {
+ private void exportMatricesToHDFS(ExecutionContext ec, Set<String> excludeNames) {
ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
- Set<String> excludeList = CollectionUtils.asSet(excludeListNames);
if( LIVEVAR_AWARE_EXPORT && sb != null)
{
@@ -1127,18 +1141,17 @@ public class ParForProgramBlock extends ForProgramBlock
//export only variables that are read in the body
VariableSet varsRead = sb.variablesRead();
for (String key : ec.getVariables().keySet() ) {
- if( varsRead.containsVariable(key) && !excludeList.contains(key) ) {
+ if( varsRead.containsVariable(key) && !excludeNames.contains(key) ) {
Data d = ec.getVariable(key);
if( d.getDataType() == DataType.MATRIX )
((MatrixObject)d).exportData(_replicationExport);
}
}
}
- else
- {
+ else {
//export all matrices in symbol table
for (String key : ec.getVariables().keySet() ) {
- if( !excludeList.contains(key) ) {
+ if( !excludeNames.contains(key) ) {
Data d = ec.getVariable(key);
if( d.getDataType() == DataType.MATRIX )
((MatrixObject)d).exportData(_replicationExport);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 54e5ed3..6f01f8c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -107,9 +107,9 @@ public class RemoteDPParForSpark
RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);
//maintain statistics
- Statistics.incrementNoOfCompiledSPInst();
- Statistics.incrementNoOfExecutedSPInst();
- if( DMLScript.STATISTICS ){
+ Statistics.incrementNoOfCompiledSPInst();
+ Statistics.incrementNoOfExecutedSPInst();
+ if( DMLScript.STATISTICS ){
Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
index bdc528d..87c1a8a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -19,12 +19,10 @@
package org.apache.sysds.runtime.controlprogram.parfor;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,7 +30,6 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;
import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.parser.ParForStatementBlock;
import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
@@ -70,7 +67,7 @@ public class RemoteParForSpark
private static final IDSequence _jobID = new IDSequence();
public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap, List<Task> tasks,
- ExecutionContext ec, ArrayList<ResultVar> resultVars, boolean cpCaching, int numMappers, boolean topLevelPF)
+ ExecutionContext ec, Set<String> brVars, List<ResultVar> resultVars, boolean cpCaching, int numMappers, boolean topLevelPF)
{
String jobname = "ParFor-ESP";
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -89,9 +86,8 @@ public class RemoteParForSpark
// broadcast the inputs except the result variables
Map<String, Broadcast<CacheBlock>> brInputs = null;
- if (ParForProgramBlock.ALLOW_BROADCAST_INPUTS) {
- brInputs = broadcastInputs(sec, resultVars);
- }
+ if (ParForProgramBlock.ALLOW_BROADCAST_INPUTS)
+ brInputs = broadcastInputs(sec, brVars);
//prepare lineage
Map<String, String> serialLineage = DMLScript.LINEAGE ? ec.getLineage().serialize() : null;
@@ -124,15 +120,7 @@ public class RemoteParForSpark
}
@SuppressWarnings("unchecked")
- private static Map<String, Broadcast<CacheBlock>> broadcastInputs(SparkExecutionContext sec, ArrayList<ParForStatementBlock.ResultVar> resultVars) {
- LocalVariableMap inputs = sec.getVariables();
- // exclude the result variables
- // TODO use optimizer-picked list of amenable objects (e.g., size constraints)
- Set<String> retVars = resultVars.stream()
- .map(v -> v._name).collect(Collectors.toSet());
- Set<String> brVars = inputs.keySet().stream()
- .filter(v -> !retVars.contains(v)).collect(Collectors.toSet());
-
+ private static Map<String, Broadcast<CacheBlock>> broadcastInputs(SparkExecutionContext sec, Set<String> brVars) {
// construct broadcast objects
Map<String, Broadcast<CacheBlock>> result = new HashMap<>();
for (String key : brVars) {