You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/22 00:27:04 UTC
[3/3] incubator-systemml git commit: [SYSTEMML-946] Improved shuffle-based data converters (avoid alloc)
[SYSTEMML-946] Improved shuffle-based data converters (avoid alloc)
This patch contains two improvements to our shuffle-based data
converters. Both avoid unnecessary allocations and block copies,
which reduces GC pressure and generally improves performance.
(1) Row sparsity estimates for dataframe-to-matrix conversion, which
affect the growth rate of sparse rows (2x vs. 1.1x) and hence reduce
allocations.
(2) Improved mergeByKey primitive: we now use combineByKey instead of
reduceByKey to allocate a combiner block once and merge subsequent
blocks in-place, without unnecessary allocations and copies. This
applies to all operations that rely on merging partial blocks. In a
CSV-to-matrix conversion scenario (1K x 5M, 4 nodes), this improved
performance from 1,214s to 443s.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/38d087a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/38d087a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/38d087a7
Branch: refs/heads/master
Commit: 38d087a767a23c5f43a2a1ab45d69bf9bc1b1934
Parents: 80a72d7
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 21 17:26:22 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Sep 21 17:26:22 2016 -0700
----------------------------------------------------------------------
.../spark/utils/RDDAggregateUtils.java | 132 ++++++-------------
.../spark/utils/RDDConverterUtils.java | 4 +-
2 files changed, 46 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index c545c30..93bb1d0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -100,7 +100,7 @@ public class RDDAggregateUtils
{
//stable sum of blocks per key, by passing correction blocks along with aggregates
JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp =
- in.combineByKey( new CreateBlockCombinerFunction(),
+ in.combineByKey( new CreateCorrBlockCombinerFunction(),
new MergeSumBlockValueFunction(),
new MergeSumBlockCombinerFunction() );
@@ -117,28 +117,6 @@ public class RDDAggregateUtils
* @param in
* @return
*/
-// public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-// {
-// //stable sum of blocks per key, by passing correction blocks along with aggregates
-// JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp =
-// in.combineByKey( new CreateBlockCombinerFunction(),
-// new MergeSumBlockValueFunction(),
-// new MergeSumBlockCombinerFunction(),
-// new BlockPartitioner(mc, in.partitions().size()));
-//
-// //strip-off correction blocks from
-// JavaPairRDD<MatrixIndexes, MatrixBlock> out =
-// tmp.mapValues( new ExtractMatrixBlock() );
-//
-// //return the aggregate rdd
-// return out;
-// }
-
- /**
- *
- * @param in
- * @return
- */
public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( JavaPairRDD<MatrixIndexes, Double> in )
{
//stable sum of blocks per key, by passing correction blocks along with aggregates
@@ -191,7 +169,7 @@ public class RDDAggregateUtils
{
//stable sum of blocks per key, by passing correction blocks along with aggregates
JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp =
- in.combineByKey( new CreateBlockCombinerFunction(),
+ in.combineByKey( new CreateCorrBlockCombinerFunction(),
new MergeAggBlockValueFunction(aop),
new MergeAggBlockCombinerFunction(aop) );
@@ -204,30 +182,6 @@ public class RDDAggregateUtils
}
/**
- *
- * @param mc
- * @param in
- * @param aop
- * @return
- */
-// public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in, AggregateOperator aop )
-// {
-// //stable sum of blocks per key, by passing correction blocks along with aggregates
-// JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp =
-// in.combineByKey( new CreateBlockCombinerFunction(),
-// new MergeAggBlockValueFunction(aop),
-// new MergeAggBlockCombinerFunction(aop),
-// new BlockPartitioner(mc, in.partitions().size()));
-//
-// //strip-off correction blocks from
-// JavaPairRDD<MatrixIndexes, MatrixBlock> out =
-// tmp.mapValues( new ExtractMatrixBlock() );
-//
-// //return the aggregate rdd
-// return out;
-// }
-
- /**
* Merges disjoint data of all blocks per key.
*
* Note: The behavior of this method is undefined for both sparse and dense data if the
@@ -238,24 +192,14 @@ public class RDDAggregateUtils
*/
public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
{
- return in.reduceByKey(
- new MergeBlocksFunction());
+ //use combine by key to avoid unnecessary deep block copies, i.e.
+ //create combiner block once and merge remaining blocks in-place.
+ return in.combineByKey( new CreateBlockCombinerFunction(),
+ new MergeBlocksFunction(false),
+ new MergeBlocksFunction(false) );
}
/**
- *
- * @param mc
- * @param in
- * @return
- */
-// public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-// {
-// return in.reduceByKey(
-// new BlockPartitioner(mc, in.partitions().size()),
-// new MergeBlocksFunction());
-// }
-
- /**
* Merges disjoint data of all blocks per key.
*
* Note: The behavior of this method is undefined for both sparse and dense data if the
@@ -268,13 +212,13 @@ public class RDDAggregateUtils
{
return in.combineByKey( new CreateRowBlockCombinerFunction(),
new MergeRowBlockValueFunction(),
- new MergeRowBlockCombinerFunction() );
+ new MergeBlocksFunction(false) );
}
/**
*
*/
- private static class CreateBlockCombinerFunction implements Function<MatrixBlock, CorrMatrixBlock>
+ private static class CreateCorrBlockCombinerFunction implements Function<MatrixBlock, CorrMatrixBlock>
{
private static final long serialVersionUID = -3666451526776017343L;
@@ -349,6 +293,22 @@ public class RDDAggregateUtils
/**
*
*/
+ private static class CreateBlockCombinerFunction implements Function<MatrixBlock, MatrixBlock>
+ {
+ private static final long serialVersionUID = 1987501624176848292L;
+
+ @Override
+ public MatrixBlock call(MatrixBlock arg0)
+ throws Exception
+ {
+ //create deep copy of given block
+ return new MatrixBlock(arg0);
+ }
+ }
+
+ /**
+ *
+ */
private static class CreateRowBlockCombinerFunction implements Function<RowMatrixBlock, MatrixBlock>
{
private static final long serialVersionUID = 2866598914232118425L;
@@ -392,26 +352,6 @@ public class RDDAggregateUtils
/**
*
*/
- private static class MergeRowBlockCombinerFunction implements Function2<MatrixBlock, MatrixBlock, MatrixBlock>
- {
- private static final long serialVersionUID = 5142967296705548000L;
-
- @Override
- public MatrixBlock call(MatrixBlock arg0, MatrixBlock arg1)
- throws Exception
- {
- //merge second matrix block into first
- MatrixBlock out = arg0; //in-place update
- out.merge(arg1, false);
- out.examSparsity();
-
- return out;
- }
- }
-
- /**
- *
- */
private static class CreateCellCombinerFunction implements Function<Double, KahanObject>
{
private static final long serialVersionUID = 3697505233057172994L;
@@ -736,11 +676,25 @@ public class RDDAggregateUtils
private static class MergeBlocksFunction implements Function2<MatrixBlock, MatrixBlock, MatrixBlock>
{
private static final long serialVersionUID = -8881019027250258850L;
-
+ private boolean _deep = false;
+
+ @SuppressWarnings("unused")
+ public MergeBlocksFunction() {
+ //by default deep copy first argument
+ this(true);
+ }
+
+ public MergeBlocksFunction(boolean deep) {
+ _deep = deep;
+ }
+
@Override
public MatrixBlock call(MatrixBlock b1, MatrixBlock b2)
throws Exception
{
+ long b1nnz = b1.getNonZeros();
+ long b2nnz = b2.getNonZeros();
+
// sanity check input dimensions
if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
throw new DMLRuntimeException("Mismatched block sizes for: "
@@ -749,14 +703,14 @@ public class RDDAggregateUtils
}
// execute merge (never pass by reference)
- MatrixBlock ret = new MatrixBlock(b1);
+ MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
ret.merge(b2, false);
ret.examSparsity();
// sanity check output number of non-zeros
- if (ret.getNonZeros() != b1.getNonZeros() + b2.getNonZeros()) {
+ if (ret.getNonZeros() != b1nnz + b2nnz) {
throw new DMLRuntimeException("Number of non-zeros does not match: "
- + ret.getNonZeros() + " != " + b1.getNonZeros() + " + " + b2.getNonZeros());
+ + ret.getNonZeros() + " != " + b1nnz + " + " + b2nnz);
}
return ret;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index a619a4d..38ebd7e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -899,6 +899,7 @@ public class RDDConverterUtils
private long _clen = -1;
private int _brlen = -1;
private int _bclen = -1;
+ private double _sparsity = 1.0;
private boolean _sparse = false;
private boolean _containsID;
private boolean _isVector;
@@ -908,6 +909,7 @@ public class RDDConverterUtils
_clen = mc.getCols();
_brlen = mc.getRowsPerBlock();
_bclen = mc.getColsPerBlock();
+ _sparsity = OptimizerUtils.getSparsity(mc);
_sparse = sparse;
_containsID = containsID;
_isVector = isVector;
@@ -976,7 +978,7 @@ public class RDDConverterUtils
for( int cix=1; cix<=ncblks; cix++ ) {
int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
ix[cix-1] = new MatrixIndexes(rix, cix);
- mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse);
+ mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse,(int)(lrlen*lclen*_sparsity));
}
}