You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/08/28 17:21:38 UTC

systemml git commit: [MINOR] Replace QuaternaryOp copy/pasted code with methods

Repository: systemml
Updated Branches:
  refs/heads/master d2efa65c8 -> 11e460573


[MINOR] Replace QuaternaryOp copy/pasted code with methods

Closes #640.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/11e46057
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/11e46057
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/11e46057

Branch: refs/heads/master
Commit: 11e4605735f139b18b008e34a2d505397097389c
Parents: d2efa65
Author: Deron Eriksson <de...@apache.org>
Authored: Mon Aug 28 10:19:48 2017 -0700
Committer: Deron Eriksson <de...@apache.org>
Committed: Mon Aug 28 10:19:48 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/QuaternaryOp.java     | 368 +++++--------------
 1 file changed, 88 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/11e46057/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
index 17188be..bfbaae7 100644
--- a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
@@ -321,6 +321,74 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 		setLops( wsloss );
 	}
 
+	private Lop obtainlU(Hop U, Hop V, boolean cacheU, double m1Size) throws HopsException, LopsException { // builds the Lop that feeds U into the reduce-side MR quaternary operators: U as-is (optionally partitioned) when cacheU, else a replicated and sort-grouped U
+		Lop lU = null;
+		if (cacheU) {
+			// U is read through the distributed cache; it is partitioned when its dims are unknown or its cell count exceeds DistributedCacheInput.PARTITION_SIZE
+			boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
+			lU = U.constructLops();
+			if (needPartU) { // requires partitioning: wrap U's lop in a row-block-wise DataPartition
+				lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE,
+						(m1Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP, // partition in MR when m1Size exceeds the local memory budget, else in CP (m1Size is presumably U's size estimate - confirm against callers)
+						PDataPartitionFormat.ROW_BLOCK_WISE_N);
+				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(),
+						U.getNnz()); // note: block sizes are taken from this hop, not from U
+				setLineNumbers(lU);
+			}
+		} else {
+			// U is not cached: replicate U for shuffle to the target block, then sort-group the result
+			Lop offset = createOffsetLop(V, false); // ncol of t(V) -> nrow of V determines num replicates
+			lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType()); // NOTE(review): data/value type are taken from V, not U - confirm intended
+			lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
+					U.getNnz());
+			setLineNumbers(lU);
+
+			Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
+			grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
+					-1); // nnz is passed as -1 here (presumably "unknown" - confirm)
+			setLineNumbers(grpU);
+			lU = grpU;
+		}
+		return lU; // U's own lop, the DataPartition over it, or the sort Group over the replicated U
+	}
+
+	private Lop obtainlV(Hop U, Hop V, boolean cacheV, double m2Size) throws HopsException, LopsException { // builds the Lop that feeds V into the reduce-side MR quaternary operators: V as-is (optionally partitioned) when cacheV, else a transposed, replicated and sort-grouped V
+		Lop lV = null;
+		if (cacheV) {
+			// V is read through the distributed cache; it is partitioned when its dims are unknown or its cell count exceeds DistributedCacheInput.PARTITION_SIZE
+			boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
+			lV = V.constructLops();
+			if (needPartV) { // requires partitioning: wrap V's lop in a row-block-wise DataPartition
+				lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE,
+						(m2Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP, // partition in MR when m2Size exceeds the local memory budget, else in CP (m2Size is presumably V's size estimate - confirm against callers)
+						PDataPartitionFormat.ROW_BLOCK_WISE_N);
+				lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(),
+						V.getNnz()); // note: block sizes are taken from this hop, not from V
+				setLineNumbers(lV);
+			}
+		} else {
+			// V is not cached: transpose V in MR, replicate t(V) for shuffle to the target block, then sort-group the result
+			Transform ltV = new Transform(V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(),
+					getValueType(), ExecType.MR); // NOTE(review): data/value type are taken from this hop, not from V - confirm intended
+			ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+					V.getNnz()); // dims and block sizes are swapped because this lop represents t(V)
+			setLineNumbers(ltV);
+
+			Lop offset = createOffsetLop(U, false); // nrow of U determines num replicates
+			lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
+			lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+					V.getNnz());
+			setLineNumbers(lV);
+
+			Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
+			grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+					-1); // nnz is passed as -1 here (presumably "unknown" - confirm)
+			setLineNumbers(grpV);
+			lV = grpV;
+		}
+		return lV; // V's own lop, the DataPartition over it, or the sort Group over the replicated t(V)
+	}
+
 	private void constructMRLopsWeightedSquaredLoss(WeightsType wtype) 
 		throws HopsException, LopsException
 	{
@@ -395,62 +463,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 				grpW.getOutputParameters().setDimensions(W.getDim1(), W.getDim2(), W.getRowsInBlock(), W.getColsInBlock(), -1);
 				setLineNumbers(grpW);
 			}
-			
-			Lop lU = null;
-			if( cacheU ) {
-				//partitioning of U for read through distributed cache
-				boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lU = U.constructLops();
-				if( needPartU ){ //requires partitioning
-					lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
-					setLineNumbers(lU);	
-				}
-			}
-			else {
-				//replication of U for shuffle to target block
-				Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
-				lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
-				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-						U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
-				setLineNumbers(lU);
-				
-				Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
-				setLineNumbers(grpU);
-				lU = grpU;
-			}
-			
-			Lop lV = null;
-			if( cacheV ) {
-				//partitioning of V for read through distributed cache
-				boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lV = V.constructLops();
-				if( needPartV ){ //requires partitioning
-					lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
-					setLineNumbers(lV);	
-				}
-			}
-			else {
-				//replication of t(V) for shuffle to target block
-				Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
-				ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(ltV);
-				
-				Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
-				lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
-				lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(lV);
-				
-				Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
-				setLineNumbers(grpV);
-				lV = grpV;
-			}
-			
+
+			Lop lU = obtainlU(U, V, cacheU, m1Size);
+			Lop lV = obtainlV(U, V, cacheV, m2Size);
+
 			//reduce-side wsloss w/ or without broadcast
 			Lop wsloss = new WeightedSquaredLossR( 
 					grpX, lU, lV, grpW, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -596,62 +612,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 			Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
 			setLineNumbers(grpX);
-			
-			Lop lU = null;
-			if( cacheU ) {
-				//partitioning of U for read through distributed cache
-				boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lU = U.constructLops();
-				if( needPartU ){ //requires partitioning
-					lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
-					setLineNumbers(lU);	
-				}
-			}
-			else {
-				//replication of U for shuffle to target block
-				Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
-				lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
-				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-						U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
-				setLineNumbers(lU);
-				
-				Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
-				setLineNumbers(grpU);
-				lU = grpU;
-			}
-			
-			Lop lV = null;
-			if( cacheV ) {
-				//partitioning of V for read through distributed cache
-				boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lV = V.constructLops();
-				if( needPartV ){ //requires partitioning
-					lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
-					setLineNumbers(lV);	
-				}
-			}
-			else {
-				//replication of t(V) for shuffle to target block
-				Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
-				ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(ltV);
-				
-				Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
-				lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
-				lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(lV);
-				
-				Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
-				setLineNumbers(grpV);
-				lV = grpV;
-			}
-			
+
+			Lop lU = obtainlU(U, V, cacheU, m1Size);
+			Lop lV = obtainlV(U, V, cacheV, m2Size);
+
 			//reduce-side wsig w/ or without broadcast
 			Lop wsigmoid = new WeightedSigmoidR( 
 					grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -792,62 +756,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 				grpX = new Group(grpX, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
 			setLineNumbers(grpX);
-			
-			Lop lU = null;
-			if( cacheU ) {
-				//partitioning of U for read through distributed cache
-				boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lU = U.constructLops();
-				if( needPartU ){ //requires partitioning
-					lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
-					setLineNumbers(lU);	
-				}
-			}
-			else {
-				//replication of U for shuffle to target block
-				Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
-				lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
-				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-						U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
-				setLineNumbers(lU);
-				
-				Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
-				setLineNumbers(grpU);
-				lU = grpU;
-			}
-			
-			Lop lV = null;
-			if( cacheV ) {
-				//partitioning of V for read through distributed cache
-				boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lV = V.constructLops();
-				if( needPartV ){ //requires partitioning
-					lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
-					setLineNumbers(lV);	
-				}
-			}
-			else {
-				//replication of t(V) for shuffle to target block
-				Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
-				ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(ltV);
-				
-				Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
-				lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
-				lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(lV);
-				
-				Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
-				setLineNumbers(grpV);
-				lV = grpV;
-			}
-			
+
+			Lop lU = obtainlU(U, V, cacheU, m1Size);
+			Lop lV = obtainlV(U, V, cacheV, m2Size);
+
 			//reduce-side wdivmm w/ or without broadcast
 			Lop wdivmm = new WeightedDivMMR( grpW, lU, lV, grpX, 
 					DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -1006,62 +918,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 			Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), -1);
 			setLineNumbers(grpX);
-			
-			Lop lU = null;
-			if( cacheU ) {
-				//partitioning of U for read through distributed cache
-				boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lU = U.constructLops();
-				if( needPartU ){ //requires partitioning
-					lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
-					setLineNumbers(lU);	
-				}
-			}
-			else {
-				//replication of U for shuffle to target block
-				Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
-				lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
-				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-						U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
-				setLineNumbers(lU);
-				
-				Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
-				setLineNumbers(grpU);
-				lU = grpU;
-			}
-			
-			Lop lV = null;
-			if( cacheV ) {
-				//partitioning of V for read through distributed cache
-				boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lV = V.constructLops();
-				if( needPartV ){ //requires partitioning
-					lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
-					setLineNumbers(lV);	
-				}
-			}
-			else {
-				//replication of t(V) for shuffle to target block
-				Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
-				ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(ltV);
-				
-				Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
-				lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
-				lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(lV);
-				
-				Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
-				setLineNumbers(grpV);
-				lV = grpV;
-			}
-			
+
+			Lop lU = obtainlU(U, V, cacheU, m1Size);
+			Lop lV = obtainlV(U, V, cacheV, m2Size);
+
 			//reduce-side wcemm w/ or without broadcast
 			Lop wcemm = new WeightedCrossEntropyR( grpX, lU, lV, eps.constructLops(),
 					DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -1215,62 +1075,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
 			Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
 			grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
 			setLineNumbers(grpX);
-			
-			Lop lU = null;
-			if( cacheU ) {
-				//partitioning of U for read through distributed cache
-				boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lU = U.constructLops();
-				if( needPartU ){ //requires partitioning
-					lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
-					setLineNumbers(lU);	
-				}
-			}
-			else {
-				//replication of U for shuffle to target block
-				Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
-				lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
-				lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), 
-						U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
-				setLineNumbers(lU);
-				
-				Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
-				setLineNumbers(grpU);
-				lU = grpU;
-			}
-			
-			Lop lV = null;
-			if( cacheV ) {
-				//partitioning of V for read through distributed cache
-				boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
-				lV = V.constructLops();
-				if( needPartV ){ //requires partitioning
-					lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
-					lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
-					setLineNumbers(lV);	
-				}
-			}
-			else {
-				//replication of t(V) for shuffle to target block
-				Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
-				ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(ltV);
-				
-				Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
-				lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
-				lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), 
-						V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
-				setLineNumbers(lV);
-				
-				Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
-				grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
-				setLineNumbers(grpV);
-				lV = grpV;
-			}
-			
+
+			Lop lU = obtainlU(U, V, cacheU, m1Size);
+			Lop lV = obtainlV(U, V, cacheV, m2Size);
+
 			//reduce-side wumm w/ or without broadcast
 			Lop wumm = new WeightedUnaryMMR( 
 					grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, uop, cacheU, cacheV, ExecType.MR);