You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/02/17 01:17:40 UTC
systemml git commit: [SYSTEMML-2152] Fix spark mapmm selection
(missing dims constraint)
Repository: systemml
Updated Branches:
refs/heads/master 5837953e9 -> a640e57b0
[SYSTEMML-2152] Fix spark mapmm selection (missing dims constraint)
This patch fixes the operator selection logic for the broadcast-based
mapmm. Since the broadcast matrix is created in the driver, the matrix
needs to satisfy the CP dimensions constraint of int32, which can be
exceeded for ultra-sparse broadcast choices.
In the future, we will also support the creation of broadcasts from
blocked RDDs and HDFS files, which will allow us to remove this constraint.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/a640e57b
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/a640e57b
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/a640e57b
Branch: refs/heads/master
Commit: a640e57b01ab186f9204ee1a256747dbbfe6ccaa
Parents: 5837953
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 16 17:17:33 2018 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Feb 16 17:17:33 2018 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/hops/AggBinaryOp.java | 24 +++++++++++---------
1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/a640e57b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index 59cbf95..241dd86 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -1650,13 +1650,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
}
private MMultMethod optFindMMultMethodSpark( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz,
- long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
- MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite )
- {
+ long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
+ MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite )
+ {
//Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp,
//and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer
//required because the max_int byte buffer constraint has been fixed in Spark 1.4
- double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();
+ double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();
double memBudgetLocal = OptimizerUtils.getLocalMemBudget();
//reset spark broadcast memory information (for concurrent parfor jobs, awareness of additional
@@ -1699,11 +1699,11 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
+ OptimizerUtils.estimateSize(m1_cols, m2_cols)) < memBudgetLocal )
{
_spBroadcastMemEstimate = 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols)
- + OptimizerUtils.estimateSize(m1_cols, m2_cols));
+ + OptimizerUtils.estimateSize(m1_cols, m2_cols));
return MMultMethod.MAPMM_CHAIN;
}
}
- }
+ }
// Step 3: check for PMM (permutation matrix needs to fit into mapper memory)
// (needs to be checked before mapmult for consistency with removeEmpty compilation
@@ -1735,11 +1735,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
{
//apply map mult if one side fits in remote task memory
//(if so pick smaller input for distributed cache)
- if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0) {
+ //TODO relax requirement of valid CP dimensions once we support broadcast creation from files/RDDs
+ if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0
+ && OptimizerUtils.isValidCPDimensions(m1_rows, m1_cols) ) {
_spBroadcastMemEstimate = m1Size+m1SizeP;
return MMultMethod.MAPMM_L;
}
- else {
+ else if( OptimizerUtils.isValidCPDimensions(m2_rows, m2_cols) ) {
_spBroadcastMemEstimate = m2Size+m2SizeP;
return MMultMethod.MAPMM_R;
}
@@ -1771,17 +1773,17 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
// Step 7: check for ZIPMM
// If t(X)%*%y -> t(t(y)%*%X) rewrite and ncol(X)<blocksize
if( tmmRewrite && m1_rows >= 0 && m1_rows <= m1_rpb //blocksize constraint left
- && m2_cols >= 0 && m2_cols <= m2_cpb ) //blocksize constraint right
+ && m2_cols >= 0 && m2_cols <= m2_cpb ) //blocksize constraint right
{
return MMultMethod.ZIPMM;
}
-
+
// Step 8: Decide CPMM vs RMM based on io costs
//estimate shuffle costs weighted by parallelism
//TODO currently we reuse the mr estimates, these need to be fine-tune for our spark operators
double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
-
+
//final mmult method decision
if ( cpmm_costs < rmm_costs )
return MMultMethod.CPMM;