You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/17 06:59:05 UTC
[1/3] systemml git commit: [MINOR] Simplify spark key-value list
handling via streams
Repository: systemml
Updated Branches:
refs/heads/master a044ab21d -> f5d97c551
[MINOR] Simplify spark key-value list handling via streams
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/25feb997
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/25feb997
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/25feb997
Branch: refs/heads/master
Commit: 25feb997978e990eb5dc11eaaa2dc6ef7af03c61
Parents: a044ab2
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Apr 16 19:31:47 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Mon Apr 16 19:31:47 2018 -0700
----------------------------------------------------------------------
.../instructions/spark/utils/SparkUtils.java | 28 ++++++--------------
.../matrix/data/OperationsOnMatrixValues.java | 7 +++--
2 files changed, 11 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/25feb997/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index cdd64f0..49232da 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -70,40 +70,28 @@ public class SparkUtils
return new Tuple2<>(in.getIndexes(), (MatrixBlock)in.getValue());
}
- public static ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlock( ArrayList<IndexedMatrixValue> in ) {
- ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<>();
- for( IndexedMatrixValue imv : in )
- ret.add(fromIndexedMatrixBlock(imv));
- return ret;
+ public static List<Tuple2<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlock( List<IndexedMatrixValue> in ) {
+ return in.stream().map(imv -> fromIndexedMatrixBlock(imv)).collect(Collectors.toList());
}
public static Pair<MatrixIndexes,MatrixBlock> fromIndexedMatrixBlockToPair( IndexedMatrixValue in ){
return new Pair<>(in.getIndexes(), (MatrixBlock)in.getValue());
}
- public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in ) {
- ArrayList<Pair<MatrixIndexes,MatrixBlock>> ret = new ArrayList<>();
- for( IndexedMatrixValue imv : in )
- ret.add(fromIndexedMatrixBlockToPair(imv));
- return ret;
+ public static List<Pair<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlockToPair( List<IndexedMatrixValue> in ) {
+ return in.stream().map(imv -> fromIndexedMatrixBlockToPair(imv)).collect(Collectors.toList());
}
public static Tuple2<Long,FrameBlock> fromIndexedFrameBlock( Pair<Long, FrameBlock> in ){
return new Tuple2<>(in.getKey(), in.getValue());
}
- public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( ArrayList<Pair<Long, FrameBlock>> in ) {
- ArrayList<Tuple2<Long, FrameBlock>> ret = new ArrayList<>();
- for( Pair<Long, FrameBlock> ifv : in )
- ret.add(fromIndexedFrameBlock(ifv));
- return ret;
+ public static List<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock(List<Pair<Long, FrameBlock>> in) {
+ return in.stream().map(ifv -> fromIndexedFrameBlock(ifv)).collect(Collectors.toList());
}
- public static ArrayList<Pair<Long,Long>> toIndexedLong( List<Tuple2<Long, Long>> in ) {
- ArrayList<Pair<Long, Long>> ret = new ArrayList<>();
- for( Tuple2<Long, Long> e : in )
- ret.add(new Pair<>(e._1(), e._2()));
- return ret;
+ public static List<Pair<Long,Long>> toIndexedLong( List<Tuple2<Long, Long>> in ) {
+ return in.stream().map(e -> new Pair<>(e._1(), e._2())).collect(Collectors.toList());
}
public static Pair<Long,FrameBlock> toIndexedFrameBlock( Tuple2<Long,FrameBlock> in ) {
http://git-wip-us.apache.org/repos/asf/systemml/blob/25feb997/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index a698e46..1e1c003 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.matrix.data;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -238,7 +239,7 @@ public class OperationsOnMatrixValues
}
@SuppressWarnings("rawtypes")
- public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) {
+ public static List performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) {
if( in instanceof MatrixBlock )
return performSlice(ixrange, brlen, bclen, iix, jix, (MatrixBlock)in);
else if( in instanceof FrameBlock )
@@ -246,13 +247,11 @@ public class OperationsOnMatrixValues
throw new DMLRuntimeException("Unsupported cache block type: "+in.getClass().getName());
}
-
@SuppressWarnings("rawtypes")
- public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, MatrixBlock in) {
+ public static List performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, MatrixBlock in) {
IndexedMatrixValue imv = new IndexedMatrixValue(new MatrixIndexes(iix, jix), (MatrixBlock)in);
ArrayList<IndexedMatrixValue> outlist = new ArrayList<>();
performSlice(imv, ixrange, brlen, bclen, outlist);
-
return SparkUtils.fromIndexedMatrixBlockToPair(outlist);
}
[2/3] systemml git commit: [SYSTEMML-2246] Performance dense-sparse
vector transpose (in MCSR)
Posted by mb...@apache.org.
[SYSTEMML-2246] Performance dense-sparse vector transpose (in MCSR)
This patch makes a moderate performance improvement for dense-sparse
vector transpose by avoiding unnecessary binary search operations per
vector. This is useful for scenarios with ultra-sparse vectors and Spark
operations where large vectors are partitioned into many 1K vectors.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/6aaea2fd
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/6aaea2fd
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/6aaea2fd
Branch: refs/heads/master
Commit: 6aaea2fddfcf0937c7f02bcc181c7684718b98bf
Parents: 25feb99
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Apr 16 19:57:09 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Mon Apr 16 19:57:09 2018 -0700
----------------------------------------------------------------------
.../sysml/runtime/matrix/data/SparseRowVector.java | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/6aaea2fd/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
index 4f05af0..696f417 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
@@ -209,8 +209,9 @@ public final class SparseRowVector extends SparseRow implements Serializable
return (index >= 0) ? values[index] : 0;
}
- public int searchIndexesFirstLTE(int col)
- {
+ public int searchIndexesFirstLTE(int col) {
+ if( size == 0 ) return -1;
+
//search for existing col index
int index = Arrays.binarySearch(indexes, 0, size, col);
if( index >= 0 )
@@ -221,8 +222,9 @@ public final class SparseRowVector extends SparseRow implements Serializable
return (index-1 < size) ? index-1 : -1;
}
- public int searchIndexesFirstGTE(int col)
- {
+ public int searchIndexesFirstGTE(int col) {
+ if( size == 0 ) return -1;
+
//search for existing col index
int index = Arrays.binarySearch(indexes, 0, size, col);
if( index >= 0 )
@@ -233,8 +235,9 @@ public final class SparseRowVector extends SparseRow implements Serializable
return (index < size) ? index : -1;
}
- public int searchIndexesFirstGT(int col)
- {
+ public int searchIndexesFirstGT(int col) {
+ if( size == 0 ) return -1;
+
//search for existing col index
int index = Arrays.binarySearch(indexes, 0, size, col);
if( index >= 0 )
[3/3] systemml git commit: [SYSTEMML-2237] Performance spark mapmm
(lazy-iter) / reshape (sparse)
Posted by mb...@apache.org.
[SYSTEMML-2237] Performance spark mapmm (lazy-iter) / reshape (sparse)
This patch makes a minor performance improvement to Spark mapmm
operations that require flatmap, by returning a lazy iterator, which has
the potential to improve memory efficiency and thus reduce unnecessary
GC overhead. Furthermore, this patch also includes some cleanups,
including cleanups of the Spark reshape operations.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f5d97c55
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f5d97c55
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f5d97c55
Branch: refs/heads/master
Commit: f5d97c55162bc9c7db57e685c04be9aafe9657f6
Parents: 6aaea2f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Apr 16 23:41:31 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Mon Apr 16 23:41:31 2018 -0700
----------------------------------------------------------------------
.../mr/AggregateBinaryInstruction.java | 6 +-
.../mr/MatrixReshapeMRInstruction.java | 22 +--
.../instructions/spark/CpmmSPInstruction.java | 4 +-
.../instructions/spark/MapmmSPInstruction.java | 49 +++---
.../spark/MatrixReshapeSPInstruction.java | 6 +-
.../instructions/spark/PMapmmSPInstruction.java | 4 +-
.../instructions/spark/RmmSPInstruction.java | 3 +-
.../instructions/spark/Tsmm2SPInstruction.java | 4 +-
.../instructions/spark/ZipmmSPInstruction.java | 5 +-
.../runtime/matrix/data/LibMatrixReorg.java | 151 ++++++++-----------
.../matrix/data/OperationsOnMatrixValues.java | 4 +-
.../mapred/MMCJMRReducerWithAggregator.java | 8 +-
12 files changed, 110 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
index 2e75c9a..3c4a10c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/mr/AggregateBinaryInstruction.java
@@ -154,7 +154,7 @@ public class AggregateBinaryInstruction extends BinaryMRInstructionBase implemen
out=cachedValues.holdPlace(output, valueClass);
//process instruction
- OperationsOnMatrixValues.performAggregateBinary(
+ OperationsOnMatrixValues.matMult(
in1.getIndexes(), (MatrixBlock) in1.getValue(),
in2.getIndexes(), (MatrixBlock) in2.getValue(),
out.getIndexes(), (MatrixBlock) out.getValue(),
@@ -200,7 +200,7 @@ public class AggregateBinaryInstruction extends BinaryMRInstructionBase implemen
IndexedMatrixValue out = cachedValues.holdPlace(output, valueClass);
//process instruction
- OperationsOnMatrixValues.performAggregateBinary(in1.getIndexes(), (MatrixBlock)in1.getValue(),
+ OperationsOnMatrixValues.matMult(in1.getIndexes(), (MatrixBlock)in1.getValue(),
in2BlockIndex, (MatrixBlock) in2BlockValue, out.getIndexes(), (MatrixBlock)out.getValue(),
((AggregateBinaryOperator)optr));
removeOutput &= ( !_outputEmptyBlocks && out.getValue().isEmpty() );
@@ -227,7 +227,7 @@ public class AggregateBinaryInstruction extends BinaryMRInstructionBase implemen
IndexedMatrixValue out = cachedValues.holdPlace(output, valueClass);
//process instruction
- OperationsOnMatrixValues.performAggregateBinary(
+ OperationsOnMatrixValues.matMult(
in1BlockIndex, (MatrixBlock)in1BlockValue,
in2.getIndexes(), (MatrixBlock)in2.getValue(),
out.getIndexes(), (MatrixBlock)out.getValue(),
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
index fc1e05f..802851c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/mr/MatrixReshapeMRInstruction.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.instructions.mr;
import java.util.ArrayList;
+import java.util.List;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -36,8 +37,6 @@ public class MatrixReshapeMRInstruction extends UnaryInstruction {
private MatrixCharacteristics _mcIn = null;
private MatrixCharacteristics _mcOut = null;
- private ArrayList<IndexedMatrixValue> _cache = null;
-
private MatrixReshapeMRInstruction(Operator op, byte in, long rows, long cols, boolean byrow, byte out,
String istr) {
super(MRType.MMTSJ, op, in, out, istr);
@@ -46,8 +45,7 @@ public class MatrixReshapeMRInstruction extends UnaryInstruction {
_byrow = byrow;
}
- public void setMatrixCharacteristics( MatrixCharacteristics mcIn, MatrixCharacteristics mcOut )
- {
+ public void setMatrixCharacteristics( MatrixCharacteristics mcIn, MatrixCharacteristics mcOut ) {
_mcIn = mcIn;
}
@@ -73,25 +71,17 @@ public class MatrixReshapeMRInstruction extends UnaryInstruction {
{
ArrayList<IndexedMatrixValue> blkList = cachedValues.get(input);
if( blkList != null )
- for(IndexedMatrixValue imv : blkList)
- {
- if( imv == null )
- continue;
+ for(IndexedMatrixValue imv : blkList) {
+ if( imv == null ) continue;
- //get cached blocks
- ArrayList<IndexedMatrixValue> out = _cache;
-
//process instruction
_mcOut.setBlockSize(brlen, bclen);
- out = LibMatrixReorg.reshape(imv, _mcIn, out, _mcOut, _byrow, true);
+ List<IndexedMatrixValue> out =
+ LibMatrixReorg.reshape(imv, _mcIn, _mcOut, _byrow, true);
//put the output values in the output cache
for( IndexedMatrixValue outBlk : out )
cachedValues.add(output, outBlk);
-
- //put blocks into own cache
- if( LibMatrixReorg.ALLOW_BLOCK_REUSE )
- _cache = out;
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
index de08d83..308e60f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
@@ -207,7 +207,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
//core block matrix multiplication
MatrixBlock blkOut = OperationsOnMatrixValues
- .performAggregateBinaryIgnoreIndexes(blkIn1, blkIn2, new MatrixBlock(), _op);
+ .matMult(blkIn1, blkIn2, new MatrixBlock(), _op);
//return target block
ixOut.setIndexes(arg0._2()._1().getIndexes().getRowIndex(),
@@ -236,7 +236,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
.reorgOperations(_rop, new MatrixBlock(), 0, 0, 0);
//core block matrix multiplication
return OperationsOnMatrixValues
- .performAggregateBinaryIgnoreIndexes(in1, in2, new MatrixBlock(), _op);
+ .matMult(in1, in2, new MatrixBlock(), _op);
}
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 21c1be3..7c8d606 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -20,8 +20,8 @@
package org.apache.sysml.runtime.instructions.spark;
-import java.util.ArrayList;
import java.util.Iterator;
+import java.util.stream.IntStream;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -275,8 +275,8 @@ public class MapmmSPInstruction extends BinarySPInstruction {
MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
//execute matrix-vector mult
- OperationsOnMatrixValues.performAggregateBinary(
- new MatrixIndexes(1,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op);
+ OperationsOnMatrixValues.matMult(new MatrixIndexes(1,ixIn.getRowIndex()),
+ left, ixIn, blkIn, ixOut, blkOut, _op);
}
else //if( _type == CacheType.RIGHT )
{
@@ -284,8 +284,8 @@ public class MapmmSPInstruction extends BinarySPInstruction {
MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
//execute matrix-vector mult
- OperationsOnMatrixValues.performAggregateBinary(
- ixIn, blkIn, new MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op);
+ OperationsOnMatrixValues.matMult(ixIn, blkIn,
+ new MatrixIndexes(ixIn.getColumnIndex(),1), right, ixOut, blkOut, _op);
}
//output new tuple
@@ -327,7 +327,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
//execute matrix-vector mult
- return OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(
+ return OperationsOnMatrixValues.matMult(
left, blkIn, new MatrixBlock(), _op);
}
else //if( _type == CacheType.RIGHT )
@@ -336,7 +336,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
//execute matrix-vector mult
- return OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(
+ return OperationsOnMatrixValues.matMult(
blkIn, right, new MatrixBlock(), _op);
}
}
@@ -392,7 +392,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
MatrixBlock left = _pbc.getBlock(1, (int)ixIn.getRowIndex());
//execute index preserving matrix multiplication
- OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(left, blkIn, blkOut, _op);
+ OperationsOnMatrixValues.matMult(left, blkIn, blkOut, _op);
}
else //if( _type == CacheType.RIGHT )
{
@@ -400,7 +400,7 @@ public class MapmmSPInstruction extends BinarySPInstruction {
MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), 1);
//execute index preserving matrix multiplication
- OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes(blkIn, right, blkOut, _op);
+ OperationsOnMatrixValues.matMult(blkIn, right, blkOut, _op);
}
return new Tuple2<>(ixIn, blkOut);
@@ -430,32 +430,23 @@ public class MapmmSPInstruction extends BinarySPInstruction {
public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )
throws Exception
{
- ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<>();
MatrixIndexes ixIn = arg0._1();
MatrixBlock blkIn = arg0._2();
-
+
if( _type == CacheType.LEFT ) {
- //for all matching left-hand-side blocks
- int len = _pbc.getNumRowBlocks();
- for( int i=1; i<=len; i++ ) {
- MatrixBlock left = _pbc.getBlock(i, (int)ixIn.getRowIndex());
- MatrixBlock blkOut = OperationsOnMatrixValues
- .performAggregateBinaryIgnoreIndexes(left, blkIn, new MatrixBlock(), _op);
- ret.add(new Tuple2<>(new MatrixIndexes(i, ixIn.getColumnIndex()), blkOut));
- }
+ //for all matching left-hand-side blocks, returned as lazy iterator
+ return IntStream.range(1, _pbc.getNumRowBlocks()+1).mapToObj(i ->
+ new Tuple2<>(new MatrixIndexes(i, ixIn.getColumnIndex()),
+ OperationsOnMatrixValues.matMult(_pbc.getBlock(i, (int)ixIn.getRowIndex()), blkIn,
+ new MatrixBlock(), _op))).iterator();
}
else { //RIGHT
- //for all matching right-hand-side blocks
- int len = _pbc.getNumColumnBlocks();
- for( int j=1; j<=len; j++ ) {
- MatrixBlock right = _pbc.getBlock((int)ixIn.getColumnIndex(), j);
- MatrixBlock blkOut = OperationsOnMatrixValues
- .performAggregateBinaryIgnoreIndexes(blkIn, right, new MatrixBlock(), _op);
- ret.add(new Tuple2<>(new MatrixIndexes(ixIn.getRowIndex(), j), blkOut));
- }
+ //for all matching right-hand-side blocks, returned as lazy iterator
+ return IntStream.range(1, _pbc.getNumColumnBlocks()+1).mapToObj(j ->
+ new Tuple2<>(new MatrixIndexes(ixIn.getRowIndex(), j),
+ OperationsOnMatrixValues.matMult(blkIn, _pbc.getBlock((int)ixIn.getColumnIndex(), j),
+ new MatrixBlock(), _op))).iterator();
}
-
- return ret.iterator();
}
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
index 8a1c325..97f112c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
@@ -19,8 +19,8 @@
package org.apache.sysml.runtime.instructions.spark;
-import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -138,8 +138,8 @@ public class MatrixReshapeSPInstruction extends UnarySPInstruction
IndexedMatrixValue in = SparkUtils.toIndexedMatrixBlock(arg0);
//execute actual reshape operation
- ArrayList<IndexedMatrixValue> out = LibMatrixReorg
- .reshape(in, _mcIn, new ArrayList<>(), _mcOut, _byrow, _outputEmptyBlocks);
+ List<IndexedMatrixValue> out = LibMatrixReorg
+ .reshape(in, _mcIn, _mcOut, _byrow, _outputEmptyBlocks);
//output conversion (for compatibility w/ rdd schema)
return SparkUtils.fromIndexedMatrixBlock(out).iterator();
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
index 2e7ac11..1b6435b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -193,8 +193,8 @@ public class PMapmmSPInstruction extends BinarySPInstruction {
MatrixBlock left = pm.getBlock(i, (int)ixIn.getRowIndex());
//execute matrix-vector mult
- OperationsOnMatrixValues.performAggregateBinary(
- new MatrixIndexes(i,ixIn.getRowIndex()), left, ixIn, blkIn, ixOut, blkOut, _op);
+ OperationsOnMatrixValues.matMult(new MatrixIndexes(i,ixIn.getRowIndex()),
+ left, ixIn, blkIn, ixOut, blkOut, _op);
//output new tuple
ixOut.setIndexes(_offset+i, ixOut.getColumnIndex());
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
index 294c142..90e5396 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
@@ -191,8 +191,7 @@ public class RmmSPInstruction extends BinarySPInstruction {
MatrixBlock blkIn2 = arg0._2()._2();
//core block matrix multiplication
- MatrixBlock blkOut = OperationsOnMatrixValues
- .performAggregateBinaryIgnoreIndexes(blkIn1, blkIn2, new MatrixBlock(), _op);
+ MatrixBlock blkOut = OperationsOnMatrixValues.matMult(blkIn1, blkIn2, new MatrixBlock(), _op);
//output new tuple
return new Tuple2<>(ixOut, blkOut);
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
index cabc2c8..5bb686b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
@@ -154,7 +154,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
(int)(_type.isLeft()?1:ixin.getColumnIndex()));
MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm
- MatrixBlock out2 = (MatrixBlock) OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm
+ MatrixBlock out2 = (MatrixBlock) OperationsOnMatrixValues.matMult( //mm
_type.isLeft() ? mbin2t : mbin, _type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op);
MatrixIndexes ixout2 = _type.isLeft() ? new MatrixIndexes(2,1) : new MatrixIndexes(1,2);
ret.add(new Tuple2<>(ixout2, out2));
@@ -215,7 +215,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction {
(int)(_type.isLeft()?1:ixin.getColumnIndex()));
MatrixBlock mbin2t = transpose(mbin2, new MatrixBlock()); //prep for transpose rewrite mm
- MatrixBlock out2 = OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes( //mm
+ MatrixBlock out2 = OperationsOnMatrixValues.matMult( //mm
_type.isLeft() ? mbin2t : mbin, _type.isLeft() ? mbin : mbin2t, new MatrixBlock(), _op);
MatrixIndexes ixout2 = _type.isLeft() ? new MatrixIndexes(2,1) : new MatrixIndexes(1,2);
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
index 4f168c1..42313fa 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ZipmmSPInstruction.java
@@ -123,10 +123,9 @@ public class ZipmmSPInstruction extends BinarySPInstruction {
//transpose right input (for vectors no-op)
MatrixBlock tmp = (MatrixBlock)in2.reorgOperations(_rop, new MatrixBlock(), 0, 0, 0);
-
+
//core matrix multiplication (for t(y)%*%X or t(X)%*%y)
- return OperationsOnMatrixValues
- .performAggregateBinaryIgnoreIndexes(tmp, in1, new MatrixBlock(), _abop);
+ return OperationsOnMatrixValues.matMult(tmp, in1, new MatrixBlock(), _abop);
}
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 8de6f13..78b730c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -28,10 +28,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -69,9 +70,6 @@ public class LibMatrixReorg
//safe due to copy-on-write and safe update-in-place handling)
public static final boolean SHALLOW_COPY_REORG = true;
- //allow reuse of temporary blocks for certain operations
- public static final boolean ALLOW_BLOCK_REUSE = false;
-
//use csr instead of mcsr sparse block for rexpand columns / diag v2m
public static final boolean SPARSE_OUTPUTS_IN_CSR = true;
@@ -465,15 +463,15 @@ public class LibMatrixReorg
* @param outputEmptyBlocks output blocks with nnz=0
* @return list of indexed matrix values
*/
- public static ArrayList<IndexedMatrixValue> reshape( IndexedMatrixValue in, MatrixCharacteristics mcIn,
- ArrayList<IndexedMatrixValue> out, MatrixCharacteristics mcOut, boolean rowwise, boolean outputEmptyBlocks ) {
+ public static List<IndexedMatrixValue> reshape(IndexedMatrixValue in, MatrixCharacteristics mcIn,
+ MatrixCharacteristics mcOut, boolean rowwise, boolean outputEmptyBlocks ) {
//prepare inputs
MatrixIndexes ixIn = in.getIndexes();
MatrixBlock mbIn = (MatrixBlock) in.getValue();
//prepare result blocks (no reuse in order to guarantee mem constraints)
Collection<MatrixIndexes> rix = computeAllResultBlockIndexes(ixIn, mcIn, mcOut, mbIn, rowwise, outputEmptyBlocks);
- HashMap<MatrixIndexes, MatrixBlock> rblk = createAllResultBlocks(rix, mbIn.nonZeros, mcIn, mcOut, rowwise, out);
+ Map<MatrixIndexes, MatrixBlock> rblk = createAllResultBlocks(rix, mbIn.nonZeros, mcOut);
//basic algorithm
long row_offset = (ixIn.getRowIndex()-1)*mcIn.getRowsPerBlock();
@@ -483,15 +481,11 @@ public class LibMatrixReorg
else //dense
reshapeDense(mbIn, row_offset, col_offset, rblk, mcIn, mcOut, rowwise);
- //prepare output
- out = new ArrayList<>();
- for( Entry<MatrixIndexes, MatrixBlock> e : rblk.entrySet() )
- if( outputEmptyBlocks || !e.getValue().isEmptyBlock(false) ) {
- e.getValue().examSparsity(); //ensure correct format
- out.add(new IndexedMatrixValue(e.getKey(),e.getValue()));
- }
-
- return out;
+ //prepare output (sparsity switch, wrapper)
+ return rblk.entrySet().stream()
+ .filter( e -> outputEmptyBlocks || !e.getValue().isEmptyBlock(false))
+ .map(e -> {e.getValue().examSparsity(); return new IndexedMatrixValue(e.getKey(),e.getValue());})
+ .collect(Collectors.toList());
}
/**
@@ -1552,46 +1546,26 @@ public class LibMatrixReorg
row_offset+cell.getI(), col_offset+cell.getJ(), mcIn, mcOut, rowwise));
}
}
-
- @SuppressWarnings("unused")
- private static HashMap<MatrixIndexes, MatrixBlock> createAllResultBlocks( Collection<MatrixIndexes> rix,
- long nnz, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut,
- boolean rowwise, ArrayList<IndexedMatrixValue> reuse )
- {
- HashMap<MatrixIndexes, MatrixBlock> ret = new HashMap<MatrixIndexes,MatrixBlock>();
- long nBlocks = rix.size();
- int count = 0;
-
- for( MatrixIndexes ix : rix )
- {
- //compute indexes
- long bi = ix.getRowIndex();
- long bj = ix.getColumnIndex();
- int lbrlen = UtilFunctions.computeBlockSize(mcOut.getRows(), bi, mcOut.getRowsPerBlock());
- int lbclen = UtilFunctions.computeBlockSize(mcOut.getCols(), bj, mcOut.getColsPerBlock());
-
- //create result block
- int estnnz = (int) (nnz/nBlocks); //force initialcapacity per row to 1, for many blocks
- boolean sparse = MatrixBlock.evalSparseFormatInMemory(lbrlen, lbclen, estnnz);
- MatrixBlock block = null;
- if( ALLOW_BLOCK_REUSE && reuse!=null && !reuse.isEmpty()) {
- block = (MatrixBlock) reuse.get(count++).getValue();
- block.reset(lbrlen, lbclen, sparse, estnnz);
- }
- else
- block = new MatrixBlock(lbrlen, lbclen, sparse, estnnz);
-
- if( lbrlen<1 || lbclen<1 )
- throw new RuntimeException("Computed block dimensions ("+bi+","+bj+" -> "+lbrlen+","+lbclen+") are invalid!");
-
- ret.put(ix, block);
- }
-
- return ret;
+
+ private static Map<MatrixIndexes, MatrixBlock> createAllResultBlocks(Collection<MatrixIndexes> rix, long nnz, MatrixCharacteristics mcOut) {
+ return rix.stream().collect(Collectors.toMap(ix -> ix, ix -> createResultBlock(ix, nnz, rix.size(), mcOut)));
+ }
+
+ private static MatrixBlock createResultBlock(MatrixIndexes ix, long nnz, int nBlocks, MatrixCharacteristics mcOut) {
+ //compute indexes
+ long bi = ix.getRowIndex();
+ long bj = ix.getColumnIndex();
+ int lbrlen = UtilFunctions.computeBlockSize(mcOut.getRows(), bi, mcOut.getRowsPerBlock());
+ int lbclen = UtilFunctions.computeBlockSize(mcOut.getCols(), bj, mcOut.getColsPerBlock());
+ if( lbrlen<1 || lbclen<1 )
+ throw new DMLRuntimeException("Computed block dimensions ("+bi+","+bj+" -> "+lbrlen+","+lbclen+") are invalid!");
+ //create result block
+ int estnnz = (int) (nnz/nBlocks); //force initial capacity per row to 1, for many blocks
+ boolean sparse = MatrixBlock.evalSparseFormatInMemory(lbrlen, lbclen, estnnz);
+ return new MatrixBlock(lbrlen, lbclen, sparse, estnnz);
}
- private static void reshapeDense( MatrixBlock in, long row_offset, long col_offset,
- HashMap<MatrixIndexes,MatrixBlock> rix,
+ private static void reshapeDense( MatrixBlock in, long row_offset, long col_offset, Map<MatrixIndexes,MatrixBlock> rix,
MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise ) {
if( in.isEmptyBlock(false) )
return;
@@ -1622,14 +1596,12 @@ public class LibMatrixReorg
//cleanup for sparse blocks
if( !rowwise && mcIn.getRows() > 1 ) {
- for( MatrixBlock block : rix.values() )
- if( block.sparse )
- block.sortSparseRows();
+ rix.values().stream().filter(b -> b.sparse)
+ .forEach(b -> b.sortSparseRows());
}
}
- private static void reshapeSparse( MatrixBlock in, long row_offset, long col_offset,
- HashMap<MatrixIndexes,MatrixBlock> rix,
+ private static void reshapeSparse( MatrixBlock in, long row_offset, long col_offset, Map<MatrixIndexes,MatrixBlock> rix,
MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise ) {
if( in.isEmptyBlock(false) )
return;
@@ -1639,35 +1611,36 @@ public class LibMatrixReorg
//append all values to right blocks
MatrixIndexes ixtmp = new MatrixIndexes();
- for( int i=0; i<rlen; i++ )
- {
- if( !a.isEmpty(i) ) {
- long ai = row_offset+i;
- int apos = a.pos(i);
- int alen = a.size(i);
- int[] aix = a.indexes(i);
- double[] avals = a.values(i);
-
- for( int j=apos; j<apos+alen; j++ ) {
- long aj = col_offset+aix[j];
- ixtmp = computeResultBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
- MatrixBlock out = rix.get(ixtmp);
- if( out == null )
- throw new DMLRuntimeException("Missing result block: "+ixtmp);
- ixtmp = computeInBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
- out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), avals[j]);
- }
+ for( int i=0; i<rlen; i++ ) {
+ if( a.isEmpty(i) ) continue;
+ long ai = row_offset+i;
+ int apos = a.pos(i);
+ int alen = a.size(i);
+ int[] aix = a.indexes(i);
+ double[] avals = a.values(i);
+ for( int j=apos; j<apos+alen; j++ ) {
+ long aj = col_offset+aix[j];
+ ixtmp = computeResultBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
+ MatrixBlock out = getAllocatedBlock(rix, ixtmp);
+ ixtmp = computeInBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
+ out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), avals[j]);
}
}
//cleanup for sparse blocks
if( !rowwise && mcIn.getRows() > 1 ) {
- for( MatrixBlock block : rix.values() )
- if( block.sparse )
- block.sortSparseRows();
+ rix.values().stream().filter(b -> b.sparse)
+ .forEach(b -> b.sortSparseRows());
}
}
+ private static MatrixBlock getAllocatedBlock(Map<MatrixIndexes,MatrixBlock> rix, MatrixIndexes ix) {
+ MatrixBlock out = rix.get(ix);
+ if( out == null )
+ throw new DMLRuntimeException("Missing result block: "+ix);
+ return out;
+ }
+
/**
* Assumes internal (0-begin) indices ai, aj as input; computes external block indexes (1-begin)
*
@@ -1682,25 +1655,27 @@ public class LibMatrixReorg
private static MatrixIndexes computeResultBlockIndex( MatrixIndexes ixout, long ai, long aj,
MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise )
{
- long tempc = rowwise ? ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj;
+ long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise);
long ci = rowwise ? tempc/mcOut.getCols() : tempc%mcOut.getRows();
long cj = rowwise ? tempc%mcOut.getCols() : tempc/mcOut.getRows();
long bci = ci/mcOut.getRowsPerBlock() + 1;
- long bcj = cj/mcOut.getColsPerBlock() + 1;
- return (ixout != null) ? ixout.setIndexes(bci, bcj) :
- new MatrixIndexes(bci, bcj);
+ long bcj = cj/mcOut.getColsPerBlock() + 1;
+ return ixout.setIndexes(bci, bcj);
}
-
+
private static MatrixIndexes computeInBlockIndex( MatrixIndexes ixout, long ai, long aj,
MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, boolean rowwise )
{
- long tempc = rowwise ? ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj;
+ long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise);
long ci = rowwise ? (tempc/mcOut.getCols())%mcOut.getRowsPerBlock() :
(tempc%mcOut.getRows())%mcOut.getRowsPerBlock();
long cj = rowwise ? (tempc%mcOut.getCols())%mcOut.getColsPerBlock() :
(tempc/mcOut.getRows())%mcOut.getColsPerBlock();
- return (ixout != null) ? ixout.setIndexes(ci, cj) :
- new MatrixIndexes(ci, cj);
+ return ixout.setIndexes(ci, cj);
+ }
+
+ private static long computeGlobalCellIndex(MatrixCharacteristics mcIn, long ai, long aj, boolean rowwise) {
+ return rowwise ? ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj;
}
private static MatrixBlock removeEmptyRows(MatrixBlock in, MatrixBlock ret, MatrixBlock select, boolean emptyReturn) {
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 1e1c003..0e77b8e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -218,7 +218,7 @@ public class OperationsOnMatrixValues
valueIn.aggregateUnaryOperations(op, valueOut, brlen, bclen, indexesIn);
}
- public static MatrixBlock performAggregateBinary(MatrixIndexes indexes1, MatrixBlock value1, MatrixIndexes indexes2, MatrixBlock value2,
+ public static MatrixBlock matMult(MatrixIndexes indexes1, MatrixBlock value1, MatrixIndexes indexes2, MatrixBlock value2,
MatrixIndexes indexesOut, MatrixBlock valueOut, AggregateBinaryOperator op) {
//compute output index
indexesOut.setIndexes(indexes1.getRowIndex(), indexes2.getColumnIndex());
@@ -229,7 +229,7 @@ public class OperationsOnMatrixValues
return value1.aggregateBinaryOperations(indexes1, value1, indexes2, value2, valueOut, op);
}
- public static MatrixBlock performAggregateBinaryIgnoreIndexes(MatrixBlock value1, MatrixBlock value2,
+ public static MatrixBlock matMult(MatrixBlock value1, MatrixBlock value2,
MatrixBlock valueOut, AggregateBinaryOperator op) {
//perform on the value
if( value2 instanceof CompressedMatrixBlock )
http://git-wip-us.apache.org/repos/asf/systemml/blob/f5d97c55/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
index a9e1714..3d11062 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MMCJMRReducerWithAggregator.java
@@ -123,15 +123,15 @@ public class MMCJMRReducerWithAggregator extends MMCJMRCombinerReducerBase
{
//perform matrix multiplication
indexesbuffer.setIndexes(tmp.getKey().getRowIndex(), inIndex);
- OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes((MatrixBlock)tmp.getValue(),
- (MatrixBlock)inValue, (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator());
+ OperationsOnMatrixValues.matMult((MatrixBlock)tmp.getValue(), (MatrixBlock)inValue,
+ (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator());
}
else //right cached
{
//perform matrix multiplication
indexesbuffer.setIndexes(inIndex, tmp.getKey().getColumnIndex());
- OperationsOnMatrixValues.performAggregateBinaryIgnoreIndexes((MatrixBlock)inValue,
- (MatrixBlock)tmp.getValue(), (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator());
+ OperationsOnMatrixValues.matMult((MatrixBlock)inValue, (MatrixBlock)tmp.getValue(),
+ (MatrixBlock)valueBuffer, (AggregateBinaryOperator)aggBinInstruction.getOperator());
}
//aggregate block to output buffer or direct output