You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/14 02:53:42 UTC

incubator-systemml git commit: [SYSTEMML-400] Improved multi-threaded sparse transpose (par count nnz)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master ca95d1363 -> f4b50cdb1


[SYSTEMML-400] Improved multi-threaded sparse transpose (par count nnz)

So far, we redundantly computed the column nnzs per thread in order to
enable sparse row pre-allocation in case of sparse-sparse transpose.
This patch, removes this redundancy by computing the column nnzs over
disjoint row partitions, aggregating them, and subsequently submitting
the actual transpose tasks. On the imagenet dataset, this change
improved performance from 7.7s to 3.9s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f4b50cdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f4b50cdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f4b50cdb

Branch: refs/heads/master
Commit: f4b50cdb1b961c2072332f8b01fd58943053f020
Parents: ca95d13
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 14 04:51:02 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Sep 14 04:51:02 2016 +0200

----------------------------------------------------------------------
 .../runtime/matrix/data/LibMatrixReorg.java     | 98 +++++++++++++++-----
 1 file changed, 77 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f4b50cdb/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index a2e1252..179f835 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -162,7 +162,8 @@ public class LibMatrixReorg
 		if( !in.sparse && !out.sparse )
 			transposeDenseToDense( in, out, 0, in.rlen, 0, in.clen );
 		else if( in.sparse && out.sparse )
-			transposeSparseToSparse( in, out, 0, in.rlen, 0, in.clen );
+			transposeSparseToSparse( in, out, 0, in.rlen, 0, in.clen, 
+				countNnzPerColumn(in, 0, in.rlen));
 		else if( in.sparse )
 			transposeSparseToDense( in, out, 0, in.rlen, 0, in.clen );
 		else
@@ -205,14 +206,25 @@ public class LibMatrixReorg
 		//core multi-threaded transpose
 		try {
 			ExecutorService pool = Executors.newFixedThreadPool( k );
+			//pre-processing (compute nnz per column once for sparse)
+			int[] cnt = null;
+			if( in.sparse && out.sparse ) {
+				ArrayList<CountNnzTask> tasks = new ArrayList<CountNnzTask>();
+				int blklen = (int)(Math.ceil((double)in.rlen/k));
+				for( int i=0; i<k & i*blklen<in.rlen; i++ )
+					tasks.add(new CountNnzTask(in, i*blklen, Math.min((i+1)*blklen, in.rlen)));
+				List<Future<int[]>> rtasks = pool.invokeAll(tasks);
+				for( Future<int[]> rtask : rtasks )
+					cnt = mergeNnzCounts(cnt, rtask.get());
+			} 
+			//compute actual transpose and check for errors
 			ArrayList<TransposeTask> tasks = new ArrayList<TransposeTask>();
 			boolean row = (in.sparse || in.rlen >= in.clen) && !out.sparse;
 			int len = row ? in.rlen : in.clen;
 			int blklen = (int)(Math.ceil((double)len/k));
 			blklen += (blklen%8 != 0)?8-blklen%8:0;
 			for( int i=0; i<k & i*blklen<len; i++ )
-				tasks.add(new TransposeTask(in, out, row, i*blklen, Math.min((i+1)*blklen, len)));
-			//execute tasks and check for errors
+				tasks.add(new TransposeTask(in, out, row, i*blklen, Math.min((i+1)*blklen, len), cnt));
 			List<Future<Object>> taskret = pool.invokeAll(tasks);	
 			pool.shutdown();
 			for( Future<Object> task : taskret )
@@ -893,14 +905,12 @@ public class LibMatrixReorg
 	 * @param in
 	 * @param out
 	 */
-	private static void transposeSparseToSparse(MatrixBlock in, MatrixBlock out, int rl, int ru, int cl, int cu)
+	private static void transposeSparseToSparse(MatrixBlock in, MatrixBlock out, int rl, int ru, int cl, int cu, int[] cnt)
 	{
 		//NOTE: called only in sequential or column-wise parallel execution
 		if( rl > 0 || ru < in.rlen )
 			throw new RuntimeException("Unsupported row-parallel transposeSparseToSparse: "+rl+", "+ru);
 		
-		final int m = in.rlen;
-		final int n = in.clen;
 		final int m2 = out.rlen;
 		final int n2 = out.clen;
 		final int ennz2 = (int) (in.nonZeros/m2); 
@@ -908,17 +918,6 @@ public class LibMatrixReorg
 		SparseBlock a = in.getSparseBlock();
 		SparseBlock c = out.getSparseBlock();
 
-		//initial pass to determine capacity (this helps to prevent
-		//sparse row reallocations and mem inefficiency w/ skew
-		int[] cnt = null;
-		if( n <= 4096 ) { //16KB
-			cnt = new int[n];
-			for( int i=0; i<m; i++ ) {
-				if( !a.isEmpty(i) )
-					countAgg(cnt, a.indexes(i), a.pos(i), a.size(i));
-			}
-		}
-		
 		//allocate output sparse rows
 		if( cnt != null ) {
 			for( int i=cl; i<cu; i++ )
@@ -1065,6 +1064,42 @@ public class LibMatrixReorg
 	/**
 	 * 
 	 * @param in
+	 * @param rl
+	 * @param ru
+	 * @return
+	 */
+	private static int[] countNnzPerColumn(MatrixBlock in, int rl, int ru) {
+		//initial pass to determine capacity (this helps to prevent
+		//sparse row reallocations and mem inefficiency w/ skew
+		int[] cnt = null;
+		if( in.sparse && in.clen <= 4096 ) { //16KB
+			SparseBlock a = in.sparseBlock;
+			cnt = new int[in.clen];
+			for( int i=rl; i<ru; i++ ) {
+				if( !a.isEmpty(i) )
+					countAgg(cnt, a.indexes(i), a.pos(i), a.size(i));
+			}
+		}
+		return cnt;
+	}
+	
+	/**
+	 * 
+	 * @param cnt
+	 * @param cnt2
+	 * @return
+	 */
+	private static int[] mergeNnzCounts(int[] cnt, int[] cnt2) {
+		if( cnt == null )
+			return cnt2;
+		for( int i=0; i<cnt.length; i++ )
+			cnt[i] += cnt2[i];
+		return cnt;
+	}
+	
+	/**
+	 * 
+	 * @param in
 	 * @param out
 	 * @throws DMLRuntimeException
 	 */
@@ -2349,15 +2384,15 @@ public class LibMatrixReorg
 		private boolean _row = false;
 		private int _rl = -1;
 		private int _ru = -1;
+		private int[] _cnt = null;
 
-		protected TransposeTask(MatrixBlock in, MatrixBlock out, boolean row, int rl, int ru) 
-			throws DMLRuntimeException
-		{
+		protected TransposeTask(MatrixBlock in, MatrixBlock out, boolean row, int rl, int ru, int[] cnt) {
 			_in = in;
 			_out = out;
 			_row = row;
 			_rl = rl;
 			_ru = ru;
+			_cnt = cnt;
 		}
 		
 		@Override
@@ -2372,7 +2407,7 @@ public class LibMatrixReorg
 			if( !_in.sparse && !_out.sparse )
 				transposeDenseToDense( _in, _out, rl, ru, cl, cu );
 			else if( _in.sparse && _out.sparse )
-				transposeSparseToSparse( _in, _out, rl, ru, cl, cu );
+				transposeSparseToSparse( _in, _out, rl, ru, cl, cu, _cnt );
 			else if( _in.sparse )
 				transposeSparseToDense( _in, _out, rl, ru, cl, cu );
 			else
@@ -2381,4 +2416,25 @@ public class LibMatrixReorg
 			return null;
 		}
 	}
+	
+	/**
+	 * 
+	 */
+	private static class CountNnzTask implements Callable<int[]>
+	{
+		private MatrixBlock _in = null;
+		private int _rl = -1;
+		private int _ru = -1;
+
+		protected CountNnzTask(MatrixBlock in, int rl, int ru) {
+			_in = in;
+			_rl = rl;
+			_ru = ru;
+		}
+		
+		@Override
+		public int[] call() throws DMLRuntimeException {
+			return countNnzPerColumn(_in, _rl, _ru);
+		}
+	}
 }