You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/30 08:25:00 UTC

[2/2] incubator-systemml git commit: [SYSTEMML-996] Fix spark frame right indexing shuffle/blocksize mismatch

[SYSTEMML-996] Fix spark frame right indexing shuffle/blocksize mismatch

This fixes two implementation issues in our Spark frame right indexing
instruction. First, right indexing on frames should never cause a shuffle.
Second, creating partial blocks and merging them led to blocksize
mismatches because frames can have variable-sized blocks.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/53ba37ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/53ba37ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/53ba37ec

Branch: refs/heads/master
Commit: 53ba37ecc874c59206902e3f9ddf5d881c149c34
Parents: 11fde8e
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 30 01:07:36 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 30 01:07:36 2016 -0700

----------------------------------------------------------------------
 .../spark/FrameIndexingSPInstruction.java       | 52 +++++++++++---------
 1 file changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/53ba37ec/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index 98b8f70..0fdc1af 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
@@ -107,11 +108,7 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 			}
 			else{
 				out = in1.filter(new IsFrameBlockInRange(rl, ru, mcOut))
-			             .flatMapToPair(new SliceBlock(ixrange, mcOut));
-				
-				//aggregation if required 
-				if( _aggType != SparkAggType.NONE )
-					out = FrameRDDAggregateUtils.mergeByKey(out);
+			             .mapToPair(new SliceBlock(ixrange, mcOut));
 			}
 			
 			//put output RDD handle into symbol table
@@ -378,30 +375,35 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 	/**
 	 * 
 	 */
-	private static class SliceBlock implements PairFlatMapFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
+	private static class SliceBlock implements PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
 	{
 		private static final long serialVersionUID = -5270171193018691692L;
 		
 		private IndexRange _ixrange;
-		private int _brlen; 
-		private int _bclen;
 		
 		public SliceBlock(IndexRange ixrange, MatrixCharacteristics mcOut) {
 			_ixrange = ixrange;
-			_brlen = OptimizerUtils.getDefaultFrameSize();
-			_bclen = (int) mcOut.getCols();
 		}
 
 		@Override
-		public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> kv) 
+		public Tuple2<Long, FrameBlock> call(Tuple2<Long, FrameBlock> kv) 
 			throws Exception 
 		{	
-			Pair<Long, FrameBlock> in = SparkUtils.toIndexedFrameBlock(kv);
+			long rowindex = kv._1();
+			FrameBlock in = kv._2();
+			
+			//prepare local index range (block guaranteed to be in range)
+			int rl = (int) ((rowindex > _ixrange.rowStart) ? 0 : _ixrange.rowStart-rowindex);
+			int ru = (int) ((_ixrange.rowEnd-rowindex >= in.getNumRows()) ? 
+					in.getNumRows()-1 : _ixrange.rowEnd-rowindex);
 			
-			ArrayList<Pair<Long, FrameBlock>> outlist = new ArrayList<Pair<Long, FrameBlock>>();
-			OperationsOnMatrixValues.performSlice(in, _ixrange, _brlen, _bclen, outlist);
+			//slice out the block
+			FrameBlock out = in.sliceOperations(rl, ru, (int)(_ixrange.colStart-1), 
+					(int)(_ixrange.colEnd-1), new FrameBlock());
 			
-			return SparkUtils.fromIndexedFrameBlock(outlist);
+			//return block with shifted row index
+			long rowindex2 = (rowindex > _ixrange.rowStart) ? rowindex-_ixrange.rowStart+1 : 1; 
+			return new Tuple2<Long,FrameBlock>(rowindex2, out);
 		}		
 	}
 
@@ -413,13 +415,9 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 		private static final long serialVersionUID = -1655390518299307588L;
 		
 		private IndexRange _ixrange;
-		private int _brlen; 
-		private int _bclen;
 		
 		public SliceBlockPartitionFunction(IndexRange ixrange, MatrixCharacteristics mcOut) {
 			_ixrange = ixrange;
-			_brlen = (int) Math.min(OptimizerUtils.getDefaultFrameSize(), mcOut.getRows());
-			_bclen = (int) mcOut.getCols();
 		}
 
 		@Override
@@ -429,6 +427,10 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 			return new SliceBlockPartitionIterator(arg0);
 		}	
 		
+		/**
+		 * NOTE: this function is only applied for slicing columns (which preserves all rows
+		 * and hence the existing partitioning).
+		 */
 		private class SliceBlockPartitionIterator extends LazyIterableIterator<Tuple2<Long, FrameBlock>>
 		{
 			public SliceBlockPartitionIterator(Iterator<Tuple2<Long, FrameBlock>> in) {
@@ -439,13 +441,15 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 			protected Tuple2<Long, FrameBlock> computeNext(Tuple2<Long, FrameBlock> arg)
 				throws Exception
 			{
-				Pair<Long, FrameBlock> in = SparkUtils.toIndexedFrameBlock(arg);
+				long rowindex = arg._1();
+				FrameBlock in = arg._2();
 				
-				ArrayList<Pair<Long, FrameBlock>> outlist = new ArrayList<Pair<Long, FrameBlock>>();
-				OperationsOnMatrixValues.performSlice(in, _ixrange, _brlen, _bclen, outlist);
+				//slice out the block
+				FrameBlock out = in.sliceOperations(0, in.getNumRows()-1, 
+						(int)_ixrange.colStart-1, (int)_ixrange.colEnd-1, new FrameBlock());
 				
-				assert(outlist.size() == 1); //1-1 row/column block indexing
-				return SparkUtils.fromIndexedFrameBlock(outlist.get(0));
+				//return block with shifted row index
+				return new Tuple2<Long,FrameBlock>(rowindex, out);		
 			}			
 		}
 	}