You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/01 04:32:31 UTC

[1/4] systemml git commit: [SYSTEMML-2223] Repartition ultra-sparse matrices to preferred #parts

Repository: systemml
Updated Branches:
  refs/heads/master 696e79218 -> addd6e121


[SYSTEMML-2223] Repartition ultra-sparse matrices to preferred #parts

This patch improves the Spark checkpointing (i.e., distributed caching)
logic by repartitioning ultra-sparse matrices to the preferred number of
partitions, rounded up to a multiple of the default parallelism.

Furthermore, this patch makes a minor improvement to empty-block
handling when aggregating ultra-sparse matrices, avoiding unnecessary
GC overhead.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/015b2731
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/015b2731
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/015b2731

Branch: refs/heads/master
Commit: 015b2731893b5f630a0bdfb8cb0efdf86e84fd05
Parents: 696e792
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 14:54:28 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 14:54:28 2018 -0700

----------------------------------------------------------------------
 .../spark/CheckpointSPInstruction.java          | 25 ++++++++++++++------
 .../spark/utils/RDDAggregateUtils.java          | 10 ++++----
 .../runtime/matrix/MatrixCharacteristics.java   |  7 ++++++
 3 files changed, 31 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index ccd7319..33dd494 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -41,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class CheckpointSPInstruction extends UnarySPInstruction {
 	// default storage level
@@ -100,19 +101,28 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
 		JavaPairRDD<?,?> out = null;
 		if( !in.getStorageLevel().equals( _level ) ) 
 		{
-			//(trigger coalesce if intended number of partitions exceeded by 20%
-			//and not hash partitioned to avoid losing the existing partitioner)
+			//determine need for coalesce or repartition, and csr conversion
 			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
 			boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions()
 				&& !SparkUtils.isHashPartitioned(in) && in.getNumPartitions()
 				> SparkExecutionContext.getDefaultParallelism(true));
+			boolean repartition = mcIn.dimsKnown(true) && mcIn.isUltraSparse()
+				&& numPartitions > in.getNumPartitions();
+			boolean mcsr2csr = input1.getDataType()==DataType.MATRIX 
+				&& OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
+				&& !_level.equals(Checkpoint.SER_STORAGE_LEVEL);
 			
 			//checkpoint pre-processing rdd operations
 			if( coalesce ) {
 				//merge partitions without shuffle if too many partitions
 				out = in.coalesce( numPartitions );
 			}
-			else {
+			else if( repartition ) {
+				//repartition to preferred size as multiple of default parallelism
+				out = in.repartition(UtilFunctions.roundToNext(numPartitions,
+					SparkExecutionContext.getDefaultParallelism(true)));
+			}
+			else if( !mcsr2csr ) {
 				//since persist is an in-place marker for a storage level, we 
 				//apply a narrow shallow copy to allow for short-circuit collects 
 				if( input1.getDataType() == DataType.MATRIX )
@@ -120,13 +130,14 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
 						(JavaPairRDD<MatrixIndexes,MatrixBlock>)in, false);
 				else if( input1.getDataType() == DataType.FRAME)
 					out = ((JavaPairRDD<Long,FrameBlock>)in)
-						.mapValues(new CopyFrameBlockFunction(false));	
+						.mapValues(new CopyFrameBlockFunction(false));
+			}
+			else {
+				out = in;
 			}
 			
 			//convert mcsr into memory-efficient csr if potentially sparse
-			if( input1.getDataType()==DataType.MATRIX 
-				&& OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
-				&& !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) {
+			if( mcsr2csr ) {
 				out = ((JavaPairRDD<MatrixIndexes,MatrixBlock>)out)
 					.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
 			}

http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 476e93f..0101e26 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -256,26 +256,28 @@ public class RDDAggregateUtils
 	{
 		private static final long serialVersionUID = 3703543699467085539L;
 		
-		private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);	
+		private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);
 		
 		@Override
 		public CorrMatrixBlock call(CorrMatrixBlock arg0, MatrixBlock arg1) 
 			throws Exception 
 		{
+			if( arg1.isEmptyBlock(false) )
+				return arg0;
+			
 			//get current block and correction
 			MatrixBlock value = arg0.getValue();
 			MatrixBlock corr = arg0.getCorrection();
 			
 			//correction block allocation on demand
-			if( corr == null ){
+			if( corr == null )
 				corr = new MatrixBlock(value.getNumRows(), value.getNumColumns(), false);
-			}
 			
 			//aggregate other input and maintain corrections 
 			//(existing value and corr are used in place)
 			OperationsOnMatrixValues.incrementalAggregation(value, corr, arg1, _op, false);
 			return arg0.set(value, corr);
-		}	
+		}
 	}
 
 	private static class MergeSumBlockCombinerFunction implements Function2<CorrMatrixBlock, CorrMatrixBlock, CorrMatrixBlock> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index 1443a8c..91d70f8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
 
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.MMTSJ.MMTSJType;
 import org.apache.sysml.runtime.instructions.mr.AggregateBinaryInstruction;
 import org.apache.sysml.runtime.instructions.mr.AggregateInstruction;
@@ -61,6 +62,7 @@ import org.apache.sysml.runtime.instructions.mr.UaggOuterChainInstruction;
 import org.apache.sysml.runtime.instructions.mr.UnaryInstruction;
 import org.apache.sysml.runtime.instructions.mr.UnaryMRInstructionBase;
 import org.apache.sysml.runtime.instructions.mr.ZeroOutInstruction;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
@@ -225,6 +227,11 @@ public class MatrixCharacteristics implements Serializable
 		return ( !ubNnz && nonZero >= 0 );
 	}
 	
+	public boolean isUltraSparse() {
+		return dimsKnown(true) && OptimizerUtils.getSparsity(this)
+			< MatrixBlock.ULTRA_SPARSITY_TURN_POINT;
+	}
+	
 	public boolean mightHaveEmptyBlocks() {
 		long singleBlk = Math.max(Math.min(numRows, numRowsPerBlock),1) 
 				* Math.max(Math.min(numColumns, numColumnsPerBlock),1);


[4/4] systemml git commit: [SYSTEMML-2225] Fix reblock ultra-sparse, incl mem efficiency read

Posted by mb...@apache.org.
[SYSTEMML-2225] Fix reblock ultra-sparse, incl mem efficiency read

This patch improves the robustness of reblocking ultra-sparse matrices by
hardening the CSR index lookups and by better handling empty blocks during
reblock. Furthermore, it avoids unnecessary CSR block creation for empty
blocks on the initial read.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/addd6e12
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/addd6e12
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/addd6e12

Branch: refs/heads/master
Commit: addd6e121ac8d81af0f90859666b9ac1ec1e5009
Parents: c516145
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 19:06:19 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 21:27:34 2018 -0700

----------------------------------------------------------------------
 .../spark/functions/CopyBlockPairFunction.java  |  2 +-
 .../functions/ExtractBlockForBinaryReblock.java | 31 ++++++++++----------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  8 +++--
 .../runtime/matrix/data/SparseBlockCSR.java     |  7 +++--
 4 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
index 5423a8f..8ff6c2a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
@@ -71,7 +71,7 @@ public class CopyBlockPairFunction implements PairFlatMapFunction<Iterator<Tuple
 				MatrixIndexes ix = new MatrixIndexes(arg._1());
 				MatrixBlock block = null;
 				//always create deep copies in more memory-efficient CSR representation 
-				//if block is already in sparse format			
+				//if block is already in sparse format
 				if( Checkpoint.CHECKPOINT_SPARSE_CSR && arg._2.isInSparseFormat() )
 					block = new MatrixBlock(arg._2, SparseBlock.Type.CSR, true);
 				else

http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
index a2a1ce0..66a2271 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
@@ -70,37 +70,36 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<
 		long endRowGlobalCellIndex = getEndGlobalIndex(ixIn.getRowIndex(), true, true);
 		long startColGlobalCellIndex = UtilFunctions.computeCellIndex(ixIn.getColumnIndex(), in_bclen, 0);
 		long endColGlobalCellIndex = getEndGlobalIndex(ixIn.getColumnIndex(), true, false);
-		assert(startRowGlobalCellIndex <= endRowGlobalCellIndex && startColGlobalCellIndex <= endColGlobalCellIndex);
 		
 		long out_startRowBlockIndex = UtilFunctions.computeBlockIndex(startRowGlobalCellIndex, out_brlen);
 		long out_endRowBlockIndex = UtilFunctions.computeBlockIndex(endRowGlobalCellIndex, out_brlen);
 		long out_startColBlockIndex = UtilFunctions.computeBlockIndex(startColGlobalCellIndex, out_bclen);
 		long out_endColBlockIndex = UtilFunctions.computeBlockIndex(endColGlobalCellIndex, out_bclen);
-		assert(out_startRowBlockIndex <= out_endRowBlockIndex && out_startColBlockIndex <= out_endColBlockIndex);
 		
 		ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> retVal = new ArrayList<>();
 		
 		for(long i = out_startRowBlockIndex; i <= out_endRowBlockIndex; i++) {
 			for(long j = out_startColBlockIndex; j <= out_endColBlockIndex; j++) {
 				MatrixIndexes indx = new MatrixIndexes(i, j);
-				long rowLower = Math.max(UtilFunctions.computeCellIndex(i, out_brlen, 0), startRowGlobalCellIndex);
-				long rowUpper = Math.min(getEndGlobalIndex(i, false, true), endRowGlobalCellIndex);
-				long colLower = Math.max(UtilFunctions.computeCellIndex(j, out_bclen, 0), startColGlobalCellIndex);
-				long colUpper = Math.min(getEndGlobalIndex(j, false, false), endColGlobalCellIndex);
-				
 				int new_lrlen = UtilFunctions.computeBlockSize(rlen, i, out_brlen);
 				int new_lclen = UtilFunctions.computeBlockSize(clen, j, out_bclen);
 				MatrixBlock blk = new MatrixBlock(new_lrlen, new_lclen, true);
 				
-				int in_i1 = UtilFunctions.computeCellInBlock(rowLower, in_brlen);
-				int out_i1 = UtilFunctions.computeCellInBlock(rowLower, out_brlen);
-				
-				for(long i1 = rowLower; i1 <= rowUpper; i1++, in_i1++, out_i1++) {
-					int in_j1 = UtilFunctions.computeCellInBlock(colLower, in_bclen);
-					int out_j1 = UtilFunctions.computeCellInBlock(colLower, out_bclen);
-					for(long j1 = colLower; j1 <= colUpper; j1++, in_j1++, out_j1++) {
-						double val = in.getValue(in_i1, in_j1);
-						blk.appendValue(out_i1, out_j1, val);
+				if( !in.isEmptyBlock(false) ) {
+					long rowLower = Math.max(UtilFunctions.computeCellIndex(i, out_brlen, 0), startRowGlobalCellIndex);
+					long rowUpper = Math.min(getEndGlobalIndex(i, false, true), endRowGlobalCellIndex);
+					long colLower = Math.max(UtilFunctions.computeCellIndex(j, out_bclen, 0), startColGlobalCellIndex);
+					long colUpper = Math.min(getEndGlobalIndex(j, false, false), endColGlobalCellIndex);
+					int in_i1 = UtilFunctions.computeCellInBlock(rowLower, in_brlen);
+					int out_i1 = UtilFunctions.computeCellInBlock(rowLower, out_brlen);
+					
+					for(long i1 = rowLower; i1 <= rowUpper; i1++, in_i1++, out_i1++) {
+						int in_j1 = UtilFunctions.computeCellInBlock(colLower, in_bclen);
+						int out_j1 = UtilFunctions.computeCellInBlock(colLower, out_bclen);
+						for(long j1 = colLower; j1 <= colUpper; j1++, in_j1++, out_j1++) {
+							double val = in.quickGetValue(in_i1, in_j1);
+							blk.appendValue(out_i1, out_j1, val);
+						}
 					}
 				}
 				retVal.add(new Tuple2<>(indx, blk));

http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index f19fbe5..efe2365 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -184,10 +184,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			throw new RuntimeException("Sparse matrix block expected.");
 		
 		//deep copy and change sparse block type
-		nonZeros = that.nonZeros;
-		estimatedNNzsPerRow = that.estimatedNNzsPerRow;
-		sparseBlock = SparseBlockFactory
+		if( !that.isEmptyBlock(false) ) {
+			nonZeros = that.nonZeros;
+			estimatedNNzsPerRow = that.estimatedNNzsPerRow;
+			sparseBlock = SparseBlockFactory
 				.copySparseBlock(stype, that.sparseBlock, deep);
+		}
 	}
 	
 	////////

http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
index de0c34b..6bbc81d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
@@ -734,16 +734,20 @@ public class SparseBlockCSR extends SparseBlock
 
 	@Override
 	public double get(int r, int c) {
+		if( isEmpty(r) )
+			return 0;
 		int pos = pos(r);
 		int len = size(r);
 		
 		//search for existing col index in [pos,pos+len)
-		int index = Arrays.binarySearch(_indexes, pos, pos+len, c);		
+		int index = Arrays.binarySearch(_indexes, pos, pos+len, c);
 		return (index >= 0) ? _values[index] : 0;
 	}
 	
 	@Override 
 	public SparseRow get(int r) {
+		if( isEmpty(r) )
+			return new SparseRowScalar();
 		int pos = pos(r);
 		int len = size(r);
 		
@@ -751,7 +755,6 @@ public class SparseBlockCSR extends SparseBlock
 		System.arraycopy(_indexes, pos, row.indexes(), 0, len);
 		System.arraycopy(_values, pos, row.values(), 0, len);
 		row.setSize(len);
-		
 		return row;
 	}
 	


[2/4] systemml git commit: [HOTFIX][SYSTEMML-2219] Fix ultra-sparse/ultra-sparse matrix multiply

Posted by mb...@apache.org.
[HOTFIX][SYSTEMML-2219] Fix ultra-sparse/ultra-sparse matrix multiply

This patch fixes the improved ultra-sparse matrix multiply for the special
case of ultra-sparse x ultra-sparse multiplication where the right-hand
side (rhs) has entirely empty rows.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2b3aefe7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2b3aefe7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2b3aefe7

Branch: refs/heads/master
Commit: 2b3aefe79446b3b1ab13640566a37e11f620ee96
Parents: 015b273
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 15:05:59 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 15:05:59 2018 -0700

----------------------------------------------------------------------
 .../runtime/matrix/data/LibMatrixMult.java      |  7 +-
 ...FullMatrixMultiplicationUltraSparseTest.java | 75 ++++++++++++--------
 2 files changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/2b3aefe7/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index 8919ab6..ef273f6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -1512,17 +1512,18 @@ public class LibMatrixMult
 				if( alen==1 ) { 
 					//row selection (now aggregation) with potential scaling
 					int aix = aixs[apos];
+					int lnnz = 0;
 					if( rightSparse ) { //sparse right matrix (full row copy)
 						if( !m2.sparseBlock.isEmpty(aix) ) {
 							ret.rlen=m;
 							ret.allocateSparseRowsBlock(false); //allocation on demand
 							boolean ldeep = (m2.sparseBlock instanceof SparseBlockMCSR);
 							ret.sparseBlock.set(i, m2.sparseBlock.get(aix), ldeep);
-							ret.nonZeros += ret.sparseBlock.size(i);
+							ret.nonZeros += (lnnz = ret.sparseBlock.size(i));
 						}
 					}
 					else { //dense right matrix (append all values)
-						int lnnz = (int)m2.recomputeNonZeros(aix, aix, 0, n-1);
+						lnnz = (int)m2.recomputeNonZeros(aix, aix, 0, n-1);
 						if( lnnz > 0 ) {
 							c.allocate(i, lnnz); //allocate once
 							double[] bvals = m2.getDenseBlock().values(aix);
@@ -1532,7 +1533,7 @@ public class LibMatrixMult
 						}
 					}
 					//optional scaling if not pure selection
-					if( avals[apos] != 1 )
+					if( avals[apos] != 1 && lnnz > 0 )
 						vectMultiplyInPlace(avals[apos], c.values(i), c.pos(i), c.size(i));
 				}
 				else //GENERAL CASE

http://git-wip-us.apache.org/repos/asf/systemml/blob/2b3aefe7/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java b/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
index 2a621fb..538ac1b 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
+import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
@@ -34,7 +34,6 @@ import org.apache.sysml.test.utils.TestUtils;
 
 public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase 
 {
-	
 	private final static String TEST_NAME = "FullMatrixMultiplication";
 	private final static String TEST_DIR = "functions/binary/matrix_full_other/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + FullMatrixMultiplicationUltraSparseTest.class.getSimpleName() + "/";
@@ -83,62 +82,77 @@ public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase
 	}
 
 	@Test
-	public void testMMDenseUltraSparseCP() 
-	{
+	public void testMMDenseUltraSparseCP() {
 		runMatrixMatrixMultiplicationTest(SparsityType.DENSE, SparsityType.ULTRA_SPARSE, ExecType.CP);
 	}
 	
 	@Test
-	public void testMMSparseUltraSparseCP() 
-	{
+	public void testMMSparseUltraSparseCP() {
 		runMatrixMatrixMultiplicationTest(SparsityType.SPARSE, SparsityType.ULTRA_SPARSE, ExecType.CP);
 	}
 
 	@Test
-	public void testMMUltraSparseDenseCP() 
-	{
+	public void testMMUltraSparseDenseCP() {
 		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.DENSE, ExecType.CP);
 	}
 	
 	@Test
-	public void testMMUltraSparseSparseCP() 
-	{
+	public void testMMUltraSparseSparseCP() {
 		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.SPARSE, ExecType.CP);
 	}
 	
 	@Test
-	public void testMMUltraSparseUltraSparseCP() 
-	{
+	public void testMMUltraSparseUltraSparseCP() {
 		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.ULTRA_SPARSE, ExecType.CP);
 	}
 	
 	@Test
-	public void testMMDenseUltraSparseMR() 
-	{
+	public void testMMDenseUltraSparseSP() {
+		runMatrixMatrixMultiplicationTest(SparsityType.DENSE, SparsityType.ULTRA_SPARSE, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testMMSparseUltraSparseSP() {
+		runMatrixMatrixMultiplicationTest(SparsityType.SPARSE, SparsityType.ULTRA_SPARSE, ExecType.SPARK);
+	}
+
+	@Test
+	public void testMMUltraSparseDenseSP() {
+		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.DENSE, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testMMUltraSparseSparseSP() {
+		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.SPARSE, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testMMUltraSparseUltraSparseSP() {
+		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.ULTRA_SPARSE, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testMMDenseUltraSparseMR() {
 		runMatrixMatrixMultiplicationTest(SparsityType.DENSE, SparsityType.ULTRA_SPARSE, ExecType.MR);
 	}
 	
 	@Test
-	public void testMMSparseUltraSparseMR() 
-	{
+	public void testMMSparseUltraSparseMR() {
 		runMatrixMatrixMultiplicationTest(SparsityType.SPARSE, SparsityType.ULTRA_SPARSE, ExecType.MR);
 	}
 
 	@Test
-	public void testMMUltraSparseDenseMR() 
-	{
+	public void testMMUltraSparseDenseMR() {
 		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.DENSE, ExecType.MR);
 	}
 	
 	@Test
-	public void testMMUltraSparseSparseMR() 
-	{
+	public void testMMUltraSparseSparseMR() {
 		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.SPARSE, ExecType.MR);
 	}
 	
 	@Test
-	public void testMMUltraSparseUltraSparseMR() 
-	{
+	public void testMMUltraSparseUltraSparseMR() {
 		runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.ULTRA_SPARSE, ExecType.MR);
 	}
 
@@ -150,12 +164,17 @@ public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase
 	 */
 	private void runMatrixMatrixMultiplicationTest( SparsityType sparseM1, SparsityType sparseM2, ExecType instType)
 	{
-		//setup exec type, rows, cols
-
-		//rtplatform for MR
 		RUNTIME_PLATFORM platformOld = rtplatform;
-		rtplatform = (instType==ExecType.MR) ? RUNTIME_PLATFORM.HADOOP : RUNTIME_PLATFORM.HYBRID;
+		switch( instType ){
+			case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+			case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+			default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+		}
 	
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
 		try
 		{
 			TestConfiguration config = getTestConfiguration(TEST_NAME);
@@ -194,8 +213,8 @@ public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase
 			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("C");
 			TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
 		}
-		finally
-		{
+		finally {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 			rtplatform = platformOld;
 		}
 	}


[3/4] systemml git commit: [SYSTEMML-2224] Remove unnecessary fields from matrix blocks

Posted by mb...@apache.org.
[SYSTEMML-2224] Remove unnecessary fields from matrix blocks

This patch moves metadata such as diag (i.e., the flag that a matrix
only has non-zeros on the diagonal) from the matrix block to the matrix
object to make the core block more compact. This is especially important
for large distributed ultra-sparse matrices, where these fields cause
unnecessary GC overhead when there is a large fraction of empty blocks that
cannot be filtered out without compromising correctness.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c5161456
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c5161456
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c5161456

Branch: refs/heads/master
Commit: c5161456fe5d1a4e9cad7a2ce93dbdec291c2db1
Parents: 2b3aefe
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 16:23:46 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 16:23:46 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/MatrixObject.java    | 19 +++++++--
 .../instructions/cp/ReorgCPInstruction.java     |  2 +
 .../apache/sysml/runtime/io/MatrixWriter.java   | 33 +++++++-------
 .../sysml/runtime/io/WriterBinaryBlock.java     |  4 +-
 .../sysml/runtime/io/WriterBinaryCell.java      |  3 +-
 .../sysml/runtime/io/WriterMatrixMarket.java    |  2 +-
 .../apache/sysml/runtime/io/WriterTextCSV.java  |  2 +-
 .../apache/sysml/runtime/io/WriterTextCell.java |  2 +-
 .../runtime/matrix/data/LibMatrixReorg.java     |  4 +-
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 45 +++++---------------
 .../sysml/runtime/util/DataConverter.java       | 21 ++++-----
 11 files changed, 58 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index de49222..9714a19 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -71,14 +71,15 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 	
 	//additional matrix-specific flags
 	private UpdateType _updateType = UpdateType.COPY; 
-	
+	private boolean _diag = false;
+
 	//information relevant to partitioned matrices.
 	private boolean _partitioned = false; //indicates if obj partitioned
 	private PDataPartitionFormat _partitionFormat = null; //indicates how obj partitioned
 	private int _partitionSize = -1; //indicates n for BLOCKWISE_N
 	private String _partitionCacheName = null; //name of cache block
 	private MatrixBlock _partitionInMemory = null;
-
+	
 	/**
 	 * Constructor that takes the value type and the HDFS filename.
 	 * 
@@ -119,6 +120,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 				                             metaOld.getOutputInfo(), metaOld.getInputInfo());
 		
 		_updateType = mo._updateType;
+		_diag = mo._diag;
 		_partitioned = mo._partitioned;
 		_partitionFormat = mo._partitionFormat;
 		_partitionSize = mo._partitionSize;
@@ -133,6 +135,14 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 		return _updateType;
 	}
 	
+	public boolean isDiag() {
+		return _diag;
+	}
+	
+	public void setDiag(boolean diag) {
+		_diag = diag;
+	}
+	
 	@Override
 	public void updateMatrixCharacteristics (MatrixCharacteristics mc) {
 		_metaData.getMatrixCharacteristics().set(mc);
@@ -531,10 +541,11 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 			if ( oinfo == OutputInfo.BinaryBlockOutputInfo && DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE &&
 				(mc.getRowsPerBlock() != ConfigurationManager.getBlocksize() || mc.getColsPerBlock() != ConfigurationManager.getBlocksize()) ) 
 			{
-				DataConverter.writeMatrixToHDFS(_data, fname, oinfo, new MatrixCharacteristics(mc.getRows(), mc.getCols(), ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros()), rep, fprop);
+				DataConverter.writeMatrixToHDFS(_data, fname, oinfo, new MatrixCharacteristics(mc.getRows(), mc.getCols(),
+					ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros()), rep, fprop, _diag);
 			}
 			else {
-				DataConverter.writeMatrixToHDFS(_data, fname, oinfo, mc, rep, fprop);
+				DataConverter.writeMatrixToHDFS(_data, fname, oinfo, mc, rep, fprop, _diag);
 			}
 
 			if( LOG.isTraceEnabled() )

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
index af4c12e..89d99b7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
@@ -144,5 +144,7 @@ public class ReorgCPInstruction extends UnaryCPInstruction {
 			ec.releaseMatrixInput(_col.getName());
 		ec.releaseMatrixInput(input1.getName(), getExtendedOpcode());
 		ec.setMatrixOutput(output.getName(), soresBlock, getExtendedOpcode());
+		if( r_op.fn instanceof DiagIndex && soresBlock.getNumColumns()>1 ) //diagV2M
+			ec.getMatrixObject(output.getName()).setDiag(true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java b/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
index 45ba2d9..126a772 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
@@ -21,7 +21,6 @@ package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
 
-import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 /**
@@ -33,9 +32,15 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
  */
 public abstract class MatrixWriter 
 {
+	public void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz )
+		throws IOException
+	{
+		writeMatrixToHDFS(src, fname, rlen, clen, brlen, bclen, nnz, false);
+	}
 
-	public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz )
-		throws IOException, DMLRuntimeException;
+	public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag )
+		throws IOException;
+	
 	
 	/**
 	 * Writes a minimal entry to represent an empty matrix on hdfs.
@@ -50,33 +55,28 @@ public abstract class MatrixWriter
 	public abstract void writeEmptyMatrixToHDFS( String fname, long rlen, long clen, int brlen, int bclen )
 		throws IOException;
 
-	public static MatrixBlock[] createMatrixBlocksForReuse( long rlen, long clen, int brlen, int bclen, boolean sparse, long nonZeros ) 
-	{
+	public static MatrixBlock[] createMatrixBlocksForReuse( long rlen, long clen, int brlen, int bclen, boolean sparse, long nonZeros ) {
 		MatrixBlock[] blocks = new MatrixBlock[4];
 		double sparsity = ((double)nonZeros)/(rlen*clen);
 		long estNNZ = -1;
 		
 		//full block 
-		if( rlen >= brlen && clen >= bclen )
-		{
+		if( rlen >= brlen && clen >= bclen ) {
 			estNNZ = (long) (brlen*bclen*sparsity);
 			blocks[0] = new MatrixBlock( brlen, bclen, sparse, (int)estNNZ );
 		}
 		//partial col block
-		if( rlen >= brlen && clen%bclen!=0 )
-		{
+		if( rlen >= brlen && clen%bclen!=0 ) {
 			estNNZ = (long) (brlen*(clen%bclen)*sparsity);
 			blocks[1] = new MatrixBlock( brlen, (int)(clen%bclen), sparse, (int)estNNZ );
 		}
 		//partial row block
-		if( rlen%brlen!=0 && clen>=bclen )
-		{
+		if( rlen%brlen!=0 && clen>=bclen ) {
 			estNNZ = (long) ((rlen%brlen)*bclen*sparsity);
 			blocks[2] = new MatrixBlock( (int)(rlen%brlen), bclen, sparse, (int)estNNZ );
 		}
 		//partial row/col block
-		if( rlen%brlen!=0 && clen%bclen!=0 )
-		{
+		if( rlen%brlen!=0 && clen%bclen!=0 ) {
 			estNNZ = (long) ((rlen%brlen)*(clen%bclen)*sparsity);
 			blocks[3] = new MatrixBlock( (int)(rlen%brlen), (int)(clen%bclen), sparse, (int)estNNZ );
 		}
@@ -85,16 +85,14 @@ public abstract class MatrixWriter
 		for( MatrixBlock b : blocks )
 			if( b != null )
 				if( !sparse )
-					b.allocateDenseBlockUnsafe(b.getNumRows(), b.getNumColumns());		
+					b.allocateDenseBlockUnsafe(b.getNumRows(), b.getNumColumns());
 		//NOTE: no preallocation for sparse (preallocate sparserows with estnnz) in order to reduce memory footprint
 		
 		return blocks;
 	}
 
-	public static MatrixBlock getMatrixBlockForReuse( MatrixBlock[] blocks, int rows, int cols, int brlen, int bclen )
-	{
+	public static MatrixBlock getMatrixBlockForReuse( MatrixBlock[] blocks, int rows, int cols, int brlen, int bclen ) {
 		int index = -1;
-		
 		if( rows==brlen && cols==bclen )
 			index = 0;
 		else if( rows==brlen && cols<bclen )
@@ -103,7 +101,6 @@ public abstract class MatrixWriter
 			index = 2;
 		else //if( rows<brlen && cols<bclen )
 			index = 3;
-
 		return blocks[ index ];
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
index 59fe573..777828c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
@@ -45,7 +45,7 @@ public class WriterBinaryBlock extends MatrixWriter
 	}
 
 	@Override
-	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) 
+	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) 
 		throws IOException, DMLRuntimeException 
 	{
 		//prepare file access
@@ -61,7 +61,7 @@ public class WriterBinaryBlock extends MatrixWriter
 			MRJobConfiguration.addBinaryBlockSerializationFramework( job );
 		
 		//core write sequential/parallel
-		if( src.isDiag() )
+		if( diag )
 			writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen);
 		else
 			writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen);

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
index a072a6b..b4fe69f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
@@ -36,9 +36,8 @@ import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class WriterBinaryCell extends MatrixWriter
 {
-
 	@Override
-	public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) 
+	public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) 
 		throws IOException, DMLRuntimeException 
 	{
 		//prepare file access

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
index c9f42bf..2115a19 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
@@ -42,7 +42,7 @@ import org.apache.sysml.runtime.util.MapReduceTool;
 public class WriterMatrixMarket extends MatrixWriter
 {
 	@Override
-	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) 
+	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) 
 		throws IOException, DMLRuntimeException 
 	{
 		//validity check matrix dimensions

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
index a3015f2..3e89a52 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
@@ -54,7 +54,7 @@ public class WriterTextCSV extends MatrixWriter
 	}
 	
 	@Override
-	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) 
+	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) 
 		throws IOException, DMLRuntimeException 
 	{
 		//validity check matrix dimensions

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
index 0438f46..413a165 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
@@ -37,7 +37,7 @@ import org.apache.sysml.runtime.util.MapReduceTool;
 public class WriterTextCell extends MatrixWriter
 {
 	@Override
-	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) 
+	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) 
 		throws IOException, DMLRuntimeException 
 	{
 		//validity check matrix dimensions

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 8f7bda9..42c56c4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -290,10 +290,8 @@ public class LibMatrixReorg
 		int rlen = in.rlen;
 		int clen = in.clen;
 		
-		if( clen == 1 ){ //diagV2M
+		if( clen == 1 ) //diagV2M
 			diagV2M( in, out );
-			out.setDiag();
-		}
 		else if ( rlen == clen ) //diagM2V
 			diagM2V( in, out );
 		else

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 9807340..f19fbe5 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -131,13 +131,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	//sparse-block-specific attributes (allocation only)
 	protected int estimatedNNzsPerRow = -1; 
 	
-	//grpaggregate-specific attributes (optional)
-	protected int numGroups = -1;
-	
-	//diag-specific attributes (optional)
-	protected boolean diag = false;
-	
-	
 	////////
 	// Matrix Constructors
 	//
@@ -253,16 +246,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			(int)Math.ceil((double)estnnz/(double)rlen);
 		
 		//reset sparse/dense blocks
-		if( sparse ) {
+		if( sparse )
 			resetSparse();
-		}
-		else {
+		else
 			resetDense(val);
-		}
-		
-		//reset operation-specific attributes
-		numGroups = -1;
-		diag = false;
 	}
 	
 	private void resetSparse() {
@@ -505,15 +492,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		}
 		return ret;
 	}
-	
-	public void setDiag() {
-		diag = true;
-	}
-	
-	public boolean isDiag() {
-		return diag;
-	}
-	
+
 	////////
 	// Data handling
 	
@@ -4723,35 +4702,31 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		if( this.getNumRows() != target.getNumRows() && this.getNumRows() !=Math.max(target.getNumRows(),target.getNumColumns()) || (weights != null && this.getNumRows() != weights.getNumRows()) ) 
 			throw new DMLRuntimeException("groupedAggregate can only operate on matrices with equal dimensions.");
 		
-		// obtain numGroups from instruction, if provided
-		if (ngroups > 0)
-			numGroups = ngroups;
-		
 		// Determine the number of groups
-		if( numGroups <= 0 ) { //reuse if available
+		if( ngroups <= 0 ) { //not provided by instruction: infer from max group id
 			double min = this.min();
 			double max = this.max();
 			if ( min <= 0 )
 				throw new DMLRuntimeException("Invalid value (" + min + ") encountered in 'groups' while computing groupedAggregate");
 			if ( max <= 0 )
 				throw new DMLRuntimeException("Invalid value (" + max + ") encountered in 'groups' while computing groupedAggregate.");
-			numGroups = (int) max;
+			ngroups = (int) max;
 		}
 	
 		// Allocate result matrix
 		boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
 		MatrixBlock result = checkType(ret);
-		boolean result_sparsity = estimateSparsityOnGroupedAgg(rlen, numGroups);
+		boolean result_sparsity = estimateSparsityOnGroupedAgg(rlen, ngroups);
 		if(result==null)
-			result=new MatrixBlock(numGroups, rowVector?1:target.getNumColumns(), result_sparsity);
+			result=new MatrixBlock(ngroups, rowVector?1:target.getNumColumns(), result_sparsity);
 		else
-			result.reset(numGroups, rowVector?1:target.getNumColumns(), result_sparsity);
+			result.reset(ngroups, rowVector?1:target.getNumColumns(), result_sparsity);
 
 		//execute grouped aggregate operation
 		if( k > 1 )
-			LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op, k);
+			LibMatrixAgg.groupedAggregate(this, target, weights, result, ngroups, op, k);
 		else
-			LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op);
+			LibMatrixAgg.groupedAggregate(this, target, weights, result, ngroups, op);
 		
 		return result;
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 82eee9d..8748de2 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -67,22 +67,19 @@ public class DataConverter
 	///////
 
 	public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo,  MatrixCharacteristics mc )
-		throws IOException
-	{
+		throws IOException {
 		writeMatrixToHDFS(mat, dir, outputinfo, mc, -1, null);
 	}
 
 	public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc, int replication, FileFormatProperties formatProperties)
-		throws IOException
-	{
-		try {
-			MatrixWriter writer = MatrixWriterFactory.createMatrixWriter( outputinfo, replication, formatProperties );
-			writer.writeMatrixToHDFS(mat, dir, mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros());
-		}
-		catch(Exception e)
-		{
-			throw new IOException(e);
-		}
+		throws IOException {
+		writeMatrixToHDFS(mat, dir, outputinfo, mc, replication, formatProperties, false); //non-diag write, keep caller's replication/format props
+	}
+	
+	public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc, int replication, FileFormatProperties formatProperties, boolean diag)
+		throws IOException {
+		MatrixWriter writer = MatrixWriterFactory.createMatrixWriter( outputinfo, replication, formatProperties );
+		writer.writeMatrixToHDFS(mat, dir, mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros(), diag);
 	}
 
 	public static MatrixBlock readMatrixFromHDFS(String dir, InputInfo inputinfo, long rlen, long clen, int brlen, int bclen, boolean localFS)