You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this plain-text copy.
Posted to commits@systemml.apache.org by ni...@apache.org on 2017/02/16 22:29:29 UTC

incubator-systemml git commit: [MINOR] Code refactoring MatrixIndexingSPInstruction to enable parallel improvements in both indexing and prefetching

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 066a8213e -> bbc77e71e


[MINOR] Code refactoring MatrixIndexingSPInstruction to enable parallel
improvements in both indexing and prefetching

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bbc77e71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bbc77e71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bbc77e71

Branch: refs/heads/master
Commit: bbc77e71eb9b5aa464f0130380bc30d3f42107b6
Parents: 066a821
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Thu Feb 16 14:26:42 2017 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Thu Feb 16 14:26:42 2017 -0800

----------------------------------------------------------------------
 .../spark/MatrixIndexingSPInstruction.java      | 139 +++++++++++--------
 1 file changed, 83 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bbc77e71/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 9d58718..71bb5ee 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -85,6 +85,83 @@ public class MatrixIndexingSPInstruction  extends IndexingSPInstruction
 		super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr);
 	}
 	
+	public static MatrixBlock inmemoryIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+			 MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+		if( isSingleBlockLookup(mcIn, ixrange) ) {
+			return singleBlockIndexing(in1, mcIn, mcOut, ixrange);
+		}
+		else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) {
+			return multiBlockIndexing(in1, mcIn, mcOut, ixrange);
+		}
+		else
+			throw new DMLRuntimeException("Incorrect usage of inmemoryIndexing");
+	}
+	
+	private static MatrixBlock multiBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+			 MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+		//create list of all required matrix indexes
+		List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>();
+		long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
+		long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock());
+		long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
+		long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock());
+		for( long r=rlix; r<=ruix; r++ )
+			for( long c=clix; c<=cuix; c++ )
+				filter.add( new MatrixIndexes(r,c) );
+		
+		//wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets
+		JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter);
+		out = out.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) //filter unnecessary blocks 
+				 .mapToPair(new SliceBlock2(ixrange, mcOut));       //slice relevant blocks
+		
+		//collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
+		MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(), 
+				(int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+		return mbout;
+	}
+	
+	private static MatrixBlock singleBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+			 MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+		//single block output via lookup (on partitioned inputs, this allows for single partition
+		//access to avoid a full scan of the input; note that this is especially important for 
+		//out-of-core datasets as entire partitions are read, not just keys as in the in-memory setting.
+		long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
+		long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
+		List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, cix));
+		if( list.size() != 1 )
+			throw new DMLRuntimeException("Block lookup returned "+list.size()+" blocks (expected 1).");
+		
+		MatrixBlock tmp = list.get(0);
+		MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ? 
+				tmp : tmp.sliceOperations( //reference full block or slice out sub-block
+				UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()), 
+				UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()), 
+				UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()), 
+				UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock());
+		return mbout;
+	}
+	
+	public static JavaPairRDD<MatrixIndexes,MatrixBlock> generalCaseRightIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1, 
+			 MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange, SparkAggType aggType) {
+		JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
+		
+		if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) {
+			out = in1.mapPartitionsToPair(
+					new SliceBlockPartitionFunction(ixrange, mcOut), true);
+		}
+		else if( aggType == SparkAggType.NONE
+			|| OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
+			out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+		             .mapToPair(new SliceSingleBlock(ixrange, mcOut));
+		}
+		else {
+			out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+		             .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut));
+			out = RDDAggregateUtils.mergeByKey(out);
+		}
+		return out;
+	}
+	
 	@Override
 	public void processInstruction(ExecutionContext ec)
 			throws DMLRuntimeException 
@@ -112,63 +189,13 @@ public class MatrixIndexingSPInstruction  extends IndexingSPInstruction
 			JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
 			
 			if( isSingleBlockLookup(mcIn, ixrange) ) {
-				//single block output via lookup (on partitioned inputs, this allows for single partition
-				//access to avoid a full scan of the input; note that this is especially important for 
-				//out-of-core datasets as entire partitions are read, not just keys as in the in-memory setting.
-				long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
-				long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
-				List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, cix));
-				if( list.size() != 1 )
-					throw new DMLRuntimeException("Block lookup returned "+list.size()+" blocks (expected 1).");
-				
-				MatrixBlock tmp = list.get(0);
-				MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ? 
-						tmp : tmp.sliceOperations( //reference full block or slice out sub-block
-						UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()), 
-						UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()), 
-						UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()), 
-						UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock());
-				
-				sec.setMatrixOutput(output.getName(), mbout);
+				sec.setMatrixOutput(output.getName(), singleBlockIndexing(in1, mcIn, mcOut, ixrange));
 			}
 			else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) {
-				//create list of all required matrix indexes
-				List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>();
-				long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
-				long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock());
-				long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
-				long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock());
-				for( long r=rlix; r<=ruix; r++ )
-					for( long c=clix; c<=cuix; c++ )
-						filter.add( new MatrixIndexes(r,c) );
-				
-				//wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets
-				JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter);
-				out = out.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut)) //filter unnecessary blocks 
-						 .mapToPair(new SliceBlock2(ixrange, mcOut));       //slice relevant blocks
-				
-				//collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
-				MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(), 
-						(int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
-				sec.setMatrixOutput(output.getName(), mbout);
+				sec.setMatrixOutput(output.getName(), multiBlockIndexing(in1, mcIn, mcOut, ixrange));
 			}
 			else { //rdd output for general case
-				JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-				
-				if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) {
-					out = in1.mapPartitionsToPair(
-							new SliceBlockPartitionFunction(ixrange, mcOut), true);
-				}
-				else if( _aggType == SparkAggType.NONE
-					|| OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
-					out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut))
-				             .mapToPair(new SliceSingleBlock(ixrange, mcOut));
-				}
-				else {
-					out = in1.filter(new IsBlockInRange(rl, ru, cl, cu, mcOut))
-				             .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut));
-					out = RDDAggregateUtils.mergeByKey(out);
-				}
+				JavaPairRDD<MatrixIndexes,MatrixBlock> out = generalCaseRightIndexing(in1, mcIn, mcOut, ixrange, _aggType);
 					
 				//put output RDD handle into symbol table
 				sec.setRDDHandleForVariable(output.getName(), out);
@@ -252,7 +279,7 @@ public class MatrixIndexingSPInstruction  extends IndexingSPInstruction
 	 * @param ixrange index range
 	 * @return true if index range covers a single block of the input matrix
 	 */
-	private static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, IndexRange ixrange) {
+	public static boolean isSingleBlockLookup(MatrixCharacteristics mcIn, IndexRange ixrange) {
 		return UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock())
 			== UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock())
 			&& UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock())
@@ -271,7 +298,7 @@ public class MatrixIndexingSPInstruction  extends IndexingSPInstruction
 	 * @param ixrange index range
 	 * @return true if index range requires a multi-block lookup
 	 */
-	private static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) {
+	public static boolean isMultiBlockLookup(JavaPairRDD<?,?> in, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) {
 		return SparkUtils.isHashPartitioned(in)                          //existing partitioner
 			&& OptimizerUtils.estimatePartitionedSizeExactSparsity(mcIn) //out-of-core dataset
 			   > SparkExecutionContext.getDataMemoryBudget(true, true)
@@ -557,7 +584,7 @@ public class MatrixIndexingSPInstruction  extends IndexingSPInstruction
 	 * @param filter partition filter
 	 * @return matrix as {@code JavaPairRDD<MatrixIndexes,MatrixBlock>}
 	 */
-	private JavaPairRDD<MatrixIndexes,MatrixBlock> createPartitionPruningRDD( 
+	private static JavaPairRDD<MatrixIndexes,MatrixBlock> createPartitionPruningRDD( 
 			JavaPairRDD<MatrixIndexes,MatrixBlock> in, List<MatrixIndexes> filter )
 	{
 		//build hashset of required partition ids