You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2017/02/16 22:29:29 UTC
incubator-systemml git commit: [MINOR] Code refactoring
MatrixIndexingSPInstruction to enable parallel improvements in both indexing
and prefetching
Repository: incubator-systemml
Updated Branches:
refs/heads/master 066a8213e -> bbc77e71e
[MINOR] Code refactoring MatrixIndexingSPInstruction to enable parallel
improvements in both indexing and prefetching
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bbc77e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bbc77e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bbc77e71
Branch: refs/heads/master
Commit: bbc77e71eb9b5aa464f0130380bc30d3f42107b6
Parents: 066a821
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Thu Feb 16 14:26:42 2017 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Thu Feb 16 14:26:42 2017 -0800
----------------------------------------------------------------------
.../spark/MatrixIndexingSPInstruction.java | 139 +++++++++++--------
1 file changed, 83 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bbc77e71/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 9d58718..71bb5ee 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -85,6 +85,83 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr);
}
+ public static MatrixBlock inmemoryIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+ if( isSingleBlockLookup(mcIn, ixrange) ) {
+ return singleBlockIndexing(in1, mcIn, mcOut, ixrange);
+ }
+ else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) {
+ return multiBlockIndexing(in1, mcIn, mcOut, ixrange);
+ }
+ else
+ throw new DMLRuntimeException("Incorrect usage of inmemoryIndexing");
+ }
+
+ /**
+ * Extracts an index range spanning several input blocks into an in-memory
+ * MatrixBlock. The keys of all input blocks whose block indexes fall between
+ * the range's first and last block (inclusive) are enumerated. The input is then
+ * wrapped in a partition-pruning RDD restricted to those keys. Only blocks inside
+ * the range are kept and sliced, and the result is collected without a shuffle.
+ *
+ * @param in1 input matrix as binary-block RDD
+ * @param mcIn matrix characteristics of the input (block sizes used for key computation)
+ * @param mcOut matrix characteristics of the indexing result
+ * @param ixrange index range to extract
+ * @return the indexed sub-matrix as a single MatrixBlock
+ * @throws DMLRuntimeException if collecting the sliced blocks into a MatrixBlock fails
+ */
+ private static MatrixBlock multiBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+ //create list of all required matrix indexes
+ List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>();
+ long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
+ long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock());
+ long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
+ long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock());
+ //inclusive bounds: every block row/column touched by the range
+ for( long r=rlix; r<=ruix; r++ )
+ for( long c=clix; c<=cuix; c++ )
+ filter.add( new MatrixIndexes(r,c) );
+
+ //wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter);
+ out = out.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) //filter unnecessary blocks
+ .mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks
+
+ //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
+ //NOTE(review): the (int) casts assume the output dimensions fit in an int,
+ //presumably guaranteed by isMultiBlockLookup; the trailing -1 is presumably
+ //an "unknown nnz" marker for toMatrixBlock -- confirm both.
+ MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(),
+ (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+ return mbout;
+ }
+
+ /**
+ * Extracts an index range that lies within one input block via a key lookup.
+ * On partitioned inputs the lookup touches a single partition instead of
+ * scanning the whole RDD. This matters most for out-of-core datasets, where
+ * entire partitions (not just individual keys) are read.
+ *
+ * @param in1 input matrix as binary-block RDD
+ * @param mcIn matrix characteristics of the input (block sizes)
+ * @param mcOut matrix characteristics of the indexing result
+ * @param ixrange index range to extract
+ * @return the looked-up block itself if its dimensions equal the output
+ * dimensions, otherwise the requested sub-block sliced into a new MatrixBlock
+ * @throws DMLRuntimeException if the lookup does not return exactly one block
+ */
+ private static MatrixBlock singleBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+ //key of the input block holding the top-left cell of the range
+ MatrixIndexes key = new MatrixIndexes(
+ UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock()),
+ UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock()));
+ List<MatrixBlock> blocks = in1.lookup(key);
+ if( blocks.size() != 1 )
+ throw new DMLRuntimeException("Block lookup returned "+blocks.size()+" blocks (expected 1).");
+
+ MatrixBlock block = blocks.get(0);
+ boolean sameShape = block.getNumRows()==mcOut.getRows()
+ && block.getNumColumns()==mcOut.getCols();
+ if( sameShape )
+ return block; //whole block requested: reference it without slicing
+
+ //slice out the sub-block using in-block (local) cell positions
+ return block.sliceOperations(
+ UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()),
+ UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()),
+ UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()),
+ UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock());
+ }
+
+ public static JavaPairRDD<MatrixIndexes,MatrixBlock> generalCaseRightIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange, SparkAggType aggType) {
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
+
+ if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) {
+ out = in1.mapPartitionsToPair(
+ new SliceBlockPartitionFunction(ixrange, mcOut), true);
+ }
+ else if( aggType == SparkAggType.NONE
+ || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
+ out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+ .mapToPair(new SliceSingleBlock(ixrange, mcOut));
+ }
+ else {
+ out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+ .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut));
+ out = RDDAggregateUtils.mergeByKey(out);
+ }
+ return out;
+ }
+
@Override
public void processInstruction(ExecutionContext ec)
throws DMLRuntimeException
@@ -112,63 +189,13 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
if( isSingleBlockLookup(mcIn, ixrange) ) {
- //single block output via lookup (on partitioned inputs, this allows for single partition
- //access to avoid a full scan of the input; note that this is especially important for
- //out-of-core datasets as entire partitions are read, not just keys as in the in-memory setting.
- long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
- long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
- List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, cix));
- if( list.size() != 1 )
- throw new DMLRuntimeException("Block lookup returned "+list.size()+" blocks (expected 1).");
-
- MatrixBlock tmp = list.get(0);
- MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ?
- tmp : tmp.sliceOperations( //reference full block or slice out sub-block
- UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()),
- UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()),
- UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()),
- UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock());
-
- sec.setMatrixOutput(output.getName(), mbout);
+ sec.setMatrixOutput(output.getName(), singleBlockIndexing(in1, mcIn, mcOut, ixrange));
}
else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) {
- //create list of all required matrix indexes
- List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>();
- long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
- long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock());
- long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
- long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock());
- for( long r=rlix; r<=ruix; r++ )
- for( long c=clix; c<=cuix; c++ )
- filter.add( new MatrixIndexes(r,c) );
-
- //wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter);
- out = out.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut)) //filter unnecessary blocks
- .mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks
-
- //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
- MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(),
- (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
- sec.setMatrixOutput(output.getName(), mbout);
+ sec.setMatrixOutput(output.getName(), multiBlockIndexing(in1, mcIn, mcOut, ixrange));
}
else { //rdd output for general case
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-
- if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) {
- out = in1.mapPartitionsToPair(
- new SliceBlockPartitionFunction(ixrange, mcOut), true);
- }
- else if( _aggType == SparkAggType.NONE
- || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
- out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut))
- .mapToPair(new SliceSingleBlock(ixrange, mcOut));
- }
- else {
- out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut))
- .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut));
- out = RDDAggregateUtils.mergeByKey(out);
- }
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out = generalCaseRightIndexing(in1, mcIn, mcOut, ixrange, _aggType);
//put output RDD handle into symbol table
sec.setRDDHandleForVariable(output.getName(), out);
@@ -252,7 +279,7 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
* @param ixrange index range
* @return true if index range covers a single block of the input matrix
*/
- private static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, IndexRange ixrange) {
+ public static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, IndexRange ixrange) {
return UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock())
== UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock())
&& UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock())
@@ -271,7 +298,7 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
* @param ixrange index range
* @return true if index range requires a multi-block lookup
*/
- private static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) {
+ public static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) {
return SparkUtils.isHashPartitioned(in) //existing partitioner
&& OptimizerUtils.estimatePartitionedSizeExactSparsity(mcIn) //out-of-core dataset
> SparkExecutionContext.getDataMemoryBudget(true, true)
@@ -557,7 +584,7 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
* @param filter partition filter
* @return matrix as {@code JavaPairRDD<MatrixIndexes,MatrixBlock>}
*/
- private JavaPairRDD<MatrixIndexes,MatrixBlock> createPartitionPruningRDD(
+ private static JavaPairRDD<MatrixIndexes,MatrixBlock> createPartitionPruningRDD(
JavaPairRDD<MatrixIndexes,MatrixBlock> in, List<MatrixIndexes> filter )
{
//build hashset of required partition ids