You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/21 07:21:52 UTC

incubator-systemml git commit: [SYSTEMML-1551] New multi-threaded column-wise rexpand operations

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2a1eb4c9b -> bda96a8e8


[SYSTEMML-1551] New multi-threaded column-wise rexpand operations

This patch introduces a multi-threaded runtime for the internal
parameterized built-in function rexpand, specifically column expansion,
along with necessary compiler modifications. The runtime improvements
are moderate for both dense and sparse, ranging from 1.6x to 2x due to
better write bandwidth exploitation (dense) and latency hiding (sparse). 
 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bda96a8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bda96a8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bda96a8e

Branch: refs/heads/master
Commit: bda96a8e8690f8821c476d5370c0d39e41da19a2
Parents: 2a1eb4c
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Apr 21 00:08:09 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 21 00:08:09 2017 -0700

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |  3 +-
 .../sysml/hops/rewrite/HopRewriteUtils.java     | 14 ++--
 .../apache/sysml/lops/ParameterizedBuiltin.java | 21 ++++-
 .../runtime/compress/CompressedMatrixBlock.java |  4 +-
 .../parfor/opt/OptimizerRuleBased.java          |  3 +-
 .../cp/ParameterizedBuiltinCPInstruction.java   |  5 +-
 .../runtime/matrix/data/LibMatrixReorg.java     | 87 +++++++++++++++++---
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  4 +-
 8 files changed, 116 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 1d6828c..74542f4 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -784,8 +784,9 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
 	{
 		if( et == ExecType.CP || et == ExecType.SPARK )
 		{
+			int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads );
 			ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops, 
-					HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
+					HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et, k);
 			setOutputDimensions(pbilop);
 			setLineNumbers(pbilop);
 			setLops(pbilop);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
index f3baec1..f7be8b9 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
@@ -1048,23 +1048,27 @@ public class HopRewriteUtils
 	//////////////////////////////////////
 	// utils for lookup tables
 	
-	public static boolean isValidOp( AggOp input, AggOp[] validTab ) {
+	public static boolean isValidOp( AggOp input, AggOp... validTab ) {
 		return ArrayUtils.contains(validTab, input);
 	}
 	
-	public static boolean isValidOp( OpOp1 input, OpOp1[] validTab ) {
+	public static boolean isValidOp( OpOp1 input, OpOp1... validTab ) {
 		return ArrayUtils.contains(validTab, input);
 	}
 	
-	public static boolean isValidOp( OpOp2 input, OpOp2[] validTab ) {
+	public static boolean isValidOp( OpOp2 input, OpOp2... validTab ) {
 		return ArrayUtils.contains(validTab, input);
 	}
 	
-	public static boolean isValidOp( ReOrgOp input, ReOrgOp[] validTab ) {
+	public static boolean isValidOp( ReOrgOp input, ReOrgOp... validTab ) {
 		return ArrayUtils.contains(validTab, input);
 	}
 	
-	public static int getValidOpPos( OpOp2 input, OpOp2[] validTab ) {
+	public static boolean isValidOp( ParamBuiltinOp input, ParamBuiltinOp... validTab ) {
+		return ArrayUtils.contains(validTab, input);
+	}
+	
+	public static int getValidOpPos( OpOp2 input, OpOp2... validTab ) {
 		return ArrayUtils.indexOf(validTab, input);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
index 81593b8..fdaf3c5 100644
--- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
+++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
@@ -48,7 +48,16 @@ public class ParameterizedBuiltin extends Lop
 	private HashMap<String, Lop> _inputParams;
 	private boolean _bRmEmptyBC;
 
+	//cp-specific parameters
+	private int _numThreads = 1;
+	
 	public ParameterizedBuiltin(HashMap<String, Lop> paramLops, OperationTypes op, DataType dt, ValueType vt, ExecType et) 
+			throws HopsException 
+	{
+		this(paramLops, op, dt, vt, et, 1);
+	}
+	
+	public ParameterizedBuiltin(HashMap<String, Lop> paramLops, OperationTypes op, DataType dt, ValueType vt, ExecType et, int k) 
 		throws HopsException 
 	{
 		super(Lop.Type.ParameterizedBuiltin, dt, vt);
@@ -60,6 +69,7 @@ public class ParameterizedBuiltin extends Lop
 		}
 		
 		_inputParams = paramLops;
+		_numThreads = k;
 		
 		boolean breaksAlignment = false;
 		boolean aligner = false;
@@ -229,8 +239,15 @@ public class ParameterizedBuiltin extends Lop
 			sb.append( _bRmEmptyBC );
 			sb.append(OPERAND_DELIMITOR);
 		}
-
-		sb.append(this.prepOutputOperand(output));
+		
+		if( getExecType()==ExecType.CP && _operation == OperationTypes.REXPAND ) {
+			sb.append( "k" );
+			sb.append( Lop.NAME_VALUE_SEPARATOR );
+			sb.append( _numThreads );	
+			sb.append(OPERAND_DELIMITOR);
+		}
+		
+		sb.append(prepOutputOperand(output));
 		
 		return sb.toString();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index d7c66f1..fc59065 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -2007,11 +2007,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 	@Override
 	public MatrixBlock rexpandOperations(MatrixBlock ret, double max,
-			boolean rows, boolean cast, boolean ignore)
+			boolean rows, boolean cast, boolean ignore, int k)
 			throws DMLRuntimeException {
 		printDecompressWarning("rexpandOperations");
 		MatrixBlock tmp = isCompressed() ? decompress() : this;
-		return tmp.rexpandOperations(ret, max, rows, cast, ignore);
+		return tmp.rexpandOperations(ret, max, rows, cast, ignore, k);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 0ff7a31..851e193 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1359,7 +1359,8 @@ public class OptimizerRuleBased extends Optimizer
 					if(    ConfigurationManager.isParallelMatrixOperations() 
 						&& h instanceof MultiThreadedHop //abop, datagenop, qop, paramop
 						&& !( h instanceof ParameterizedBuiltinOp //only paramop-grpagg
-							 && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG)
+							 && !HopRewriteUtils.isValidOp(((ParameterizedBuiltinOp)h).getOp(), 
+								ParamBuiltinOp.GROUPEDAGG, ParamBuiltinOp.REXPAND))
 						&& !( h instanceof UnaryOp //only unaryop-cumulativeagg
 							 && !((UnaryOp)h).isCumulativeUnaryOperation() )
 						&& !( h instanceof ReorgOp //only reorgop-transpose

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index 7b1fb57..a31fe94 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -57,7 +57,6 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
 	private static final String TOSTRING_SEPARATOR = " ";
 	private static final String TOSTRING_LINESEPARATOR = "\n";
 	
-	
 	private int arity;
 	protected HashMap<String,String> params;
 	
@@ -248,7 +247,9 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
 			boolean dirVal = params.get("dir").equals("rows");
 			boolean cast = Boolean.parseBoolean(params.get("cast"));
 			boolean ignore = Boolean.parseBoolean(params.get("ignore"));
-			MatrixBlock ret = (MatrixBlock) target.rexpandOperations(new MatrixBlock(), maxVal, dirVal, cast, ignore);
+			int numThreads = Integer.parseInt(params.get("k"));
+			MatrixBlock ret = (MatrixBlock) target.rexpandOperations(
+				new MatrixBlock(), maxVal, dirVal, cast, ignore, numThreads);
 			
 			//release locks
 			ec.setMatrixOutput(output.getName(), ret);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 9f45590..edf69c1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -645,7 +645,7 @@ public class LibMatrixReorg
 	 * @return output matrix
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public static MatrixBlock rexpand(MatrixBlock in, MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore) 
+	public static MatrixBlock rexpand(MatrixBlock in, MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore, int k) 
 		throws DMLRuntimeException
 	{
 		//prepare parameters
@@ -669,7 +669,7 @@ public class LibMatrixReorg
 		if( rows )
 			return rexpandRows(in, ret, lmax, cast, ignore);
 		else //cols
-			return rexpandColumns(in, ret, lmax, cast, ignore);
+			return rexpandColumns(in, ret, lmax, cast, ignore, k);
 	}
 
 	/**
@@ -694,7 +694,7 @@ public class LibMatrixReorg
 		
 		//execute rexpand operations incl sanity checks
 		//TODO more robust (memory efficient) implementation w/o tmp block
-		MatrixBlock tmp = rexpand(in, new MatrixBlock(), max, rows, cast, ignore);
+		MatrixBlock tmp = rexpand(in, new MatrixBlock(), max, rows, cast, ignore, 1);
 		
 		//prepare outputs blocks (slice tmp block into output blocks ) 
 		if( rows ) //expanded vertically
@@ -1909,7 +1909,7 @@ public class LibMatrixReorg
 		return ret;
 	}
 
-	private static MatrixBlock rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore) 
+	private static MatrixBlock rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore, int k) 
 		throws DMLRuntimeException
 	{
 		//set meta data
@@ -1918,10 +1918,43 @@ public class LibMatrixReorg
 		final long nnz = in.nonZeros;
 		boolean sp = MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz);
 		ret.reset(rlen, clen, sp);
+		ret.allocateDenseOrSparseBlock();
+		
+		//execute rexpand columns
+		long rnnz = 0; //real nnz (due to cutoff max)
+		if( k <= 1 || in.getNumRows() <= PAR_NUMCELL_THRESHOLD ) {
+			rnnz = rexpandColumns(in, ret, max, cast, ignore, 0, rlen);
+		}
+		else {
+			try {
+				ExecutorService pool = Executors.newFixedThreadPool( k );
+				ArrayList<RExpandColsTask> tasks = new ArrayList<RExpandColsTask>();
+				int blklen = (int)(Math.ceil((double)rlen/k/8));
+				for( int i=0; i<8*k & i*blklen<rlen; i++ )
+					tasks.add(new RExpandColsTask(in, ret, 
+						max, cast, ignore, i*blklen, Math.min((i+1)*blklen, rlen)));
+				List<Future<Long>> taskret = pool.invokeAll(tasks);	
+				pool.shutdown();
+				for( Future<Long> task : taskret )
+					rnnz += task.get();
+			}
+			catch(Exception ex) {
+				throw new DMLRuntimeException(ex);
+			}
+		}
+		
+		//post-processing
+		ret.setNonZeros(rnnz);
 		
+		return ret;
+	}
+
+	private static long rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore, int rl, int ru) 
+		throws DMLRuntimeException
+	{
 		//expand input horizontally (input vector likely dense 
 		//but generic implementation for general case)
-		for( int i=0; i<rlen; i++ )
+		for( int i=rl; i<ru; i++ )
 		{
 			//get value and cast if necessary (table)
 			double val = in.quickGetValue(i, 0);
@@ -1931,15 +1964,23 @@ public class LibMatrixReorg
 			//handle invalid values if not to be ignored
 			if( !ignore && val<=0 )
 				throw new DMLRuntimeException("Invalid input value <= 0 for ignore=false: "+val);
-				
+			
 			//set expanded value if matching
-			if( val == Math.floor(val) && val >= 1 && val <= max )
-				ret.appendValue(i, (int)(val-1), 1);
+			if( val == Math.floor(val) && val >= 1 && val <= max ) {
+				//update target without global nnz maintenance
+				if( ret.isInSparseFormat() ) {
+					ret.sparseBlock.allocate(i, 1);
+					ret.sparseBlock.append(i, (int)(val-1), 1);
+				}
+				else
+					ret.setValueDenseUnsafe(i, (int)(val-1), 1);
+			}
 		}
 		
-		return ret;
+		//recompute nnz of partition
+		return ret.recomputeNonZeros(rl, ru-1, 0, ret.getNumColumns()-1);
 	}
-
+	
 	private static void copyColVector( MatrixBlock in, int ixin, double[] tmp, int[] tmpi, int len)
 	{
 		//copy value array from input matrix
@@ -2145,4 +2186,30 @@ public class LibMatrixReorg
 			return countNnzPerColumn(_in, _rl, _ru);
 		}
 	}
+	
+	private static class RExpandColsTask implements Callable<Long>
+	{
+		private final MatrixBlock _in;
+		private final MatrixBlock _out;
+		private final int _max;
+		private final boolean _cast;
+		private final boolean _ignore;
+		private final int _rl;
+		private final int _ru;
+
+		protected RExpandColsTask(MatrixBlock in, MatrixBlock out, int max, boolean cast, boolean ignore, int rl, int ru) {
+			_in = in;
+			_out = out;
+			_max = max;
+			_cast = cast;
+			_ignore = ignore;
+			_rl = rl;
+			_ru = ru;
+		}
+		
+		@Override
+		public Long call() throws DMLRuntimeException {
+			return rexpandColumns(_in, _out, _max, _cast, _ignore, _rl, _ru);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 0ed64e3..2aa1bd3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5067,11 +5067,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		return removeEmptyOperations(ret, rows, null);
 	}
 
-	public MatrixBlock rexpandOperations( MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore )
+	public MatrixBlock rexpandOperations( MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore, int k )
 		throws DMLRuntimeException 
 	{	
 		MatrixBlock result = checkType(ret);
-		return LibMatrixReorg.rexpand(this, result, max, rows, cast, ignore);
+		return LibMatrixReorg.rexpand(this, result, max, rows, cast, ignore, k);
 	}