You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/09/02 23:30:23 UTC
[1/2] systemml git commit: [SYSTEMML-1879] Performance parfor remote spark (reuse shared inputs)
Repository: systemml
Updated Branches:
refs/heads/master 912c65506 -> ba73291c9
[SYSTEMML-1879] Performance parfor remote spark (reuse shared inputs)
In parfor remote Spark jobs, each worker is initialized with its own
deserialized symbol table, which causes redundant reads of shared inputs
in each parfor worker and wastes memory. This patch introduces a
principled approach to reusing shared inputs: all variables are reused
except result variables and partitioned matrices. Because workers simply
use common instances of matrix objects, the sharing happens automatically
through the buffer pool, as in local parfor execution, and without
additional pinned-memory requirements. On the perftest scenario MSVM
(1M x 1K, sparse, 150 classes, 25 iterations), the end-to-end runtime
(including read and Spark context creation) improved from 94s to 72s.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2c57cf77
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2c57cf77
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2c57cf77
Branch: refs/heads/master
Commit: 2c57cf779e3b1a583ee4062bbd268f592537ef05
Parents: 912c655
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Sep 1 20:29:49 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Sep 1 20:31:08 2017 -0700
----------------------------------------------------------------------
.../org/apache/sysml/parser/DMLTranslator.java | 14 +-
.../controlprogram/LocalVariableMap.java | 5 +
.../controlprogram/ParForProgramBlock.java | 464 ++++++++-----------
.../context/ExecutionContext.java | 45 +-
.../parfor/CachedReuseVariables.java | 59 +++
.../controlprogram/parfor/ProgramConverter.java | 8 +-
.../parfor/RemoteParForSpark.java | 32 +-
.../parfor/RemoteParForSparkWorker.java | 33 +-
.../parfor/TaskPartitionerFactoring.java | 33 +-
9 files changed, 356 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index e6b8590..f44c0a4 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -617,12 +617,11 @@ public class DMLTranslator
ForProgramBlock rtpb = null;
IterablePredicate iterPred = fsb.getIterPredicate();
- if( sb instanceof ParForStatementBlock )
- {
+ if( sb instanceof ParForStatementBlock ) {
sbName = "ParForStatementBlock";
- rtpb = new ParForProgramBlock(prog, iterPred.getIterVar().getName(), iterPred.getParForParams());
+ rtpb = new ParForProgramBlock(prog, iterPred.getIterVar().getName(),
+ iterPred.getParForParams(), ((ParForStatementBlock)sb).getResultVariables());
ParForProgramBlock pfrtpb = (ParForProgramBlock)rtpb;
- pfrtpb.setResultVariables( ((ParForStatementBlock)sb).getResultVariables() );
pfrtpb.setStatementBlock((ParForStatementBlock)sb); //used for optimization and creating unscoped variables
}
else {//ForStatementBlock
@@ -636,8 +635,8 @@ public class DMLTranslator
// process the body of the for statement block
if (fsb.getNumStatements() > 1){
- LOG.error(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" );
- throw new LopsException(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" );
+ LOG.error(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" );
+ throw new LopsException(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" );
}
ForStatement fs = (ForStatement)fsb.getStatement(0);
for (StatementBlock sblock : fs.getBody()){
@@ -653,9 +652,6 @@ public class DMLTranslator
retPB = rtpb;
- //post processing for generating missing instructions
- //retPB = verifyAndCorrectProgramBlock(sb.liveIn(), sb.liveOut(), sb._kill, retPB);
-
// add statement block
retPB.setStatementBlock(sb);
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
index 94d85ad..7e59951 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
@@ -91,6 +91,11 @@ public class LocalVariableMap implements Cloneable
localMap.clear();
}
+ public void removeAllIn(Set<String> blacklist) {
+ localMap.entrySet().removeIf(
+ e -> blacklist.contains(e.getKey()));
+ }
+
public void removeAllNotIn(Set<String> blacklist) {
localMap.entrySet().removeIf(
e -> !blacklist.contains(e.getKey()));
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index ce8bbed..b393cd1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -125,22 +125,22 @@ public class ParForProgramBlock extends ForProgramBlock
{
// execution modes
public enum PExecMode {
- LOCAL, //local (master) multi-core execution mode
- REMOTE_MR, //remote (MR cluster) execution mode
- REMOTE_MR_DP, //remote (MR cluster) execution mode, fused with data partitioning
- REMOTE_SPARK, //remote (Spark cluster) execution mode
+ LOCAL, //local (master) multi-core execution mode
+ REMOTE_MR, //remote (MR cluster) execution mode
+ REMOTE_MR_DP, //remote (MR cluster) execution mode, fused with data partitioning
+ REMOTE_SPARK, //remote (Spark cluster) execution mode
REMOTE_SPARK_DP,//remote (Spark cluster) execution mode, fused with data partitioning
UNSPECIFIED
}
// task partitioner
public enum PTaskPartitioner {
- FIXED, //fixed-sized task partitioner, uses tasksize
- NAIVE, //naive task partitioner (tasksize=1)
- STATIC, //static task partitioner (numIterations/numThreads)
- FACTORING, //factoring task partitioner
- FACTORING_CMIN, //constrained factoring task partitioner, uses tasksize as min constraint
- FACTORING_CMAX, //constrained factoring task partitioner, uses tasksize as max constraint
+ FIXED, //fixed-sized task partitioner, uses tasksize
+ NAIVE, //naive task partitioner (tasksize=1)
+ STATIC, //static task partitioner (numIterations/numThreads)
+ FACTORING, //factoring task partitioner
+ FACTORING_CMIN, //constrained factoring task partitioner, uses tasksize as min constraint
+ FACTORING_CMAX, //constrained factoring task partitioner, uses tasksize as max constraint
UNSPECIFIED
}
@@ -259,10 +259,10 @@ public class ParForProgramBlock extends ForProgramBlock
}
public enum PDataPartitioner {
- NONE, // no data partitioning
- LOCAL, // local file based partition split on master node
- REMOTE_MR, // remote partition split using a reblock MR job
- REMOTE_SPARK, // remote partition split using a spark job
+ NONE, // no data partitioning
+ LOCAL, // local file based partition split on master node
+ REMOTE_MR, // remote partition split using a reblock MR job
+ REMOTE_SPARK, // remote partition split using a spark job
UNSPECIFIED,
}
@@ -277,21 +277,21 @@ public class ParForProgramBlock extends ForProgramBlock
//optimizer
public enum POptMode{
- NONE, //no optimization, use defaults and specified parameters
- RULEBASED, //rule-based rewritings with memory constraints
- CONSTRAINED, //same as rule-based but with given params as constraints
- HEURISTIC, //smae as rule-based but with time-based cost estimates
+ NONE, //no optimization, use defaults and specified parameters
+ RULEBASED, //rule-based rewritings with memory constraints
+ CONSTRAINED, //same as rule-based but with given params as constraints
+ HEURISTIC, //same as rule-based but with time-based cost estimates
}
-
+
// internal parameters
- public static final boolean OPTIMIZE = true; // run all automatic optimizations on top-level parfor
+ public static final boolean OPTIMIZE = true; // run all automatic optimizations on top-level parfor
public static final boolean USE_PB_CACHE = false; // reuse copied program blocks whenever possible, not there can be issues related to recompile
- public static boolean USE_RANGE_TASKS_IF_USEFUL = true; // use range tasks whenever size>3, false, otherwise wrong split order in remote
- public static final boolean USE_STREAMING_TASK_CREATION = true; // start working while still creating tasks, prevents blocking due to too small task queue
- public static final boolean ALLOW_NESTED_PARALLELISM = true; // if not, transparently change parfor to for on program conversions (local,remote)
- public static boolean ALLOW_REUSE_MR_JVMS = true; // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1
+ public static final boolean USE_RANGE_TASKS_IF_USEFUL = true; // use range tasks whenever size>3, false, otherwise wrong split order in remote
+ public static final boolean USE_STREAMING_TASK_CREATION = true; // start working while still creating tasks, prevents blocking due to too small task queue
+ public static final boolean ALLOW_NESTED_PARALLELISM = true; // if not, transparently change parfor to for on program conversions (local,remote)
+ public static boolean ALLOW_REUSE_MR_JVMS = true; // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1
public static boolean ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS; //potential benefits: less initialization, reuse in-memory objects and result consolidation!
- public static final boolean USE_PARALLEL_RESULT_MERGE = false; // if result merge is run in parallel or serial
+ public static final boolean USE_PARALLEL_RESULT_MERGE = false; // if result merge is run in parallel or serial
public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // if remote result merge should be run in parallel for multiple result vars
public static final boolean ALLOW_DATA_COLOCATION = true;
public static final boolean CREATE_UNSCOPED_RESULTVARS = true;
@@ -299,7 +299,7 @@ public class ParForProgramBlock extends ForProgramBlock
public static final int WRITE_REPLICATION_FACTOR = 1;
public static final int MAX_RETRYS_ON_ERROR = 1;
public static final boolean FORCE_CP_ON_REMOTE_MR = true; // compile body to CP if exec type forced to MR
- public static final boolean LIVEVAR_AWARE_EXPORT = true; //export only read variables according to live variable analysis
+ public static final boolean LIVEVAR_AWARE_EXPORT = true; // export only read variables according to live variable analysis
public static final boolean RESET_RECOMPILATION_FLAGs = true;
public static final String PARFOR_FNAME_PREFIX = "/parfor/";
@@ -311,69 +311,61 @@ public class ParForProgramBlock extends ForProgramBlock
public static final String PARFOR_COUNTER_GROUP_NAME = "SystemML ParFOR Counters";
// static ID generator sequences
- private static IDSequence _pfIDSeq = null;
- private static IDSequence _pwIDSeq = null;
+ private final static IDSequence _pfIDSeq = new IDSequence();
+ private final static IDSequence _pwIDSeq = new IDSequence();
// runtime parameters
- protected HashMap<String,String> _params = null;
- protected int _numThreads = -1;
- protected PTaskPartitioner _taskPartitioner = null;
- protected long _taskSize = -1;
+ protected final HashMap<String,String> _params;
+ protected final boolean _monitor;
+ protected final Level _optLogLevel;
+ protected int _numThreads = -1;
+ protected long _taskSize = -1;
+ protected PTaskPartitioner _taskPartitioner = null;
protected PDataPartitioner _dataPartitioner = null;
- protected PResultMerge _resultMerge = null;
- protected PExecMode _execMode = null;
- protected POptMode _optMode = null;
- protected boolean _monitor = false;
- protected Level _optLogLevel = null;
-
+ protected PResultMerge _resultMerge = null;
+ protected PExecMode _execMode = null;
+ protected POptMode _optMode = null;
//specifics used for optimization
- protected long _numIterations = -1;
+ protected long _numIterations = -1;
//specifics used for data partitioning
protected LocalVariableMap _variablesDPOriginal = null;
- protected LocalVariableMap _variablesDPReuse = null;
- protected String _colocatedDPMatrix = null;
- protected boolean _tSparseCol = false;
- protected int _replicationDP = WRITE_REPLICATION_FACTOR;
- protected int _replicationExport = -1;
+ protected LocalVariableMap _variablesDPReuse = null;
+ protected String _colocatedDPMatrix = null;
+ protected boolean _tSparseCol = false;
+ protected int _replicationDP = WRITE_REPLICATION_FACTOR;
+ protected int _replicationExport = -1;
//specifics used for result partitioning
- protected boolean _jvmReuse = true;
+ protected boolean _jvmReuse = true;
//specifics used for recompilation
- protected double _oldMemoryBudget = -1;
- protected double _recompileMemoryBudget = -1;
+ protected double _oldMemoryBudget = -1;
+ protected double _recompileMemoryBudget = -1;
//specifics for caching
- protected boolean _enableCPCaching = true;
- protected boolean _enableRuntimePiggybacking = false;
+ protected boolean _enableCPCaching = true;
+ protected boolean _enableRuntimePiggybacking = false;
//specifics for spark
protected Collection<String> _variablesRP = null;
protected Collection<String> _variablesECache = null;
// program block meta data
- protected long _ID = -1;
- protected int _IDPrefix = -1;
- protected ArrayList<String> _resultVars = null;
- protected IDSequence _resultVarsIDSeq = null;
- protected IDSequence _dpVarsIDSeq = null;
- protected boolean _monitorReport = false;
- protected boolean _hasFunctions = true;
+ protected final ArrayList<String> _resultVars;
+ protected final IDSequence _resultVarsIDSeq;
+ protected final IDSequence _dpVarsIDSeq;
+ protected final boolean _hasFunctions;
+
+ protected long _ID = -1;
+ protected int _IDPrefix = -1;
+ protected boolean _monitorReport = false;
// local parworker data
- protected long[] _pwIDs = null;
protected HashMap<Long,ArrayList<ProgramBlock>> _pbcache = null;
+ protected long[] _pwIDs = null;
-
- static
- {
- //init static ID sequence generators
- _pfIDSeq = new IDSequence();
- _pwIDSeq = new IDSequence();
- }
-
- public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params)
+ public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars)
throws DMLRuntimeException
{
- this( -1, prog, iterPredVar, params);
+ this( -1, prog, iterPredVar, params, resultVars);
}
/**
@@ -384,9 +376,10 @@ public class ParForProgramBlock extends ForProgramBlock
* @param prog runtime program
* @param iterPredVars ?
* @param params map of parameters
+ * @param resultVars list of result variable names
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params)
+ public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars)
{
super(prog, iterPredVar);
@@ -395,6 +388,7 @@ public class ParForProgramBlock extends ForProgramBlock
//ID generation and setting
setParForProgramBlockIDs( ID );
+ _resultVars = resultVars;
_resultVarsIDSeq = new IDSequence();
_dpVarsIDSeq = new IDSequence();
@@ -407,18 +401,18 @@ public class ParForProgramBlock extends ForProgramBlock
_dataPartitioner = PDataPartitioner.valueOf( getParForParam(ParForStatementBlock.DATA_PARTITIONER) );
_resultMerge = PResultMerge.valueOf( getParForParam(ParForStatementBlock.RESULT_MERGE) );
_execMode = PExecMode.valueOf( getParForParam(ParForStatementBlock.EXEC_MODE) );
- _optMode = POptMode.valueOf( getParForParam(ParForStatementBlock.OPT_MODE) );
+ _optMode = POptMode.valueOf( getParForParam(ParForStatementBlock.OPT_MODE) );
_optLogLevel = Level.toLevel( getParForParam(ParForStatementBlock.OPT_LOG) );
_monitor = (Integer.parseInt(getParForParam(ParForStatementBlock.PROFILE) ) == 1);
}
catch(Exception ex) {
throw new RuntimeException("Error parsing specified ParFOR parameters.",ex);
}
-
+
//reset the internal opt mode if optimization globally disabled.
if( !OPTIMIZE )
_optMode = POptMode.NONE;
-
+
_variablesDPOriginal = new LocalVariableMap();
_variablesDPReuse = new LocalVariableMap();
@@ -453,19 +447,14 @@ public class ParForProgramBlock extends ForProgramBlock
public String getParForParam(String key) {
String tmp = getParForParams().get(key);
- return (tmp == null) ? null :
+ return (tmp == null) ? null :
UtilFunctions.unquote(tmp).toUpperCase();
}
- public ArrayList<String> getResultVariables()
- {
+ public ArrayList<String> getResultVariables() {
return _resultVars;
}
- public void setResultVariables(ArrayList<String> resultVars) {
- _resultVars = resultVars;
- }
-
public void disableOptimization() {
_optMode = POptMode.NONE;
}
@@ -574,7 +563,7 @@ public class ParForProgramBlock extends ForProgramBlock
ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS;
}
- @Override
+ @Override
public void execute(ExecutionContext ec)
throws DMLRuntimeException
{
@@ -621,7 +610,7 @@ public class ParForProgramBlock extends ForProgramBlock
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_DATA_T, time.stop());
-
+
// initialize iter var to form value
IntObject iterVar = new IntObject(_iterPredVar, from.getLongValue() );
@@ -630,8 +619,7 @@ public class ParForProgramBlock extends ForProgramBlock
///////
LOG.trace("EXECUTE PARFOR ID = "+_ID+" with mode = "+_execMode+", numThreads = "+_numThreads+", taskpartitioner = "+_taskPartitioner);
- if( _monitor )
- {
+ if( _monitor ) {
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTHREADS, _numThreads);
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKSIZE, _taskSize);
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKPARTITIONER, _taskPartitioner.ordinal());
@@ -644,13 +632,13 @@ public class ParForProgramBlock extends ForProgramBlock
HashMap<String, Boolean> varState = ec.pinVariables(varList);
try
- {
+ {
switch( _execMode )
{
case LOCAL: //create parworkers as local threads
executeLocalParFor(ec, iterVar, from, to, incr);
break;
-
+
case REMOTE_MR: // create parworkers as MR tasks (one job per parfor)
executeRemoteMRParFor(ec, iterVar, from, to, incr);
break;
@@ -669,10 +657,9 @@ public class ParForProgramBlock extends ForProgramBlock
default:
throw new DMLRuntimeException("Undefined execution mode: '"+_execMode+"'.");
- }
+ }
}
- catch(Exception ex)
- {
+ catch(Exception ex) {
throw new DMLRuntimeException("PARFOR: Failed to execute loop in parallel.",ex);
}
@@ -688,8 +675,7 @@ public class ParForProgramBlock extends ForProgramBlock
//ensure that subsequent program blocks never see partitioned data (invalid plans!)
//we can replace those variables, because partitioning only applied for read-only matrices
- for( String var : _variablesDPOriginal.keySet() )
- {
+ for( String var : _variablesDPOriginal.keySet() ) {
//cleanup partitioned matrix (if not reused)
if( !_variablesDPReuse.keySet().contains(var) )
VariableCPInstruction.processRemoveVariableInstruction(ec, var);
@@ -704,19 +690,19 @@ public class ParForProgramBlock extends ForProgramBlock
//print profiling report (only if top-level parfor because otherwise in parallel context)
if( _monitorReport )
- LOG.info("\n"+StatisticMonitor.createReport());
+ LOG.info("\n"+StatisticMonitor.createReport());
//reset flags/modifications made by optimizer
//TODO reset of hop parallelism constraint (e.g., ba+*)
for( String dpvar : _variablesDPOriginal.keySet() ) //release forced exectypes
- ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, dpvar, ec, false);
+ ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, dpvar, ec, false);
//release forced exectypes for fused dp/exec
if( _execMode == PExecMode.REMOTE_MR_DP || _execMode == PExecMode.REMOTE_SPARK_DP )
ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, _colocatedDPMatrix, ec, false);
resetOptimizerFlags(); //after release, deletes dp_varnames
//execute exit instructions (usually empty)
- executeInstructions(_exitInstructions, ec);
+ executeInstructions(_exitInstructions, ec);
}
@@ -814,8 +800,7 @@ public class ParForProgramBlock extends ForProgramBlock
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-
-
+
// Step 4) collecting results from each parallel worker
//obtain results and cleanup other intermediates before result merge
LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads];
@@ -862,16 +847,15 @@ public class ParForProgramBlock extends ForProgramBlock
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
}
}
- }
+ }
private void executeRemoteMRParFor( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr )
throws DMLRuntimeException, IOException
{
/* Step 0) check and recompile MR inst
* Step 1) serialize child PB and inst
- * Step 2) create tasks
- * serialize tasks
- * Step 3) submit MR Jobs and wait for results
+ * Step 2) create and serialize tasks
+ * Step 3) submit MR Jobs and wait for results
* Step 4) collect results from each parallel worker
*/
@@ -884,10 +868,10 @@ public class ParForProgramBlock extends ForProgramBlock
//tid = 0 because replaced in remote parworker
flagForced = checkMRAndRecompileToCP(0);
}
-
+
// Step 1) init parallel workers (serialize PBs)
- // NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
- // cannot reuse serialized string, since variables are serialized as well.
+ // NOTES: each mapper changes filenames with regard to his ID as we submit a single
+ // job, cannot reuse serialized string, since variables are serialized as well.
ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
String program = ProgramConverter.serializeParForBody( body );
@@ -902,64 +886,60 @@ public class ParForProgramBlock extends ForProgramBlock
long numIterations = partitioner.getNumIterations();
int maxDigits = (int)Math.log10(to.getLongValue()) + 1;
long numCreatedTasks = -1;
- if( USE_STREAMING_TASK_CREATION )
- {
+ if( USE_STREAMING_TASK_CREATION ) {
LocalTaskQueue<Task> queue = new LocalTaskQueue<Task>();
-
+
//put tasks into queue and start writing to taskFile
numCreatedTasks = partitioner.createTasks(queue);
- taskFile = writeTasksToFile( taskFile, queue, maxDigits );
+ taskFile = writeTasksToFile( taskFile, queue, maxDigits );
}
else
{
//sequentially create tasks and write to disk
List<Task> tasks = partitioner.createTasks();
- numCreatedTasks = tasks.size();
- taskFile = writeTasksToFile( taskFile, tasks, maxDigits );
+ numCreatedTasks = tasks.size();
+ taskFile = writeTasksToFile( taskFile, tasks, maxDigits );
}
-
+
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
//write matrices to HDFS
exportMatricesToHDFS(ec);
-
+
// Step 3) submit MR job (wait for finished work)
MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? ec.getMatrixObject(_colocatedDPMatrix) : null;
- RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, taskFile, resultFile, colocatedDPMatrixObj, _enableCPCaching,
- _numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, getMinMemory(ec),
- (ALLOW_REUSE_MR_JVMS & _jvmReuse) );
+ RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, taskFile, resultFile,
+ colocatedDPMatrixObj, _enableCPCaching,_numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR,
+ getMinMemory(ec), (ALLOW_REUSE_MR_JVMS & _jvmReuse) );
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-
-
+
// Step 4) collecting results from each parallel worker
int numExecutedTasks = ret.getNumExecutedTasks();
int numExecutedIterations = ret.getNumExecutedIterations();
//consolidate results into global symbol table
- consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks,
- ret.getVariables() );
+ consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+ numExecutedIterations , numExecutedTasks, ret.getVariables() );
if( flagForced ) //see step 0
releaseForcedRecompile(0);
- if( _monitor )
- {
+ if( _monitor ) {
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
- }
- }
+ }
+ }
private void executeRemoteMRParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr )
throws DMLRuntimeException, IOException
{
/* Step 0) check and recompile MR inst
* Step 1) serialize child PB and inst
- * Step 2) create tasks
- * serialize tasks
- * Step 3) submit MR Jobs and wait for results
+ * Step 2) create and serialize tasks
+ * Step 3) submit MR Jobs and wait for results
* Step 4) collect results from each parallel worker
*/
@@ -975,8 +955,8 @@ public class ParForProgramBlock extends ForProgramBlock
inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned
// Step 2) init parallel workers (serialize PBs)
- // NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
- // cannot reuse serialized string, since variables are serialized as well.
+ // NOTES: each mapper changes filenames with regard to his ID as we submit a single
+ // job, cannot reuse serialized string, since variables are serialized as well.
ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
String program = ProgramConverter.serializeParForBody( body );
@@ -988,41 +968,40 @@ public class ParForProgramBlock extends ForProgramBlock
String resultFile = constructResultFileName();
long numIterations = partitioner.getNumIterations();
long numCreatedTasks = numIterations;//partitioner.createTasks().size();
-
+
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
//write matrices to HDFS
exportMatricesToHDFS(ec);
-
+
// Step 4) submit MR job (wait for finished work)
- OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PartitionFormat.COLUMN_WISE)||
- (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE))?
- OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
+ OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PartitionFormat.COLUMN_WISE)
+ || (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE)) ?
+ OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile,
- inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP );
+ inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP );
- if( _monitor )
+ if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-
+
// Step 5) collecting results from each parallel worker
int numExecutedTasks = ret.getNumExecutedTasks();
int numExecutedIterations = ret.getNumExecutedIterations();
//consolidate results into global symbol table
- consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks,
- ret.getVariables() );
+ consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+ numExecutedIterations, numExecutedTasks, ret.getVariables() );
if( flagForced ) //see step 0
releaseForcedRecompile(0);
inputMatrix.unsetPartitioned();
- if( _monitor )
- {
+ if( _monitor ) {
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
- }
+ }
}
private void executeRemoteSparkParFor(ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr)
@@ -1032,15 +1011,15 @@ public class ParForProgramBlock extends ForProgramBlock
// Step 0) check and compile to CP (if forced remote parfor)
boolean flagForced = false;
- if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE || (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) )
- {
+ if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE
+ || (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) ) {
//tid = 0 because replaced in remote parworker
flagForced = checkMRAndRecompileToCP(0);
}
// Step 1) init parallel workers (serialize PBs)
- // NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
- // cannot reuse serialized string, since variables are serialized as well.
+ // NOTES: each mapper changes filenames with regard to his ID as we submit a single
+ // job, cannot reuse serialized string, since variables are serialized as well.
ParForBody body = new ParForBody(_childBlocks, _resultVars, ec);
HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>();
String program = ProgramConverter.serializeParForBody(body, clsMap);
@@ -1055,37 +1034,35 @@ public class ParForProgramBlock extends ForProgramBlock
//sequentially create tasks as input to parfor job
List<Task> tasks = partitioner.createTasks();
long numCreatedTasks = tasks.size();
-
+
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
//write matrices to HDFS
exportMatricesToHDFS(ec);
-
+
// Step 3) submit Spark parfor job (no lazy evaluation, since collect on result)
//MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? (MatrixObject)ec.getVariable(_colocatedDPMatrix) : null;
RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program, clsMap, tasks, ec, _enableCPCaching, _numThreads);
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-
-
+
// Step 4) collecting results from each parallel worker
int numExecutedTasks = ret.getNumExecutedTasks();
int numExecutedIterations = ret.getNumExecutedIterations();
//consolidate results into global symbol table
- consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks,
- ret.getVariables() );
+ consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+ numExecutedIterations , numExecutedTasks, ret.getVariables() );
if( flagForced ) //see step 0
releaseForcedRecompile(0);
- if( _monitor )
- {
+ if( _monitor ) {
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
- }
+ }
}
private void executeRemoteSparkParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr )
@@ -1096,15 +1073,15 @@ public class ParForProgramBlock extends ForProgramBlock
// Step 0) check and compile to CP (if forced remote parfor)
boolean flagForced = checkMRAndRecompileToCP(0);
- // Step 1) prepare partitioned input matrix (needs to happen before serializing the progam)
+ // Step 1) prepare partitioned input matrix (needs to happen before serializing the program)
ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
MatrixObject inputMatrix = ec.getMatrixObject(_colocatedDPMatrix );
PartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix );
- inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned
+ inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned
// Step 2) init parallel workers (serialize PBs)
- // NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
- // cannot reuse serialized string, since variables are serialized as well.
+ // NOTES: each mapper changes filenames with regard to his ID as we submit a single
+ // job, cannot reuse serialized string, since variables are serialized as well.
ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>();
String program = ProgramConverter.serializeParForBody( body, clsMap );
@@ -1117,7 +1094,7 @@ public class ParForProgramBlock extends ForProgramBlock
String resultFile = constructResultFileName();
long numIterations = partitioner.getNumIterations();
long numCreatedTasks = numIterations;//partitioner.createTasks().size();
-
+
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
@@ -1126,34 +1103,30 @@ public class ParForProgramBlock extends ForProgramBlock
// Step 4) submit MR job (wait for finished work)
//TODO runtime support for binary cell partitioning
- //OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)||
- // (inputMatrix.getSparsity()<0.001 && inputDPF==PDataPartitionFormat.ROW_WISE))?
- // OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
OutputInfo inputOI = OutputInfo.BinaryBlockOutputInfo;
- RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, clsMap,
- resultFile, inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads );
+ RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program,
+ clsMap, resultFile, inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads );
if( _monitor )
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-
+
// Step 5) collecting results from each parallel worker
int numExecutedTasks = ret.getNumExecutedTasks();
int numExecutedIterations = ret.getNumExecutedIterations();
//consolidate results into global symbol table
- consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks,
- ret.getVariables() );
+ consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+ numExecutedIterations, numExecutedTasks, ret.getVariables() );
if( flagForced ) //see step 0
releaseForcedRecompile(0);
inputMatrix.unsetPartitioned();
- if( _monitor )
- {
+ if( _monitor ) {
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
- }
+ }
}
private void handleDataPartitioning( ExecutionContext ec )
@@ -1161,7 +1134,7 @@ public class ParForProgramBlock extends ForProgramBlock
{
PDataPartitioner dataPartitioner = _dataPartitioner;
if( dataPartitioner != PDataPartitioner.NONE )
- {
+ {
ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
if( sb == null )
throw new DMLRuntimeException("ParFor statement block required for reasoning about data partitioning.");
@@ -1171,19 +1144,20 @@ public class ParForProgramBlock extends ForProgramBlock
{
Data dat = ec.getVariable(var);
//skip non-existing input matrices (which are due to unknown sizes marked for
- //partitioning but typically related branches are never executed)
+ //partitioning but typically related branches are never executed)
if( dat != null && dat instanceof MatrixObject )
{
MatrixObject moVar = (MatrixObject) dat; //unpartitioned input
PartitionFormat dpf = sb.determineDataPartitionFormat( var );
- LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable "+var+" (format="+dpf+", mode="+_dataPartitioner+")");
+ LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable "
+ + var + " (format="+dpf+", mode="+_dataPartitioner+")");
if( dpf != PartitionFormat.NONE )
{
if( dataPartitioner != PDataPartitioner.REMOTE_SPARK && dpf.isBlockwise() ) {
LOG.warn("PARFOR ID = "+_ID+", Switching data partitioner from " + dataPartitioner +
- " to " + PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning.");
+ " to " + PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning.");
dataPartitioner = PDataPartitioner.REMOTE_SPARK;
}
@@ -1214,9 +1188,8 @@ public class ParForProgramBlock extends ForProgramBlock
//store original and partitioned matrix (for reuse if applicable)
_variablesDPOriginal.put(var, moVar);
- if( ALLOW_REUSE_PARTITION_VARS
- && ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) )
- {
+ if( ALLOW_REUSE_PARTITION_VARS
+ && ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) {
_variablesDPReuse.put(var, dpdatNew);
}
@@ -1233,7 +1206,6 @@ public class ParForProgramBlock extends ForProgramBlock
if( OptimizerUtils.isSparkExecutionMode() &&
_variablesRP != null && !_variablesRP.isEmpty() ) {
SparkExecutionContext sec = (SparkExecutionContext) ec;
-
for( String var : _variablesRP )
sec.repartitionAndCacheMatrixObject(var);
}
@@ -1245,7 +1217,6 @@ public class ParForProgramBlock extends ForProgramBlock
if( OptimizerUtils.isSparkExecutionMode() &&
_variablesECache != null && !_variablesECache.isEmpty() ) {
SparkExecutionContext sec = (SparkExecutionContext) ec;
-
for( String var : _variablesECache )
sec.cacheMatrixObject(var);
}
@@ -1262,8 +1233,7 @@ public class ParForProgramBlock extends ForProgramBlock
private void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in)
throws DMLRuntimeException
{
- for( MatrixObject tmp : in )
- {
+ for( MatrixObject tmp : in ) {
//check for empty inputs (no iterations executed)
if( tmp != null && tmp != out )
ec.cleanupMatrixObject(tmp);
@@ -1299,8 +1269,7 @@ public class ParForProgramBlock extends ForProgramBlock
switch( datatype )
{
case SCALAR:
- switch( valuetype )
- {
+ switch( valuetype ) {
case BOOLEAN: dataObj = new BooleanObject(var,false); break;
case INT: dataObj = new IntObject(var,-1); break;
case DOUBLE: dataObj = new DoubleObject(var,-1d); break;
@@ -1408,20 +1377,17 @@ public class ParForProgramBlock extends ForProgramBlock
HashSet<String> fnNames = new HashSet<String>();
if( USE_PB_CACHE )
{
- if( _pbcache.containsKey(pwID) )
- {
+ if( _pbcache.containsKey(pwID) ) {
cpChildBlocks = _pbcache.get(pwID);
}
- else
- {
+ else {
cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false);
_pbcache.put(pwID, cpChildBlocks);
}
}
- else
- {
+ else {
cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false);
- }
+ }
//deep copy execution context (including prepare parfor update-in-place)
ExecutionContext cpEc = ProgramConverter.createDeepCopyExecutionContext(ec);
@@ -1443,8 +1409,7 @@ public class ParForProgramBlock extends ForProgramBlock
pw = new LocalParWorker( pwID, queue, body, cconf, MAX_RETRYS_ON_ERROR, _monitor );
pw.setFunctionNames(fnNames);
}
- catch(Exception ex)
- {
+ catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
@@ -1519,7 +1484,7 @@ public class ParForProgramBlock extends ForProgramBlock
int maxNumRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks();
//correction max number of reducers on yarn clusters
if( InfrastructureAnalyzer.isYarnEnabled() )
- maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 );
+ maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 );
int numRed = Math.min(numReducers,maxNumRed);
//create data partitioner
@@ -1530,12 +1495,12 @@ public class ParForProgramBlock extends ForProgramBlock
break;
case REMOTE_MR:
dp = new DataPartitionerRemoteMR( dpf, _ID, numRed,
- _replicationDP, ALLOW_REUSE_MR_JVMS, false );
+ _replicationDP, ALLOW_REUSE_MR_JVMS, false );
break;
case REMOTE_SPARK:
dp = new DataPartitionerRemoteSpark( dpf, ec, numRed,
- _replicationDP, false );
- break;
+ _replicationDP, false );
+ break;
default:
throw new DMLRuntimeException("Unknown data partitioner: '" +dataPartitioner.name()+"'.");
}
@@ -1557,18 +1522,18 @@ public class ParForProgramBlock extends ForProgramBlock
else {
int numReducers = ConfigurationManager.getNumReducers();
maxMap = InfrastructureAnalyzer.getRemoteParallelMapTasks();
- maxRed = Math.min(numReducers,
+ maxRed = Math.min(numReducers,
InfrastructureAnalyzer.getRemoteParallelReduceTasks());
//correction max number of reducers on yarn clusters
- if( InfrastructureAnalyzer.isYarnEnabled() ) {
- maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() );
- maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 );
+ if( InfrastructureAnalyzer.isYarnEnabled() ) {
+ maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() );
+ maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 );
}
}
int numMap = Math.max(_numThreads, maxMap);
int numRed = maxRed;
- //create result merge implementation
+ //create result merge implementation
switch( prm )
{
case LOCAL_MEM:
@@ -1581,10 +1546,9 @@ public class ParForProgramBlock extends ForProgramBlock
rm = new ResultMergeLocalAutomatic( out, in, fname );
break;
case REMOTE_MR:
- rm = new ResultMergeRemoteMR( out, in, fname, _ID, numMap, numRed,
- WRITE_REPLICATION_FACTOR,
- MAX_RETRYS_ON_ERROR,
- ALLOW_REUSE_MR_JVMS );
+ rm = new ResultMergeRemoteMR( out, in, fname,
+ _ID, numMap, numRed, WRITE_REPLICATION_FACTOR,
+ MAX_RETRYS_ON_ERROR, ALLOW_REUSE_MR_JVMS );
break;
case REMOTE_SPARK:
rm = new ResultMergeRemoteSpark( out, in, fname, ec, numMap, numRed );
@@ -1628,8 +1592,8 @@ public class ParForProgramBlock extends ForProgramBlock
private void releaseForcedRecompile(long tid)
throws DMLRuntimeException
{
- HashSet<String> fnStack = new HashSet<String>();
- Recompiler.recompileProgramBlockHierarchy2Forced(_childBlocks, tid, fnStack, null);
+ Recompiler.recompileProgramBlockHierarchy2Forced(
+ _childBlocks, tid, new HashSet<String>(), null);
}
private String writeTasksToFile(String fname, List<Task> tasks, int maxDigits)
@@ -1641,17 +1605,15 @@ public class ParForProgramBlock extends ForProgramBlock
Path path = new Path(fname);
FileSystem fs = IOUtilFunctions.getFileSystem(path);
br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-
+
boolean flagFirst = true; //workaround for keeping gen order
- for( Task t : tasks )
- {
+ for( Task t : tasks ) {
br.write( createTaskFileLine( t, maxDigits, flagFirst ) );
if( flagFirst )
flagFirst = false;
}
}
- catch(Exception ex)
- {
+ catch(Exception ex) {
throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex);
}
finally {
@@ -1670,18 +1632,16 @@ public class ParForProgramBlock extends ForProgramBlock
Path path = new Path( fname );
FileSystem fs = IOUtilFunctions.getFileSystem(path);
br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-
+
Task t = null;
boolean flagFirst = true; //workaround for keeping gen order
- while( (t = queue.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS )
- {
+ while( (t = queue.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS ) {
br.write( createTaskFileLine( t, maxDigits, flagFirst ) );
if( flagFirst )
flagFirst = false;
}
}
- catch(Exception ex)
- {
+ catch(Exception ex) {
throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex);
}
finally {
@@ -1691,11 +1651,9 @@ public class ParForProgramBlock extends ForProgramBlock
return fname;
}
- private String createTaskFileLine( Task t, int maxDigits, boolean flagFirst )
- {
+ private String createTaskFileLine( Task t, int maxDigits, boolean flagFirst ) {
//always pad to max digits in order to preserve task order
- String ret = t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n";
- return ret;
+ return t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n";
}
private void consolidateAndCheckResults(ExecutionContext ec, long expIters, long expTasks, long numIters, long numTasks, LocalVariableMap [] results)
@@ -1752,20 +1710,20 @@ public class ParForProgramBlock extends ForProgramBlock
MatrixObject out = (MatrixObject) dat;
MatrixObject[] in = new MatrixObject[ results.length ];
for( int i=0; i< results.length; i++ )
- in[i] = (MatrixObject) results[i].get( var );
+ in[i] = (MatrixObject) results[i].get( var );
String fname = constructResultMergeFileName();
ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, ec);
MatrixObject outNew = null;
if( USE_PARALLEL_RESULT_MERGE )
outNew = rm.executeParallelMerge( _numThreads );
else
- outNew = rm.executeSerialMerge();
+ outNew = rm.executeSerialMerge();
//cleanup existing var
Data exdata = ec.removeVariable(var);
if( exdata != null && exdata != outNew && exdata instanceof MatrixObject )
ec.cleanupMatrixObject((MatrixObject)exdata);
-
+
//cleanup of intermediate result variables
cleanWorkerResultVariables( ec, out, in );
@@ -1796,21 +1754,18 @@ public class ParForProgramBlock extends ForProgramBlock
*
* @return true if ?
*/
- private boolean checkParallelRemoteResultMerge()
- {
- return (USE_PARALLEL_RESULT_MERGE_REMOTE
- && _resultVars.size() > 1
- && ( _resultMerge == PResultMerge.REMOTE_MR
- ||_resultMerge == PResultMerge.REMOTE_SPARK) );
+ private boolean checkParallelRemoteResultMerge() {
+ return (USE_PARALLEL_RESULT_MERGE_REMOTE && _resultVars.size() > 1
+ && ( _resultMerge == PResultMerge.REMOTE_MR
+ ||_resultMerge == PResultMerge.REMOTE_SPARK) );
}
- private void setParForProgramBlockIDs(int IDPrefix)
- {
+ private void setParForProgramBlockIDs(int IDPrefix) {
_IDPrefix = IDPrefix;
if( _IDPrefix == -1 ) //not specified
_ID = _pfIDSeq.getNextID(); //generated new ID
else //remote case (further nested parfors are all in one JVM)
- _ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID());
+ _ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID());
}
/**
@@ -1826,8 +1781,7 @@ public class ParForProgramBlock extends ForProgramBlock
_pwIDs = new long[ _numThreads ];
- for( int i=0; i<_numThreads; i++ )
- {
+ for( int i=0; i<_numThreads; i++ ) {
if(_IDPrefix == -1)
_pwIDs[i] = _pwIDSeq.getNextID();
else
@@ -1838,8 +1792,7 @@ public class ParForProgramBlock extends ForProgramBlock
}
}
- private long computeNumIterations( IntObject from, IntObject to, IntObject incr )
- {
+ private long computeNumIterations( IntObject from, IntObject to, IntObject incr ) {
return (long)Math.ceil(((double)(to.getLongValue() - from.getLongValue() + 1)) / incr.getLongValue());
}
@@ -1849,10 +1802,8 @@ public class ParForProgramBlock extends ForProgramBlock
*
* @return task file name
*/
- private String constructTaskFileName()
- {
+ private String constructTaskFileName() {
String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-
StringBuilder sb = new StringBuilder();
sb.append(scratchSpaceLoc);
sb.append(Lop.FILE_SEPARATOR);
@@ -1869,10 +1820,8 @@ public class ParForProgramBlock extends ForProgramBlock
*
* @return result file name
*/
- private String constructResultFileName()
- {
+ private String constructResultFileName() {
String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-
StringBuilder sb = new StringBuilder();
sb.append(scratchSpaceLoc);
sb.append(Lop.FILE_SEPARATOR);
@@ -1880,13 +1829,11 @@ public class ParForProgramBlock extends ForProgramBlock
sb.append(DMLScript.getUUID());
sb.append(PARFOR_MR_RESULT_TMP_FNAME.replaceAll("%ID%", String.valueOf(_ID)));
- return sb.toString();
+ return sb.toString();
}
- private String constructResultMergeFileName()
- {
+ private String constructResultMergeFileName() {
String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-
String fname = PARFOR_MR_RESULTMERGE_FNAME;
fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID
fname = fname.replaceAll("%VAR%", String.valueOf(_resultVarsIDSeq.getNextID()));
@@ -1898,13 +1845,11 @@ public class ParForProgramBlock extends ForProgramBlock
sb.append(DMLScript.getUUID());
sb.append(fname);
- return sb.toString();
+ return sb.toString();
}
- private String constructDataPartitionsFileName()
- {
+ private String constructDataPartitionsFileName() {
String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-
String fname = PARFOR_DATAPARTITIONS_FNAME;
fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID
fname = fname.replaceAll("%VAR%", String.valueOf(_dpVarsIDSeq.getNextID()));
@@ -1916,7 +1861,7 @@ public class ParForProgramBlock extends ForProgramBlock
sb.append(DMLScript.getUUID());
sb.append(fname);
- return sb.toString();
+ return sb.toString();
}
private long getMinMemory(ExecutionContext ec)
@@ -1946,10 +1891,8 @@ public class ParForProgramBlock extends ForProgramBlock
return ret;
}
- private void setMemoryBudget()
- {
- if( _recompileMemoryBudget > 0 )
- {
+ private void setMemoryBudget() {
+ if( _recompileMemoryBudget > 0 ) {
// store old budget for reset after exec
_oldMemoryBudget = (double)InfrastructureAnalyzer.getLocalMaxMemory();
@@ -1959,16 +1902,12 @@ public class ParForProgramBlock extends ForProgramBlock
}
}
- private void resetMemoryBudget()
- {
+ private void resetMemoryBudget() {
if( _recompileMemoryBudget > 0 )
- {
InfrastructureAnalyzer.setLocalMaxMemory((long)_oldMemoryBudget);
- }
}
- private void resetOptimizerFlags()
- {
+ private void resetOptimizerFlags() {
//reset all state that was set but is not guaranteed to be overwritten by optimizer
_variablesDPOriginal.removeAll();
_colocatedDPMatrix = null;
@@ -2021,7 +1960,7 @@ public class ParForProgramBlock extends ForProgramBlock
MatrixObject[] in = new MatrixObject[ _refVars.length ];
for( int i=0; i< _refVars.length; i++ )
- in[i] = (MatrixObject) _refVars[i].get( varname );
+ in[i] = (MatrixObject) _refVars[i].get( varname );
String fname = constructResultMergeFileName();
ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, _ec);
@@ -2029,7 +1968,7 @@ public class ParForProgramBlock extends ForProgramBlock
if( USE_PARALLEL_RESULT_MERGE )
outNew = rm.executeParallelMerge( _numThreads );
else
- outNew = rm.executeSerialMerge();
+ outNew = rm.executeSerialMerge();
synchronized( _ec.getVariables() ){
_ec.getVariables().put( varname, outNew);
@@ -2051,5 +1990,4 @@ public class ParForProgramBlock extends ForProgramBlock
public String printBlockErrorLocation(){
return "ERROR: Runtime error in parfor program block generated from parfor statement block between lines " + _beginLine + " and " + _endLine + " -- ";
}
-
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index 0bd73d1..59d2589 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -544,23 +544,18 @@ public class ExecutionContext {
for( String var : varList )
{
Data dat = _variables.get(var);
- if( dat instanceof MatrixObject )
- {
+ if( dat instanceof MatrixObject ) {
MatrixObject mo = (MatrixObject)dat;
varsState.put( var, mo.isCleanupEnabled() );
- //System.out.println("pre-pin "+var+" ("+mo.isCleanupEnabled()+")");
}
}
//step 2) pin variables
- for( String var : varList )
- {
+ for( String var : varList ) {
Data dat = _variables.get(var);
- if( dat instanceof MatrixObject )
- {
+ if( dat instanceof MatrixObject ) {
MatrixObject mo = (MatrixObject)dat;
mo.enableCleanup(false);
- //System.out.println("pin "+var);
}
}
@@ -583,11 +578,8 @@ public class ExecutionContext {
* @param varList variable list
* @param varsState variable state
*/
- public void unpinVariables(ArrayList<String> varList, HashMap<String,Boolean> varsState)
- {
- for( String var : varList)
- {
- //System.out.println("unpin "+var+" ("+varsState.get(var)+")");
+ public void unpinVariables(ArrayList<String> varList, HashMap<String,Boolean> varsState) {
+ for( String var : varList) {
Data dat = _variables.get(var);
if( dat instanceof MatrixObject )
((MatrixObject)dat).enableCleanup(varsState.get(var));
@@ -597,15 +589,28 @@ public class ExecutionContext {
/**
* NOTE: No order guaranteed, so keep same list for pin and unpin.
*
- * @return variable list as strings
+ * @return list of all variable names.
*/
- public ArrayList<String> getVarList()
- {
- ArrayList<String> varlist = new ArrayList<String>();
- varlist.addAll(_variables.keySet());
- return varlist;
+ public ArrayList<String> getVarList() {
+ return new ArrayList<>(_variables.keySet());
}
-
+
+ /**
+ * NOTE: No order guaranteed, so keep same list for pin and unpin.
+ *
+ * @return list of all variable names of partitioned matrices.
+ */
+ public ArrayList<String> getVarListPartitioned() {
+ ArrayList<String> ret = new ArrayList<>();
+ for( String var : _variables.keySet() ) {
+ Data dat = _variables.get(var);
+ if( dat instanceof MatrixObject
+ && ((MatrixObject)dat).isPartitioned() )
+ ret.add(var);
+ }
+ return ret;
+ }
+
public void cleanupMatrixObject(MatrixObject mo)
throws DMLRuntimeException
{
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
new file mode 100644
index 0000000..48ef883
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.lang.ref.SoftReference;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+
+public class CachedReuseVariables
+{
+ private final HashMap<Long, SoftReference<LocalVariableMap>> _data;
+
+ public CachedReuseVariables() {
+ _data = new HashMap<>();
+ }
+
+ public synchronized void reuseVariables(long pfid, LocalVariableMap vars, Collection<String> blacklist) {
+ //check for existing reuse map
+ LocalVariableMap tmp = null;
+ if( _data.containsKey(pfid) )
+ tmp = _data.get(pfid).get();
+
+ //build reuse map if not created yet or evicted
+ if( tmp == null ) {
+ tmp = new LocalVariableMap(vars);
+ tmp.removeAllIn(new HashSet<>(blacklist));
+ _data.put(pfid, new SoftReference<>(tmp));
+ }
+ //reuse existing reuse map
+ else {
+ for( String varName : tmp.keySet() )
+ vars.put(varName, tmp.get(varName));
+ }
+ }
+
+ public synchronized void clearVariables(long pfid) {
+ _data.remove(pfid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 8dc86d4..6d0cd3a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -328,16 +328,15 @@ public class ProgramConverter
ParForProgramBlock tmpPB = null;
if( IDPrefix == -1 ) //still on master node
- tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams());
+ tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
else //child of remote ParWorker at any level
- tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams());
+ tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
tmpPB.setThreadID(pid);
tmpPB.disableOptimization(); //already done in top-level parfor
tmpPB.disableMonitorReport(); //already done in top-level parfor
- tmpPB.setResultVariables( pfpb.getResultVariables() );
tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
@@ -1514,9 +1513,8 @@ public class ProgramConverter
//program blocks //reset id to preinit state, replaced during exec
ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0);
- ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params);
+ ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars);
pfpb.disableOptimization(); //already done in top-level parfor
- pfpb.setResultVariables(resultVars);
pfpb.setFromInstructions(from);
pfpb.setToInstructions(to);
pfpb.setIncrementInstructions(incr);
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 49ac9db..10b44a2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -34,6 +34,8 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.utils.Statistics;
/**
@@ -51,12 +53,14 @@ import org.apache.sysml.utils.Statistics;
*/
public class RemoteParForSpark
{
-
protected static final Log LOG = LogFactory.getLog(RemoteParForSpark.class.getName());
-
- public static RemoteParForJobReturn runJob(long pfid, String program, HashMap<String, byte[]> clsMap,
+
+ //globally unique id for parfor spark job instances (unique across spark contexts)
+ private static final IDSequence _jobID = new IDSequence();
+
+ public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap,
List<Task> tasks, ExecutionContext ec, boolean cpCaching, int numMappers)
- throws DMLRuntimeException
+ throws DMLRuntimeException
{
String jobname = "ParFor-ESP";
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -68,13 +72,16 @@ public class RemoteParForSpark
LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
LongAccumulator aIters = sc.sc().longAccumulator("iterations");
+ //reset cached shared inputs for correctness in local mode
+ long jobid = _jobID.getNextID();
+ if( InfrastructureAnalyzer.isLocalMode() )
+ RemoteParForSparkWorker.cleanupCachedVariables(jobid);
+
//run remote_spark parfor job
//(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
- RemoteParForSparkWorker func = new RemoteParForSparkWorker(program, clsMap, cpCaching, aTasks, aIters);
- List<Tuple2<Long,String>> out = sc
- .parallelize(tasks, tasks.size()) //create rdd of parfor tasks
- .flatMapToPair(func) //execute parfor tasks
- .collect(); //get output handles
+ List<Tuple2<Long,String>> out = sc.parallelize(tasks, tasks.size()) //create rdd of parfor tasks
+ .flatMapToPair(new RemoteParForSparkWorker(jobid, prog, clsMap, cpCaching, aTasks, aIters))
+ .collect(); //execute and get output handles
//de-serialize results
LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
@@ -85,11 +92,10 @@ public class RemoteParForSpark
RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);
//maintain statistics
- Statistics.incrementNoOfCompiledSPInst();
- Statistics.incrementNoOfExecutedSPInst();
- if( DMLScript.STATISTICS ){
+ Statistics.incrementNoOfCompiledSPInst();
+ Statistics.incrementNoOfExecutedSPInst();
+ if( DMLScript.STATISTICS )
Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
- }
return ret;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index e1410da..033d398 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -21,10 +21,12 @@ package org.apache.sysml.runtime.controlprogram.parfor;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.util.LongAccumulator;
@@ -40,8 +42,11 @@ import scala.Tuple2;
public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String>
{
private static final long serialVersionUID = -3254950138084272296L;
-
- private final String _prog;
+
+ private static final CachedReuseVariables reuseVars = new CachedReuseVariables();
+
+ private final long _jobid;
+ private final String _prog;
private final HashMap<String, byte[]> _clsMap;
private boolean _initialized = false;
private boolean _caching = true;
@@ -49,9 +54,10 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
private final LongAccumulator _aTasks;
private final LongAccumulator _aIters;
- public RemoteParForSparkWorker(String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters)
+ public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters)
throws DMLRuntimeException
{
+ _jobid = jobid;
_prog = program;
_clsMap = clsMap;
_initialized = false;
@@ -68,7 +74,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
{
//lazy parworker initialization
if( !_initialized )
- configureWorker( TaskContext.get().taskAttemptId() );
+ configureWorker(TaskContext.get().taskAttemptId());
//execute a single task
long numIter = getExecutedIterations();
@@ -88,10 +94,11 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
return ret.iterator();
}
- private void configureWorker( long ID )
+ @SuppressWarnings("unchecked")
+ private void configureWorker(long taskID)
throws DMLRuntimeException, IOException
{
- _workerID = ID;
+ _workerID = taskID;
//initialize codegen class cache (before program parsing)
synchronized( CodegenUtils.class ) {
@@ -106,7 +113,13 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
_resultVars = body.getResultVarNames();
_numTasks = 0;
_numIters = 0;
-
+
+ //reuse shared inputs (to read shared inputs once per process instead of once per core;
+ //we reuse everything except result variables and partitioned input matrices)
+ _ec.pinVariables(_ec.getVarList()); //avoid cleanup of shared inputs
+ Collection<String> blacklist = CollectionUtils.union(_resultVars, _ec.getVarListPartitioned());
+ reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist);
+
//init and register-cleanup of buffer pool (in parfor spark, multiple tasks might
//share the process-local, i.e., per executor, buffer pool; hence we synchronize
//the initialization and immediately register the created directory for cleanup
@@ -121,7 +134,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
//register entire working dir for delete on shutdown
RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
- }
+ }
}
//ensure that resultvar files are not removed
@@ -134,4 +147,8 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
//mark as initialized
_initialized = true;
}
+
+ public static void cleanupCachedVariables(long pfid) {
+ reuseVars.clearVariables(pfid);
+ }
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
index d202e07..7dd25bf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
@@ -57,14 +57,14 @@ public class TaskPartitionerFactoring extends TaskPartitioner
{
LinkedList<Task> tasks = new LinkedList<Task>();
- long lFrom = _fromVal.getLongValue();
- long lTo = _toVal.getLongValue();
- long lIncr = _incrVal.getLongValue();
+ long lFrom = _fromVal.getLongValue();
+ long lTo = _toVal.getLongValue();
+ long lIncr = _incrVal.getLongValue();
int P = _numThreads; // number of parallel workers
- long N = _numIter; // total number of iterations
- long R = N; // remaining number of iterations
- long K = -1; // next _numThreads task sizes
+ long N = _numIter; // total number of iterations
+ long R = N; // remaining number of iterations
+ long K = -1; // next _numThreads task sizes
TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3
for( long i = lFrom; i<=lTo; )
@@ -73,7 +73,7 @@ public class TaskPartitionerFactoring extends TaskPartitioner
R -= (K * P);
type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && K>3 ) ?
- TaskType.RANGE : TaskType.SET;
+ TaskType.RANGE : TaskType.SET;
//for each logical processor
for( int j=0; j<P; j++ )
@@ -86,16 +86,12 @@ public class TaskPartitionerFactoring extends TaskPartitioner
tasks.addLast(lTask);
// add iterations to task
- if( type == TaskType.SET )
- {
+ if( type == TaskType.SET ) {
//value based tasks
for( long k=0; k<K && i<=lTo; k++, i+=lIncr )
- {
- lTask.addIteration(new IntObject(_iterVarName, i));
- }
+ lTask.addIteration(new IntObject(_iterVarName, i));
}
- else
- {
+ else {
//determine end of task
long to = Math.min( i+(K-1)*lIncr, lTo );
@@ -103,7 +99,6 @@ public class TaskPartitionerFactoring extends TaskPartitioner
lTask.addIteration(new IntObject(_iterVarName, i)); //from
lTask.addIteration(new IntObject(_iterVarName, to)); //to
lTask.addIteration(new IntObject(_iterVarName, lIncr)); //increment
-
i = to + lIncr;
}
}
@@ -122,11 +117,11 @@ public class TaskPartitionerFactoring extends TaskPartitioner
long lTo = _toVal.getLongValue();
long lIncr = _incrVal.getLongValue();
- int P = _numThreads; // number of parallel workers
+ int P = _numThreads; // number of parallel workers
long N = _numIter; // total number of iterations
- long R = N; // remaining number of iterations
- long K = -1; //next _numThreads task sizes
- TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3
+ long R = N; // remaining number of iterations
+ long K = -1; //next _numThreads task sizes
+ TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3
try
{
[2/2] systemml git commit: [SYSTEMML-1881] Tuning parfor degree of
parallelism (over-provisioning)
Posted by mb...@apache.org.
[SYSTEMML-1881] Tuning parfor degree of parallelism (over-provisioning)
This patch addresses issues of under-utilized CPU resources in parfor
contexts. For example, for Kmeans or MSVM with a small number of runs or
classes that does not evenly divide the number of hardware threads, we
assign the remaining parallelism too conservatively. Consider Kmeans with 10 runs
and 16 hardware threads - in this case, we assign k=10 to parfor and k=1
to the operations in the parfor body. This patch fine-tunes this
assignment by slightly over-provisioning CPU resources, which is usually
a good idea due to barriers between operations. We now assign the
remaining operation parallelism with k=round(maxK/parforK).
On the perftest Kmeans 100K x 1K scenario with 50 classes, 10 runs, and
16 hardware threads, this patch improved performance from 196s to 168s.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/ba73291c
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/ba73291c
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/ba73291c
Branch: refs/heads/master
Commit: ba73291c985d876eaeeb5719623461131bcc7f66
Parents: 2c57cf7
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Sep 2 14:30:11 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Sep 2 14:30:11 2017 -0700
----------------------------------------------------------------------
.../parfor/opt/OptimizerRuleBased.java | 34 +++++++++++++-------
1 file changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/ba73291c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index b8da25a..9dada01 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1247,12 +1247,12 @@ public class OptimizerRuleBased extends Optimizer
//constrain max parfor parallelism by problem size
int parforK = (int)((_N<kMax)? _N : kMax);
-
-
+
// if gpu mode is enabled, the amount of parallelism is set to
// the smaller of the number of iterations and the number of GPUs
// otherwise it default to the number of CPU cores and the
// operations are run in CP mode
+ //FIXME rework for nested parfor parallelism and body w/o gpu ops
if (DMLScript.USE_ACCELERATOR) {
long perGPUBudget = GPUContextPool.initialGPUMemBudget();
double maxMemUsage = getMaxCPOnlyBudget(n);
@@ -1264,15 +1264,14 @@ public class OptimizerRuleBased extends Optimizer
parforK + "]");
}
}
-
//set parfor degree of parallelism
pfpb.setDegreeOfParallelism(parforK);
- n.setK(parforK);
+ n.setK(parforK);
//distribute remaining parallelism
- int remainParforK = (int)Math.ceil(((double)(kMax-parforK+1))/parforK);
- int remainOpsK = Math.max(_lkmaxCP / parforK, 1);
+ int remainParforK = getRemainingParallelismParFor(kMax, parforK);
+ int remainOpsK = getRemainingParallelismOps(_lkmaxCP, parforK);
rAssignRemainingParallelism( n, remainParforK, remainOpsK );
}
else // ExecType.MR/ExecType.SPARK
@@ -1334,13 +1333,13 @@ public class OptimizerRuleBased extends Optimizer
//set parfor degree of parallelism
long id = c.getID();
c.setK(tmpK);
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(id)[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
+ OptTreeConverter.getAbstractPlanMapping().getMappedProg(id)[1];
pfpb.setDegreeOfParallelism(tmpK);
- //distribute remaining parallelism
- int remainParforK = (int)Math.ceil(((double)(parforK-tmpK+1))/tmpK);
- int remainOpsK = Math.max(opsK / tmpK, 1);
+ //distribute remaining parallelism
+ int remainParforK = getRemainingParallelismParFor(parforK, tmpK);
+ int remainOpsK = getRemainingParallelismOps(opsK, tmpK);
rAssignRemainingParallelism(c, remainParforK, remainOpsK);
}
else if( c.getNodeType() == NodeType.HOP )
@@ -1387,7 +1386,18 @@ public class OptimizerRuleBased extends Optimizer
}
}
}
-
+
+ private static int getRemainingParallelismParFor(int parforK, int tmpK) {
+ //compute max remaining parfor parallelism k such that k * tmpK <= parforK
+ return (int)Math.ceil((double)(parforK-tmpK+1) / tmpK);
+ }
+
+ private static int getRemainingParallelismOps(int opsK, int tmpK) {
+ //compute max remaining operations parallelism k with slight over-provisioning
+ //such that k * tmpK <= 1.5 * opsK; note that if parfor already exploits the
+ //maximum parallelism, this will not introduce any over-provisioning.
+ return (int)Math.max(Math.round((double)opsK / tmpK), 1);
+ }
///////
//REWRITE set task partitioner