You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/08/28 17:21:38 UTC
systemml git commit: [MINOR] Replace QuaternaryOp copy/pasted code with methods
Repository: systemml
Updated Branches:
refs/heads/master d2efa65c8 -> 11e460573
[MINOR] Replace QuaternaryOp copy/pasted code with methods
Closes #640.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/11e46057
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/11e46057
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/11e46057
Branch: refs/heads/master
Commit: 11e4605735f139b18b008e34a2d505397097389c
Parents: d2efa65
Author: Deron Eriksson <de...@apache.org>
Authored: Mon Aug 28 10:19:48 2017 -0700
Committer: Deron Eriksson <de...@apache.org>
Committed: Mon Aug 28 10:19:48 2017 -0700
----------------------------------------------------------------------
.../org/apache/sysml/hops/QuaternaryOp.java | 368 +++++--------------
1 file changed, 88 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/11e46057/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
index 17188be..bfbaae7 100644
--- a/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/QuaternaryOp.java
@@ -321,6 +321,74 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
setLops( wsloss );
}
+ private Lop obtainlU(Hop U, Hop V, boolean cacheU, double m1Size) throws HopsException, LopsException {
+ Lop lU = null;
+ if (cacheU) {
+ // partitioning of U for read through distributed cache
+ boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
+ lU = U.constructLops();
+ if (needPartU) { // requires partitioning
+ lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE,
+ (m1Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP,
+ PDataPartitionFormat.ROW_BLOCK_WISE_N);
+ lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(),
+ U.getNnz());
+ setLineNumbers(lU);
+ }
+ } else {
+ // replication of U for shuffle to target block
+ Lop offset = createOffsetLop(V, false); // ncol of t(V) -> nrow of V determines num replicates
+ lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
+ lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
+ U.getNnz());
+ setLineNumbers(lU);
+
+ Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
+ grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(),
+ -1);
+ setLineNumbers(grpU);
+ lU = grpU;
+ }
+ return lU;
+ }
+
+ private Lop obtainlV(Hop U, Hop V, boolean cacheV, double m2Size) throws HopsException, LopsException {
+ Lop lV = null;
+ if (cacheV) {
+ // partitioning of V for read through distributed cache
+ boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
+ lV = V.constructLops();
+ if (needPartV) { // requires partitioning
+ lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE,
+ (m2Size > OptimizerUtils.getLocalMemBudget()) ? ExecType.MR : ExecType.CP,
+ PDataPartitionFormat.ROW_BLOCK_WISE_N);
+ lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(),
+ V.getNnz());
+ setLineNumbers(lV);
+ }
+ } else {
+ // replication of t(V) for shuffle to target block
+ Transform ltV = new Transform(V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(),
+ getValueType(), ExecType.MR);
+ ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+ V.getNnz());
+ setLineNumbers(ltV);
+
+ Lop offset = createOffsetLop(U, false); // nrow of U determines num replicates
+ lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
+ lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+ V.getNnz());
+ setLineNumbers(lV);
+
+ Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
+ grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(),
+ -1);
+ setLineNumbers(grpV);
+ lV = grpV;
+ }
+ return lV;
+ }
+
private void constructMRLopsWeightedSquaredLoss(WeightsType wtype)
throws HopsException, LopsException
{
@@ -395,62 +463,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
grpW.getOutputParameters().setDimensions(W.getDim1(), W.getDim2(), W.getRowsInBlock(), W.getColsInBlock(), -1);
setLineNumbers(grpW);
}
-
- Lop lU = null;
- if( cacheU ) {
- //partitioning of U for read through distributed cache
- boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lU = U.constructLops();
- if( needPartU ){ //requires partitioning
- lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
- }
- }
- else {
- //replication of U for shuffle to target block
- Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
- lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(),
- U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
-
- Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
- setLineNumbers(grpU);
- lU = grpU;
- }
-
- Lop lV = null;
- if( cacheV ) {
- //partitioning of V for read through distributed cache
- boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lV = V.constructLops();
- if( needPartV ){ //requires partitioning
- lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
- setLineNumbers(lV);
- }
- }
- else {
- //replication of t(V) for shuffle to target block
- Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
- ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(ltV);
-
- Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
- lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
- lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(lV);
-
- Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
- setLineNumbers(grpV);
- lV = grpV;
- }
-
+
+ Lop lU = obtainlU(U, V, cacheU, m1Size);
+ Lop lV = obtainlV(U, V, cacheV, m2Size);
+
//reduce-side wsloss w/ or without broadcast
Lop wsloss = new WeightedSquaredLossR(
grpX, lU, lV, grpW, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -596,62 +612,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
setLineNumbers(grpX);
-
- Lop lU = null;
- if( cacheU ) {
- //partitioning of U for read through distributed cache
- boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lU = U.constructLops();
- if( needPartU ){ //requires partitioning
- lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
- }
- }
- else {
- //replication of U for shuffle to target block
- Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
- lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(),
- U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
-
- Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
- setLineNumbers(grpU);
- lU = grpU;
- }
-
- Lop lV = null;
- if( cacheV ) {
- //partitioning of V for read through distributed cache
- boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lV = V.constructLops();
- if( needPartV ){ //requires partitioning
- lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
- setLineNumbers(lV);
- }
- }
- else {
- //replication of t(V) for shuffle to target block
- Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
- ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(ltV);
-
- Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
- lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
- lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(lV);
-
- Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
- setLineNumbers(grpV);
- lV = grpV;
- }
-
+
+ Lop lU = obtainlU(U, V, cacheU, m1Size);
+ Lop lV = obtainlV(U, V, cacheV, m2Size);
+
//reduce-side wsig w/ or without broadcast
Lop wsigmoid = new WeightedSigmoidR(
grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -792,62 +756,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
grpX = new Group(grpX, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
setLineNumbers(grpX);
-
- Lop lU = null;
- if( cacheU ) {
- //partitioning of U for read through distributed cache
- boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lU = U.constructLops();
- if( needPartU ){ //requires partitioning
- lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
- }
- }
- else {
- //replication of U for shuffle to target block
- Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
- lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(),
- U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
-
- Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
- setLineNumbers(grpU);
- lU = grpU;
- }
-
- Lop lV = null;
- if( cacheV ) {
- //partitioning of V for read through distributed cache
- boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lV = V.constructLops();
- if( needPartV ){ //requires partitioning
- lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
- setLineNumbers(lV);
- }
- }
- else {
- //replication of t(V) for shuffle to target block
- Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
- ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(ltV);
-
- Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
- lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
- lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(lV);
-
- Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
- setLineNumbers(grpV);
- lV = grpV;
- }
-
+
+ Lop lU = obtainlU(U, V, cacheU, m1Size);
+ Lop lV = obtainlV(U, V, cacheV, m2Size);
+
//reduce-side wdivmm w/ or without broadcast
Lop wdivmm = new WeightedDivMMR( grpW, lU, lV, grpX,
DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -1006,62 +918,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), -1);
setLineNumbers(grpX);
-
- Lop lU = null;
- if( cacheU ) {
- //partitioning of U for read through distributed cache
- boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lU = U.constructLops();
- if( needPartU ){ //requires partitioning
- lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
- }
- }
- else {
- //replication of U for shuffle to target block
- Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
- lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(),
- U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
-
- Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
- setLineNumbers(grpU);
- lU = grpU;
- }
-
- Lop lV = null;
- if( cacheV ) {
- //partitioning of V for read through distributed cache
- boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lV = V.constructLops();
- if( needPartV ){ //requires partitioning
- lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
- setLineNumbers(lV);
- }
- }
- else {
- //replication of t(V) for shuffle to target block
- Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
- ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(ltV);
-
- Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
- lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
- lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(lV);
-
- Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
- setLineNumbers(grpV);
- lV = grpV;
- }
-
+
+ Lop lU = obtainlU(U, V, cacheU, m1Size);
+ Lop lV = obtainlV(U, V, cacheV, m2Size);
+
//reduce-side wcemm w/ or without broadcast
Lop wcemm = new WeightedCrossEntropyR( grpX, lU, lV, eps.constructLops(),
DataType.MATRIX, ValueType.DOUBLE, wtype, cacheU, cacheV, ExecType.MR);
@@ -1215,62 +1075,10 @@ public class QuaternaryOp extends Hop implements MultiThreadedHop
Group grpX = new Group(X.constructLops(), Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
grpX.getOutputParameters().setDimensions(X.getDim1(), X.getDim2(), X.getRowsInBlock(), X.getColsInBlock(), X.getNnz());
setLineNumbers(grpX);
-
- Lop lU = null;
- if( cacheU ) {
- //partitioning of U for read through distributed cache
- boolean needPartU = !U.dimsKnown() || U.getDim1() * U.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lU = U.constructLops();
- if( needPartU ){ //requires partitioning
- lU = new DataPartition(lU, DataType.MATRIX, ValueType.DOUBLE, (m1Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), getRowsInBlock(), getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
- }
- }
- else {
- //replication of U for shuffle to target block
- Lop offset = createOffsetLop(V, false); //ncol of t(V) -> nrow of V determines num replicates
- lU = new RepMat(U.constructLops(), offset, true, V.getDataType(), V.getValueType());
- lU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(),
- U.getRowsInBlock(), U.getColsInBlock(), U.getNnz());
- setLineNumbers(lU);
-
- Group grpU = new Group(lU, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpU.getOutputParameters().setDimensions(U.getDim1(), U.getDim2(), U.getRowsInBlock(), U.getColsInBlock(), -1);
- setLineNumbers(grpU);
- lU = grpU;
- }
-
- Lop lV = null;
- if( cacheV ) {
- //partitioning of V for read through distributed cache
- boolean needPartV = !V.dimsKnown() || V.getDim1() * V.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- lV = V.constructLops();
- if( needPartV ){ //requires partitioning
- lV = new DataPartition(lV, DataType.MATRIX, ValueType.DOUBLE, (m2Size>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lV.getOutputParameters().setDimensions(V.getDim1(), V.getDim2(), getRowsInBlock(), getColsInBlock(), V.getNnz());
- setLineNumbers(lV);
- }
- }
- else {
- //replication of t(V) for shuffle to target block
- Transform ltV = new Transform( V.constructLops(), HopsTransf2Lops.get(ReOrgOp.TRANSPOSE), getDataType(), getValueType(), ExecType.MR);
- ltV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(ltV);
-
- Lop offset = createOffsetLop(U, false); //nrow of U determines num replicates
- lV = new RepMat(ltV, offset, false, V.getDataType(), V.getValueType());
- lV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(),
- V.getColsInBlock(), V.getRowsInBlock(), V.getNnz());
- setLineNumbers(lV);
-
- Group grpV = new Group(lV, Group.OperationTypes.Sort, DataType.MATRIX, ValueType.DOUBLE);
- grpV.getOutputParameters().setDimensions(V.getDim2(), V.getDim1(), V.getColsInBlock(), V.getRowsInBlock(), -1);
- setLineNumbers(grpV);
- lV = grpV;
- }
-
+
+ Lop lU = obtainlU(U, V, cacheU, m1Size);
+ Lop lV = obtainlV(U, V, cacheV, m2Size);
+
//reduce-side wumm w/ or without broadcast
Lop wumm = new WeightedUnaryMMR(
grpX, lU, lV, DataType.MATRIX, ValueType.DOUBLE, wtype, uop, cacheU, cacheV, ExecType.MR);