Posted to commits@systemml.apache.org by mb...@apache.org on 2015/11/26 19:50:55 UTC

[1/3] incubator-systemml git commit: Fix parfor rewrite 'rdd checkpoint injection' (prog rewrite / recompile)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 5c88842d1 -> 9fd00a990


Fix parfor rewrite 'rdd checkpoint injection' (prog rewrite / recompile)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/38b8be19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/38b8be19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/38b8be19

Branch: refs/heads/master
Commit: 38b8be1904aa7dd9e9720545f20bc744d9dc9caa
Parents: 5c88842
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Nov 25 14:52:03 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Nov 25 20:06:30 2015 -0800

----------------------------------------------------------------------
 .../rewrite/RewriteInjectSparkLoopCheckpointing.java     |  3 +++
 .../controlprogram/parfor/opt/ProgramRecompiler.java     | 11 ++++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38b8be19/src/main/java/com/ibm/bi/dml/hops/rewrite/RewriteInjectSparkLoopCheckpointing.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/rewrite/RewriteInjectSparkLoopCheckpointing.java b/src/main/java/com/ibm/bi/dml/hops/rewrite/RewriteInjectSparkLoopCheckpointing.java
index c259d4d..5e0e54f 100644
--- a/src/main/java/com/ibm/bi/dml/hops/rewrite/RewriteInjectSparkLoopCheckpointing.java
+++ b/src/main/java/com/ibm/bi/dml/hops/rewrite/RewriteInjectSparkLoopCheckpointing.java
@@ -107,6 +107,9 @@ public class RewriteInjectSparkLoopCheckpointing extends StatementBlockRewriteRu
 				sb0.setLiveIn(livein);
 				sb0.setLiveOut(liveout);
 				ret.add(sb0);
+				
+				//maintain rewrite status
+				status.setInjectedCheckpoints();
 			}
 		}
 			

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38b8be19/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/opt/ProgramRecompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/opt/ProgramRecompiler.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/opt/ProgramRecompiler.java
index 45fe8d1..e9b2f43 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/opt/ProgramRecompiler.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/opt/ProgramRecompiler.java
@@ -33,6 +33,7 @@ import com.ibm.bi.dml.lops.Lop;
 import com.ibm.bi.dml.lops.LopsException;
 import com.ibm.bi.dml.lops.compile.Dag;
 import com.ibm.bi.dml.parser.DMLProgram;
+import com.ibm.bi.dml.parser.DMLTranslator;
 import com.ibm.bi.dml.parser.ForStatement;
 import com.ibm.bi.dml.parser.ForStatementBlock;
 import com.ibm.bi.dml.parser.IfStatement;
@@ -73,13 +74,21 @@ public class ProgramRecompiler
 	 * @throws DMLUnsupportedOperationException 
 	 * @throws DMLRuntimeException 
 	 * @throws LopsException 
+	 * @throws HopsException 
 	 */
 	public static ArrayList<ProgramBlock> generatePartitialRuntimeProgram(Program rtprog, ArrayList<StatementBlock> sbs) 
-		throws LopsException, DMLRuntimeException, DMLUnsupportedOperationException, IOException
+		throws LopsException, DMLRuntimeException, DMLUnsupportedOperationException, IOException, HopsException
 	{
 		ArrayList<ProgramBlock> ret = new ArrayList<ProgramBlock>();
 		DMLConfig config = ConfigurationManager.getConfig();
 		
+		//construct lops from hops if not existing
+		DMLTranslator dmlt = new DMLTranslator(sbs.get(0).getDMLProg());
+		for( StatementBlock sb : sbs ) {
+			dmlt.constructLops(sb);
+		}
+		
+		//construct runtime program from lops
 		for( StatementBlock sb : sbs ) {
 			DMLProgram prog = sb.getDMLProg();
 			ret.add( prog.createRuntimeProgramBlock(rtprog, sb, config) );


[2/3] incubator-systemml git commit: Fix rdd checkpoint compilation (more conservative decision for vectors)

Posted by mb...@apache.org.
Fix rdd checkpoint compilation (more conservative decision for vectors)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f5196177
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f5196177
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f5196177

Branch: refs/heads/master
Commit: f5196177b688720d66a3dafc55fdb7219215234a
Parents: 38b8be1
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Nov 25 18:35:42 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Nov 25 20:06:36 2015 -0800

----------------------------------------------------------------------
 src/main/java/com/ibm/bi/dml/hops/Hop.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f5196177/src/main/java/com/ibm/bi/dml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/Hop.java b/src/main/java/com/ibm/bi/dml/hops/Hop.java
index b968952..3545ba3 100644
--- a/src/main/java/com/ibm/bi/dml/hops/Hop.java
+++ b/src/main/java/com/ibm/bi/dml/hops/Hop.java
@@ -351,7 +351,8 @@ public abstract class Hop
 			//(1) avoid unnecessary persist and unpersist calls, and 
 			//(2) avoid unnecessary creation of spark context (incl executors)
 			if(    OptimizerUtils.isHybridExecutionMode() 
-				&& _outputMemEstimate < OptimizerUtils.getLocalMemBudget()
+				&& (getDim2() > 1 && _outputMemEstimate < OptimizerUtils.getLocalMemBudget()
+				|| getDim2() == 1 && _outputMemEstimate < OptimizerUtils.getLocalMemBudget()/3 )
 				|| _etypeForced == ExecType.CP )
 			{
 				et = ExecType.CP;
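
In plain terms, the tightened condition keeps a checkpoint in CP only if the output fits in the local memory budget, and for column vectors (getDim2() == 1) only if it fits in a third of that budget. A minimal standalone sketch of just this decision rule; the hybrid-execution-mode and forced-exec-type guards from the surrounding condition are omitted, and the parameter names are hypothetical (the real code reads _outputMemEstimate, getDim2(), and OptimizerUtils.getLocalMemBudget() inside Hop):

public class CheckpointExecTypeRule {
	/**
	 * @param ncols          number of columns of the checkpointed intermediate
	 * @param outputMemEst   estimated output size in bytes
	 * @param localMemBudget local memory budget in bytes
	 * @return true if the checkpoint should execute in CP
	 */
	public static boolean checkpointInCP(long ncols, double outputMemEst, double localMemBudget) {
		if( ncols > 1 )  //general matrices: fit within the full budget
			return outputMemEst < localMemBudget;
		else             //column vectors: more conservative, a third of the budget
			return outputMemEst < localMemBudget / 3;
	}
}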


[3/3] incubator-systemml git commit: New rdd block partitioner, incl aggregation utils; still disabled

Posted by mb...@apache.org.
New rdd block partitioner, incl aggregation utils; still disabled

Local aggregation is crucial for performance because it reduces the amount
of shuffled data. Especially for square sparse matrices (as common in
factorization and graph algorithms), the default hash partitioning leads
to "randomly" distributed blocks and hence almost no local aggregation
for row- or column-wise aggregations. This custom partitioner assigns
square matrix tiles composed of multiple blocks to individual
partitions and hence ensures local aggregation independent of the
aggregation direction. 

With this first change, we introduce the partitioner and extended
aggregation utils (which require the matrix dimensions). Down the road,
this will become the default binary block partitioner if the promising
initial results hold over a variety of additional experiments. Note that
all shuffle-based operations would need to change simultaneously in
order to avoid unnecessary overhead due to partitioner mismatches.
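
A small worked example of the tile sizing described above, assuming a 100,000 x 100,000 matrix with 1,000 x 1,000 blocks and 100 target partitions (pure arithmetic, no SystemML classes involved):

public class TileSizingExample {
	public static void main(String[] args) {
		long nrblks = 100, ncblks = 100;   //100k x 100k matrix, 1k x 1k blocks
		int numParts = 100;                //target number of partitions
		long nblksPerPart = (long) Math.ceil((double)(nrblks * ncblks) / numParts); //100
		long dimBlks = (long) Math.ceil(Math.sqrt(nblksPerPart));                   //10
		//each partition holds a 10 x 10 tile of blocks, so a row-wise aggregation
		//over one block row touches only 10 partitions; with hash partitioning it
		//could touch all 100
		System.out.println(dimBlks + " x " + dimBlks + " blocks per partition");
	}
}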



Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9fd00a99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9fd00a99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9fd00a99

Branch: refs/heads/master
Commit: 9fd00a990b90496d03b69442333ecfbf6af5b12f
Parents: f519617
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Nov 26 01:00:00 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Nov 26 01:00:00 2015 -0800

----------------------------------------------------------------------
 .../spark/data/BlockPartitioner.java            | 87 ++++++++++++++++++++
 .../spark/utils/RDDAggregateUtils.java          | 62 ++++++++++++++
 .../spark/utils/RDDConverterUtils.java          |  6 +-
 3 files changed, 152 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9fd00a99/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BlockPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BlockPartitioner.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BlockPartitioner.java
new file mode 100644
index 0000000..e69bea2
--- /dev/null
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BlockPartitioner.java
@@ -0,0 +1,87 @@
+/**
+ * (C) Copyright IBM Corp. 2010, 2015
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ */
+
+package com.ibm.bi.dml.runtime.instructions.spark.data;
+
+import org.apache.spark.Partitioner;
+
+import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
+import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
+
+/**
+ * Default partitioner used for all binary block rdd operations in order
+ * to enable sufficient local aggregation independent of the aggregation
+ * direction (row/col-wise). Especially, on large squared matrices 
+ * (as common for factorization or graph algorithms), this is crucial 
+ * for performance. 
+ * 
+ */
+public class BlockPartitioner extends Partitioner
+{
+	private static final long serialVersionUID = 3207938407732880324L;
+	
+	private int _numParts = -1;
+	private int _ncparts = -1;
+	private long _rbPerPart = -1;
+	private long _cbPerPart = -1;
+	
+	public BlockPartitioner(MatrixCharacteristics mc, int numParts) 
+	{
+		long nrblks = mc.getNumRowBlocks();
+		long ncblks = mc.getNumColBlocks();
+		long nblks = nrblks * ncblks;
+		long nblksPerPart = (long)Math.ceil((double)nblks / numParts); 
+		long dimBlks = (long) Math.ceil(Math.sqrt(nblksPerPart));
+		
+		if( nrblks < dimBlks ) { //short and fat
+			_rbPerPart = nrblks;
+			_cbPerPart = (long)Math.ceil((double)nblksPerPart/_rbPerPart);
+		}
+		else if( ncblks < dimBlks ) { //tall and skinny
+			_cbPerPart = ncblks;
+			_rbPerPart = (long)Math.ceil((double)nblksPerPart/_cbPerPart);
+		}
+		else { //general case
+			_rbPerPart = dimBlks;
+			_cbPerPart = dimBlks; 
+		}
+		
+		_ncparts = (int)(ncblks/_cbPerPart);
+		_numParts = numParts;
+	}
+	
+	@Override
+	public int getPartition(Object arg0) 
+	{
+		//sanity check for valid class
+		if( !(arg0 instanceof MatrixIndexes) ) {
+			throw new RuntimeException("Unsupported key class "
+					+ "(expected MatrixIndexes): "+arg0.getClass().getName());
+		}
+			
+		//get partition id
+		MatrixIndexes ix = (MatrixIndexes) arg0;
+		int ixr = (int)((ix.getRowIndex()-1)/_rbPerPart);
+		int ixc = (int)((ix.getColumnIndex()-1)/_cbPerPart);
+		return ixr * _ncparts + ixc;
+	}
+
+	@Override
+	public int numPartitions() {
+		return _numParts;
+	}
+}
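
A hedged usage sketch for the class above: co-locating square tiles of blocks by repartitioning an existing binary block RDD. Only the BlockPartitioner constructor and the partitions().size() idiom are taken from this commit; obtaining the input RDD and its MatrixCharacteristics is assumed caller context.

import org.apache.spark.api.java.JavaPairRDD;

import com.ibm.bi.dml.runtime.instructions.spark.data.BlockPartitioner;
import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;

public class BlockPartitionerUsage {
	public static JavaPairRDD<MatrixIndexes, MatrixBlock> repartitionByTile(
		JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc )
	{
		//keep the current parallelism, but co-locate square tiles of blocks
		BlockPartitioner part = new BlockPartitioner(mc, in.partitions().size());
		return in.partitionBy(part);
	}
}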

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9fd00a99/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index a7d2755..3020214 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -25,8 +25,10 @@ import com.ibm.bi.dml.lops.PartialAggregate.CorrectionLocationType;
 import com.ibm.bi.dml.runtime.DMLRuntimeException;
 import com.ibm.bi.dml.runtime.functionobjects.KahanPlus;
 import com.ibm.bi.dml.runtime.instructions.cp.KahanObject;
+import com.ibm.bi.dml.runtime.instructions.spark.data.BlockPartitioner;
 import com.ibm.bi.dml.runtime.instructions.spark.data.CorrMatrixBlock;
 import com.ibm.bi.dml.runtime.instructions.spark.data.RowMatrixBlock;
+import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
 import com.ibm.bi.dml.runtime.matrix.data.OperationsOnMatrixValues;
@@ -115,6 +117,28 @@ public class RDDAggregateUtils
 	 * @param in
 	 * @return
 	 */
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
+	{
+		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
+		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
+				in.combineByKey( new CreateBlockCombinerFunction(), 
+							     new MergeSumBlockValueFunction(), 
+							     new MergeSumBlockCombinerFunction(),
+							     new BlockPartitioner(mc, in.partitions().size()));
+		
+		//strip-off correction blocks from 					     
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
+				tmp.mapValues( new ExtractMatrixBlock() );
+		
+		//return the aggregate rdd
+		return out;
+	}
+	
+	/**
+	 * 
+	 * @param in
+	 * @return
+	 */
 	public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( JavaPairRDD<MatrixIndexes, Double> in )
 	{
 		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
@@ -180,6 +204,30 @@ public class RDDAggregateUtils
 	}
 	
 	/**
+	 * 
+	 * @param mc
+	 * @param in
+	 * @param aop
+	 * @return
+	 */
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in, AggregateOperator aop )
+	{
+		//stable sum of blocks per key, by passing correction blocks along with aggregates 		
+		JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
+				in.combineByKey( new CreateBlockCombinerFunction(), 
+							     new MergeAggBlockValueFunction(aop), 
+							     new MergeAggBlockCombinerFunction(aop),
+							     new BlockPartitioner(mc, in.partitions().size()));
+		
+		//strip-off correction blocks from 					     
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
+				tmp.mapValues( new ExtractMatrixBlock() );
+		
+		//return the aggregate rdd
+		return out;
+	}
+	
+	/**
 	 * Merges disjoint data of all blocks per key.
 	 * 
 	 * Note: The behavior of this method is undefined for both sparse and dense data if the 
@@ -188,6 +236,7 @@ public class RDDAggregateUtils
 	 * @param in
 	 * @return
 	 */
+	@Deprecated
 	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
 	{
 		return in.reduceByKey(
@@ -195,6 +244,19 @@ public class RDDAggregateUtils
 	}
 	
 	/**
+	 * 
+	 * @param mc
+	 * @param in
+	 * @return
+	 */
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
+	{
+		return in.reduceByKey(
+				new BlockPartitioner(mc, in.partitions().size()),
+				new MergeBlocksFunction());
+	}
+	
+	/**
 	 * Merges disjoint data of all blocks per key.
 	 * 
 	 * Note: The behavior of this method is undefined for both sparse and dense data if the 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9fd00a99/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDConverterUtils.java
index ae67a27..1e3f29a 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -79,7 +79,7 @@ public class RDDConverterUtils
 		}
 		
 		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
+		out = RDDAggregateUtils.mergeByKey( mcOut, out ); 
 		
 		return out;
 	}
@@ -108,7 +108,7 @@ public class RDDConverterUtils
 		}
 		
 		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
+		out = RDDAggregateUtils.mergeByKey( mcOut, out ); 
 		
 		return out;
 	}
@@ -201,7 +201,7 @@ public class RDDConverterUtils
 					new CSVToBinaryBlockFunction(mcOut, delim, fill, fillValue));
 		
 		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 
+		out = RDDAggregateUtils.mergeByKey( mcOut, out ); 
 		
 		return out;
 	}
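
To summarize the new entry points, a hedged sketch of how the partitioner-aware aggregation utils added in this commit might be called; the input RDD and MatrixCharacteristics are assumed to come from the surrounding instruction context.

import org.apache.spark.api.java.JavaPairRDD;

import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;

public class AggregateUtilsUsage {
	/** stable (Kahan-corrected) sum of all partial blocks per block index */
	public static JavaPairRDD<MatrixIndexes, MatrixBlock> sum(
		JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc ) {
		return RDDAggregateUtils.sumByKeyStable(mc, in);
	}

	/** merge of disjoint partial blocks per block index (as used by the converters above) */
	public static JavaPairRDD<MatrixIndexes, MatrixBlock> merge(
		JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc ) {
		return RDDAggregateUtils.mergeByKey(mc, in);
	}
}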