You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/30 08:24:59 UTC

[1/2] incubator-systemml git commit: [SYSTEMML-993] Improved converters dataframe-matrix and csv-matrix

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 0cc4d23e5 -> 53ba37ecc


[SYSTEMML-993] Improved converters dataframe-matrix and csv-matrix

This patch avoids unnecessary re-allocations of sparse rows during
append. Even with estimated nnz, we still required log N re-allocations
per row. Now, we pre-compute the nnz per row and allocate the sparse row
once, which reduces GC pressure. On a scenario of 100K x 64K csv to
matrix, this improved performance from 330s to 306s (despite
string-double parsing and shuffle). 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/11fde8e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/11fde8e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/11fde8e9

Branch: refs/heads/master
Commit: 11fde8e9c07efefb6f9e440bd5defb85578696a8
Parents: 0cc4d23
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 30 00:19:27 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 30 00:19:27 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java          | 93 ++++++++++++++------
 .../sysml/runtime/io/IOUtilFunctions.java       | 32 +++++++
 2 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/11fde8e9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 38ebd7e..29c7a44 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -383,6 +383,42 @@ public class RDDConverterUtils
 		return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
 	}
 	
+	/**
+	 * 
+	 * @param vect
+	 * @param isVector
+	 * @return
+	 */
+	private static int countNnz(Object vect, boolean isVector, int off) {
+		if( isVector ) //note: numNonzeros scans entries but handles sparse/dense
+			return ((Vector) vect).numNonzeros();
+		else 
+			return countNnz(vect, isVector, off, ((Row)vect).length()-off);
+	}
+	
+	/**
+	 * 
+	 * @param vect
+	 * @param isVector
+	 * @param pos
+	 * @param len
+	 * @return
+	 */
+	private static int countNnz(Object vect, boolean isVector, int pos, int len ) {
+		int lnnz = 0;
+		if( isVector ) {
+			Vector vec = (Vector) vect;
+			for( int i=pos; i<pos+len; i++ )
+				lnnz += (vec.apply(i) != 0) ? 1 : 0;
+		}
+		else { //row
+			Row row = (Row) vect;
+			for( int i=pos; i<pos+len; i++ )
+				lnnz += UtilFunctions.isNonZero(row.get(i)) ? 1 : 0;
+		}
+		return lnnz;
+	}
+	
 	/////////////////////////////////
 	// BINARYBLOCK-SPECIFIC FUNCTIONS
 
@@ -621,11 +657,7 @@ public class RDDConverterUtils
 			String[] cols = IOUtilFunctions.split(line, _delim);
 			
 			//determine number of non-zeros of row (w/o string parsing)
-			long lnnz = 0;
-			for( String col : cols ) {
-				lnnz += (!col.isEmpty() && !col.equals("0") 
-						&& !col.equals("0.0")) ? 1 : 0;
-			}
+			int lnnz = IOUtilFunctions.countNnz(cols);
 			
 			//update counters
 			_aNnz.add( (double)lnnz );
@@ -708,7 +740,12 @@ public class RDDConverterUtils
 				boolean emptyFound = false;
 				for( int cix=1, pix=0; cix<=ncblks; cix++ ) 
 				{
-					int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
+					int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+					if( mb[cix-1].isInSparseFormat() ) {
+						//allocate row once (avoid re-allocations)
+						int lnnz = IOUtilFunctions.countNnz(parts, pix, lclen);
+						mb[cix-1].getSparseBlock().allocate(pos, lnnz);
+					}
 					for( int j=0; j<lclen; j++ ) {
 						String part = parts[pix++];
 						emptyFound |= part.isEmpty() && !_fill;
@@ -739,7 +776,8 @@ public class RDDConverterUtils
 			for( int cix=1; cix<=ncblks; cix++ ) {
 				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
 				ix[cix-1] = new MatrixIndexes(rix, cix);
-				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse, (int)(lrlen*lclen*_sparsity));		
+				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse, (int)(lrlen*lclen*_sparsity));
+				mb[cix-1].allocateDenseOrSparseBlock();
 			}
 		}
 		
@@ -943,18 +981,22 @@ public class RDDConverterUtils
 				
 				//process row data
 				int off = _containsID ? 1: 0;
-				if( _isVector ) {
-					Vector vect = (Vector) tmp._1().get(off);
-					for( int cix=1, pix=0; cix<=ncblks; cix++ ) {
-						int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
+				Object obj = _isVector ? tmp._1().get(off) : tmp._1();
+				for( int cix=1, pix=_isVector?0:off; cix<=ncblks; cix++ ) {
+					int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
+					//allocate sparse row once (avoid re-allocations)
+					if( mb[cix-1].isInSparseFormat() ) {
+						int lnnz = countNnz(obj, _isVector, pix, lclen);
+						mb[cix-1].getSparseBlock().allocate(pos, lnnz);
+					}
+					//append data to matrix blocks
+					if( _isVector ) {
+						Vector vect = (Vector) obj;
 						for( int j=0; j<lclen; j++ )
-							mb[cix-1].appendValue(pos, j, vect.apply(pix++));
-					}	
-				}
-				else { //row
-					Row row = tmp._1();
-					for( int cix=1, pix=off; cix<=ncblks; cix++ ) {
-						int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
+							mb[cix-1].appendValue(pos, j, vect.apply(pix++));	
+					}
+					else { //row
+						Row row = (Row) obj;
 						for( int j=0; j<lclen; j++ )
 							mb[cix-1].appendValue(pos, j, UtilFunctions.getDouble(row.get(pix++)));
 					}
@@ -979,6 +1021,7 @@ public class RDDConverterUtils
 				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);				
 				ix[cix-1] = new MatrixIndexes(rix, cix);
 				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse,(int)(lrlen*lclen*_sparsity));
+				mb[cix-1].allocateDenseOrSparseBlock();
 			}
 		}
 		
@@ -1016,17 +1059,9 @@ public class RDDConverterUtils
 		public Row call(Row arg0) throws Exception {
 			//determine number of non-zeros of row
 			int off = _containsID ? 1 : 0;
-			long lnnz = 0;
-			if( _isVector ) {
-				//note: numNonzeros scans entries but handles sparse/dense
-				Vector vec = (Vector) arg0.get(off);
-				lnnz += vec.numNonzeros();
-			}
-			else { //row
-				for(int i=off; i<arg0.length(); i++)
-					lnnz += UtilFunctions.isNonZero(arg0.get(i)) ? 1 : 0;
-			}
-		
+			Object vect = _isVector ? arg0.get(off) : arg0;
+			int lnnz = countNnz(vect, _isVector, off);
+			
 			//update counters
 			_aNnz.add( (double)lnnz );
 			return arg0;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/11fde8e9/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 7327796..b2d25ec 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -285,6 +285,38 @@ public class IOUtilFunctions
 	}
 	
 	/**
+	 * Returns the number of non-zero entries but avoids the expensive 
+	 * string to double parsing. This function is guaranteed to never
+	 * underestimate.
+	 * 
+	 * @param cols
+	 * @return
+	 */
+	public static int countNnz(String[] cols) {
+		return countNnz(cols, 0, cols.length);
+	}
+	
+	/**
+	 * Returns the number of non-zero entries but avoids the expensive 
+	 * string to double parsing. This function is guaranteed to never
+	 * underestimate.
+	 * 
+	 * @param cols
+	 * @param pos
+	 * @param len
+	 * @return
+	 */
+	public static int countNnz(String[] cols, int pos, int len) {
+		int lnnz = 0;
+		for( int i=pos; i<pos+len; i++ ) {
+			String col = cols[i];
+			lnnz += (!col.isEmpty() && !col.equals("0") 
+					&& !col.equals("0.0")) ? 1 : 0;
+		}
+		return lnnz;
+	}
+	
+	/**
 	 * Returns the serialized size in bytes of the given string value,
 	 * following the modified UTF-8 specification as used by Java's
 	 * DataInput/DataOutput.


[2/2] incubator-systemml git commit: [SYSTEMML-996] Fix spark frame right indexing shuffle/blocksize mismatch

Posted by mb...@apache.org.
[SYSTEMML-996] Fix spark frame right indexing shuffle/blocksize mismatch

This fixes implementation issues of our spark frame right indexing
instruction. First, right indexing on frame should never cause shuffle.
Second, creating partial blocks and merging these blocks led to
blocksize mismatches as frames can have variable-sized blocks.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/53ba37ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/53ba37ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/53ba37ec

Branch: refs/heads/master
Commit: 53ba37ecc874c59206902e3f9ddf5d881c149c34
Parents: 11fde8e
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 30 01:07:36 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 30 01:07:36 2016 -0700

----------------------------------------------------------------------
 .../spark/FrameIndexingSPInstruction.java       | 52 +++++++++++---------
 1 file changed, 28 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/53ba37ec/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index 98b8f70..0fdc1af 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
@@ -107,11 +108,7 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 			}
 			else{
 				out = in1.filter(new IsFrameBlockInRange(rl, ru, mcOut))
-			             .flatMapToPair(new SliceBlock(ixrange, mcOut));
-				
-				//aggregation if required 
-				if( _aggType != SparkAggType.NONE )
-					out = FrameRDDAggregateUtils.mergeByKey(out);
+			             .mapToPair(new SliceBlock(ixrange, mcOut));
 			}
 			
 			//put output RDD handle into symbol table
@@ -378,30 +375,35 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 	/**
 	 * 
 	 */
-	private static class SliceBlock implements PairFlatMapFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
+	private static class SliceBlock implements PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
 	{
 		private static final long serialVersionUID = -5270171193018691692L;
 		
 		private IndexRange _ixrange;
-		private int _brlen; 
-		private int _bclen;
 		
 		public SliceBlock(IndexRange ixrange, MatrixCharacteristics mcOut) {
 			_ixrange = ixrange;
-			_brlen = OptimizerUtils.getDefaultFrameSize();
-			_bclen = (int) mcOut.getCols();
 		}
 
 		@Override
-		public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> kv) 
+		public Tuple2<Long, FrameBlock> call(Tuple2<Long, FrameBlock> kv) 
 			throws Exception 
 		{	
-			Pair<Long, FrameBlock> in = SparkUtils.toIndexedFrameBlock(kv);
+			long rowindex = kv._1();
+			FrameBlock in = kv._2();
+			
+			//prepare local index range (block guaranteed to be in range)
+			int rl = (int) ((rowindex > _ixrange.rowStart) ? 0 : _ixrange.rowStart-rowindex);
+			int ru = (int) ((_ixrange.rowEnd-rowindex >= in.getNumRows()) ? 
+					in.getNumRows()-1 : _ixrange.rowEnd-rowindex);
 			
-			ArrayList<Pair<Long, FrameBlock>> outlist = new ArrayList<Pair<Long, FrameBlock>>();
-			OperationsOnMatrixValues.performSlice(in, _ixrange, _brlen, _bclen, outlist);
+			//slice out the block
+			FrameBlock out = in.sliceOperations(rl, ru, (int)(_ixrange.colStart-1), 
+					(int)(_ixrange.colEnd-1), new FrameBlock());
 			
-			return SparkUtils.fromIndexedFrameBlock(outlist);
+			//return block with shifted row index
+			long rowindex2 = (rowindex > _ixrange.rowStart) ? rowindex-_ixrange.rowStart+1 : 1; 
+			return new Tuple2<Long,FrameBlock>(rowindex2, out);
 		}		
 	}
 
@@ -413,13 +415,9 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 		private static final long serialVersionUID = -1655390518299307588L;
 		
 		private IndexRange _ixrange;
-		private int _brlen; 
-		private int _bclen;
 		
 		public SliceBlockPartitionFunction(IndexRange ixrange, MatrixCharacteristics mcOut) {
 			_ixrange = ixrange;
-			_brlen = (int) Math.min(OptimizerUtils.getDefaultFrameSize(), mcOut.getRows());
-			_bclen = (int) mcOut.getCols();
 		}
 
 		@Override
@@ -429,6 +427,10 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 			return new SliceBlockPartitionIterator(arg0);
 		}	
 		
+		/**
+		 * NOTE: this function is only applied for slicing columns (which preserved all rows
+		 * and hence the existing partitioning). 
+		 */
 		private class SliceBlockPartitionIterator extends LazyIterableIterator<Tuple2<Long, FrameBlock>>
 		{
 			public SliceBlockPartitionIterator(Iterator<Tuple2<Long, FrameBlock>> in) {
@@ -439,13 +441,15 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 			protected Tuple2<Long, FrameBlock> computeNext(Tuple2<Long, FrameBlock> arg)
 				throws Exception
 			{
-				Pair<Long, FrameBlock> in = SparkUtils.toIndexedFrameBlock(arg);
+				long rowindex = arg._1();
+				FrameBlock in = arg._2();
 				
-				ArrayList<Pair<Long, FrameBlock>> outlist = new ArrayList<Pair<Long, FrameBlock>>();
-				OperationsOnMatrixValues.performSlice(in, _ixrange, _brlen, _bclen, outlist);
+				//slice out the block
+				FrameBlock out = in.sliceOperations(0, in.getNumRows()-1, 
+						(int)_ixrange.colStart-1, (int)_ixrange.colEnd-1, new FrameBlock());
 				
-				assert(outlist.size() == 1); //1-1 row/column block indexing
-				return SparkUtils.fromIndexedFrameBlock(outlist.get(0));
+				//return block with shifted row index
+				return new Tuple2<Long,FrameBlock>(rowindex, out);		
 			}			
 		}
 	}