You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/11/19 21:47:10 UTC
[28/50] [abbrv] incubator-systemml git commit: [SYSML-368] New
partitioned broadcast matrix, incl. changes to instructions
[SYSML-368] New partitioned broadcast matrix, incl. changes to instructions
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/743f30c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/743f30c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/743f30c5
Branch: refs/heads/master
Commit: 743f30c5e838fe55d07c8866147749e1dc0c0719
Parents: 12920c6
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Nov 2 20:21:01 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Nov 3 10:12:16 2015 -0800
----------------------------------------------------------------------
.../context/SparkExecutionContext.java | 42 +++++--
.../spark/AppendMSPInstruction.java | 26 ++--
.../instructions/spark/BinarySPInstruction.java | 5 +-
.../spark/MapmmChainSPInstruction.java | 24 ++--
.../instructions/spark/MapmmSPInstruction.java | 57 ++++-----
.../spark/MatrixIndexingSPInstruction.java | 12 +-
.../ParameterizedBuiltinSPInstruction.java | 13 +-
.../instructions/spark/PmmSPInstruction.java | 14 +--
.../spark/QuaternarySPInstruction.java | 35 +++---
.../spark/UaggOuterChainSPInstruction.java | 15 +--
.../spark/data/BroadcastObject.java | 20 ++-
.../spark/data/PartitionedBroadcastMatrix.java | 121 +++++++++++++++++++
.../spark/data/PartitionedMatrixBlock.java | 68 ++++++++---
.../MatrixVectorBinaryOpPartitionFunction.java | 9 +-
.../functions/OuterVectorBinaryOpFunction.java | 12 +-
15 files changed, 319 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
index 5fdef25..9c72cdb 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -45,6 +45,7 @@ import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import com.ibm.bi.dml.runtime.instructions.spark.SPInstruction;
import com.ibm.bi.dml.runtime.instructions.spark.data.BroadcastObject;
import com.ibm.bi.dml.runtime.instructions.spark.data.LineageObject;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
import com.ibm.bi.dml.runtime.instructions.spark.data.RDDObject;
import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
@@ -328,30 +329,52 @@ public class SparkExecutionContext extends ExecutionContext
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- public Broadcast<PartitionedMatrixBlock> getBroadcastForVariable( String varname )
+ @SuppressWarnings("unchecked")
+ public PartitionedBroadcastMatrix getBroadcastForVariable( String varname )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
MatrixObject mo = getMatrixObject(varname);
- Broadcast<PartitionedMatrixBlock> bret = null;
+ PartitionedBroadcastMatrix bret = null;
+
if( mo.getBroadcastHandle()!=null
- && mo.getBroadcastHandle().getBroadcast().isValid() )
+ && mo.getBroadcastHandle().isValid() )
{
//reuse existing broadcast handle
bret = mo.getBroadcastHandle().getBroadcast();
}
else
{
+ //obtain meta data for matrix
int brlen = (int) mo.getNumRowsPerBlock();
int bclen = (int) mo.getNumColumnsPerBlock();
- //read data into memory (no matter where it comes from)
+ //create partitioned matrix block and release memory consumed by input
MatrixBlock mb = mo.acquireRead();
PartitionedMatrixBlock pmb = new PartitionedMatrixBlock(mb, brlen, bclen);
- bret = getSparkContext().broadcast(pmb);
+ mo.release();
+
+ //determine coarse-grained partitioning
+ int numPerPart = PartitionedBroadcastMatrix.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen);
+ int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
+ Broadcast<PartitionedMatrixBlock>[] ret = new Broadcast[numParts];
+
+ //create coarse-grained partitioned broadcasts
+ if( numParts > 1 ) {
+ for( int i=0; i<numParts; i++ ) {
+ int offset = i * numPerPart;
+ int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
+ PartitionedMatrixBlock tmp = pmb.createPartition(offset, numBlks);
+ ret[i] = getSparkContext().broadcast(tmp);
+ }
+ }
+ else { //single partition
+ ret[0] = getSparkContext().broadcast( pmb);
+ }
+
+ bret = new PartitionedBroadcastMatrix(ret);
BroadcastObject bchandle = new BroadcastObject(bret, varname);
mo.setBroadcastHandle(bchandle);
- mo.release();
}
return bret;
@@ -818,8 +841,11 @@ public class SparkExecutionContext extends ExecutionContext
//cleanup current lineage object (from driver/executors)
if( lob instanceof RDDObject )
cleanupRDDVariable(((RDDObject)lob).getRDD());
- else if( lob instanceof BroadcastObject )
- cleanupBroadcastVariable(((BroadcastObject)lob).getBroadcast());
+ else if( lob instanceof BroadcastObject ) {
+ PartitionedBroadcastMatrix pbm = ((BroadcastObject)lob).getBroadcast();
+ for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() )
+ cleanupBroadcastVariable(bc);
+ }
//recursively process lineage children
for( LineageObject c : lob.getLineageChilds() ){
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
index 66be84b..76f198c 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
@@ -35,7 +34,7 @@ import com.ibm.bi.dml.runtime.instructions.Instruction;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.utils.SparkUtils;
import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
@@ -92,7 +91,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
int bclen = mc1.getColsPerBlock();
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
- Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() );
+ PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( input2.getName() );
long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue();
//execute map-append operations (partitioning preserving if #in-blocks = #out-blocks)
@@ -139,14 +138,14 @@ public class AppendMSPInstruction extends BinarySPInstruction
{
private static final long serialVersionUID = 2738541014432173450L;
- private Broadcast<PartitionedMatrixBlock> _pm = null;
+ private PartitionedBroadcastMatrix _pm = null;
private boolean _cbind = true;
private long _offset;
private int _brlen;
private int _bclen;
private long _lastBlockColIndex;
- public MapSideAppendFunction(Broadcast<PartitionedMatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)
+ public MapSideAppendFunction(PartitionedBroadcastMatrix binput, boolean cbind, long offset, int brlen, int bclen)
{
_pm = binput;
_cbind = cbind;
@@ -185,12 +184,12 @@ public class AppendMSPInstruction extends BinarySPInstruction
if( _cbind ) {
ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
- _pm.getValue().getMatrixBlock((int)ix.getRowIndex(), 1)) );
+ _pm.getMatrixBlock((int)ix.getRowIndex(), 1)) );
}
else { //rbind
ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
- _pm.getValue().getMatrixBlock(1, (int)ix.getColumnIndex())) );
+ _pm.getMatrixBlock(1, (int)ix.getColumnIndex())) );
}
}
//case 3: append operation on boundary block
@@ -203,7 +202,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
MatrixBlock value_in2 = null;
if( _cbind ) {
- value_in2 = _pm.getValue().getMatrixBlock((int)ix.getRowIndex(), 1);
+ value_in2 = _pm.getMatrixBlock((int)ix.getRowIndex(), 1);
if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
@@ -211,7 +210,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
}
}
else { //rbind
- value_in2 = _pm.getValue().getMatrixBlock(1, (int)ix.getColumnIndex());
+ value_in2 = _pm.getMatrixBlock(1, (int)ix.getColumnIndex());
if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
@@ -234,11 +233,11 @@ public class AppendMSPInstruction extends BinarySPInstruction
{
private static final long serialVersionUID = 5767240739761027220L;
- private Broadcast<PartitionedMatrixBlock> _pm = null;
+ private PartitionedBroadcastMatrix _pm = null;
private boolean _cbind = true;
private long _lastBlockColIndex = -1;
- public MapSideAppendPartitionFunction(Broadcast<PartitionedMatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)
+ public MapSideAppendPartitionFunction(PartitionedBroadcastMatrix binput, boolean cbind, long offset, int brlen, int bclen)
{
_pm = binput;
_cbind = cbind;
@@ -270,9 +269,6 @@ public class AppendMSPInstruction extends BinarySPInstruction
protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
throws Exception
{
- //get the broadcast once
- PartitionedMatrixBlock pm = _pm.value();
-
MatrixIndexes ix = arg._1();
MatrixBlock in1 = arg._2();
@@ -284,7 +280,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
else {
int rowix = _cbind ? (int)ix.getRowIndex() : 1;
int colix = _cbind ? 1 : (int)ix.getColumnIndex();
- MatrixBlock in2 = pm.getMatrixBlock(rowix, colix);
+ MatrixBlock in2 = _pm.getMatrixBlock(rowix, colix);
MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind);
return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
index 8492601..0736b65 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
@@ -18,7 +18,6 @@
package com.ibm.bi.dml.runtime.instructions.spark;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.broadcast.Broadcast;
import com.ibm.bi.dml.lops.BinaryM.VectorType;
import com.ibm.bi.dml.parser.Expression.DataType;
@@ -29,7 +28,7 @@ import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
import com.ibm.bi.dml.runtime.instructions.cp.ScalarObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.functions.MatrixMatrixBinaryOpFunction;
import com.ibm.bi.dml.runtime.instructions.spark.functions.MatrixScalarUnaryFunction;
import com.ibm.bi.dml.runtime.instructions.spark.functions.MatrixVectorBinaryOpPartitionFunction;
@@ -156,7 +155,7 @@ public abstract class BinarySPInstruction extends ComputationSPInstruction
String rddVar = input1.getName();
String bcastVar = input2.getName();
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar );
- Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar );
+ PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar );
MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(rddVar);
MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(bcastVar);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
index 28744b3..637a6c0 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
@@ -21,7 +21,6 @@ package com.ibm.bi.dml.runtime.instructions.spark;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
@@ -33,7 +32,7 @@ import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
@@ -127,7 +126,7 @@ public class MapmmChainSPInstruction extends SPInstruction
//get rdd and broadcast inputs
JavaPairRDD<MatrixIndexes,MatrixBlock> inX = sec.getBinaryBlockRDDHandleForVariable( _input1.getName() );
- Broadcast<PartitionedMatrixBlock> inV = sec.getBroadcastForVariable( _input2.getName() );
+ PartitionedBroadcastMatrix inV = sec.getBroadcastForVariable( _input2.getName() );
//execute mapmmchain (guaranteed to have single output block)
MatrixBlock out = null;
@@ -137,7 +136,7 @@ public class MapmmChainSPInstruction extends SPInstruction
out = RDDAggregateUtils.sumStable(tmp);
}
else { // ChainType.XtwXv
- Broadcast<PartitionedMatrixBlock> inW = sec.getBroadcastForVariable( _input3.getName() );
+ PartitionedBroadcastMatrix inW = sec.getBroadcastForVariable( _input3.getName() );
RDDMapMMChainFunction2 fmmc = new RDDMapMMChainFunction2(inV, inW);
JavaPairRDD<MatrixIndexes,MatrixBlock> tmp = inX.mapToPair(fmmc);
out = RDDAggregateUtils.sumStable(tmp);
@@ -157,9 +156,9 @@ public class MapmmChainSPInstruction extends SPInstruction
{
private static final long serialVersionUID = 8197406787010296291L;
- private Broadcast<PartitionedMatrixBlock> _pmV = null;
+ private PartitionedBroadcastMatrix _pmV = null;
- public RDDMapMMChainFunction( Broadcast<PartitionedMatrixBlock> bV)
+ public RDDMapMMChainFunction( PartitionedBroadcastMatrix bV)
throws DMLRuntimeException, DMLUnsupportedOperationException
{
//get first broadcast vector (always single block)
@@ -170,7 +169,7 @@ public class MapmmChainSPInstruction extends SPInstruction
public MatrixBlock call( MatrixBlock arg0 )
throws Exception
{
- MatrixBlock pmV = _pmV.value().getMatrixBlock(1, 1);
+ MatrixBlock pmV = _pmV.getMatrixBlock(1, 1);
//execute mapmmchain operation
MatrixBlock out = new MatrixBlock();
@@ -186,10 +185,10 @@ public class MapmmChainSPInstruction extends SPInstruction
{
private static final long serialVersionUID = -7926980450209760212L;
- private Broadcast<PartitionedMatrixBlock> _pmV = null;
- private Broadcast<PartitionedMatrixBlock> _pmW = null;
+ private PartitionedBroadcastMatrix _pmV = null;
+ private PartitionedBroadcastMatrix _pmW = null;
- public RDDMapMMChainFunction2( Broadcast<PartitionedMatrixBlock> bV, Broadcast<PartitionedMatrixBlock> bW)
+ public RDDMapMMChainFunction2( PartitionedBroadcastMatrix bV, PartitionedBroadcastMatrix bW)
throws DMLRuntimeException, DMLUnsupportedOperationException
{
//get both broadcast vectors (first always single block)
@@ -201,7 +200,7 @@ public class MapmmChainSPInstruction extends SPInstruction
public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )
throws Exception
{
- MatrixBlock pmV = _pmV.value().getMatrixBlock(1, 1);
+ MatrixBlock pmV = _pmV.getMatrixBlock(1, 1);
MatrixIndexes ixIn = arg0._1();
MatrixBlock blkIn = arg0._2();
@@ -211,8 +210,7 @@ public class MapmmChainSPInstruction extends SPInstruction
MatrixBlock blkOut = new MatrixBlock();
//execute mapmmchain operation
- PartitionedMatrixBlock pmW = _pmW.value();
- blkIn.chainMatrixMultOperations(pmV, pmW.getMatrixBlock(rowIx,1), blkOut, ChainType.XtwXv);
+ blkIn.chainMatrixMultOperations(pmV, _pmW.getMatrixBlock(rowIx,1), blkOut, ChainType.XtwXv);
//output new tuple
return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
index 3ffcc5c..f9e0acc 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
@@ -40,7 +39,7 @@ import com.ibm.bi.dml.runtime.functionobjects.Plus;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction;
import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
@@ -112,26 +111,25 @@ public class MapmmSPInstruction extends BinarySPInstruction
String rddVar = (_type==CacheType.LEFT) ? input2.getName() : input1.getName();
String bcastVar = (_type==CacheType.LEFT) ? input1.getName() : input2.getName();
- MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
MatrixCharacteristics mcRdd = sec.getMatrixCharacteristics(rddVar);
MatrixCharacteristics mcBc = sec.getMatrixCharacteristics(bcastVar);
//get inputs
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar );
- Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar );
-
+ PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar );
+
//empty input block filter
if( !_outputEmpty )
in1 = in1.filter(new FilterNonEmptyBlocksFunction());
-
+
//execute mapmult instruction
JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
if( requiresFlatMapFunction(_type, mcBc) )
- out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock()) );
+ out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
else if( preservesPartitioning(mcRdd, _type) )
- out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(_type, in2, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock()), true);
+ out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(_type, in2), true);
else
- out = in1.mapToPair( new RDDMapMMFunction(_type, in2, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock()) );
+ out = in1.mapToPair( new RDDMapMMFunction(_type, in2) );
//empty output block filter
if( !_outputEmpty )
@@ -197,13 +195,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
private CacheType _type = null;
private AggregateBinaryOperator _op = null;
- private Broadcast<PartitionedMatrixBlock> _pbc = null;
+ private PartitionedBroadcastMatrix _pbc = null;
- public RDDMapMMFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, int brlen, int bclen )
+ public RDDMapMMFunction( CacheType type, PartitionedBroadcastMatrix binput )
{
_type = type;
-
- //partition vector for fast in memory lookup
_pbc = binput;
//created operator for reuse
@@ -215,8 +211,6 @@ public class MapmmSPInstruction extends BinarySPInstruction
public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )
throws Exception
{
- PartitionedMatrixBlock pm = _pbc.value();
-
MatrixIndexes ixIn = arg0._1();
MatrixBlock blkIn = arg0._2();
@@ -226,7 +220,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
if( _type == CacheType.LEFT )
{
//get the right hand side matrix
- MatrixBlock left = pm.getMatrixBlock(1, (int)ixIn.getRowIndex());
+ MatrixBlock left = _pbc.getMatrixBlock(1, (int)ixIn.getRowIndex());
//execute matrix-vector mult
OperationsOnMatrixValues.performAggregateBinary(
@@ -235,7 +229,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
else //if( _type == CacheType.RIGHT )
{
//get the right hand side matrix
- MatrixBlock right = pm.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+ MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
//execute matrix-vector mult
OperationsOnMatrixValues.performAggregateBinary(
@@ -257,13 +251,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
private CacheType _type = null;
private AggregateBinaryOperator _op = null;
- private Broadcast<PartitionedMatrixBlock> _pbc = null;
+ private PartitionedBroadcastMatrix _pbc = null;
- public RDDMapMMPartitionFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, int brlen, int bclen )
+ public RDDMapMMPartitionFunction( CacheType type, PartitionedBroadcastMatrix binput )
{
_type = type;
-
- //partition vector for fast in memory lookup
_pbc = binput;
//created operator for reuse
@@ -300,7 +292,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
if( _type == CacheType.LEFT )
{
//get the right hand side matrix
- MatrixBlock left = _pbc.value().getMatrixBlock(1, (int)ixIn.getRowIndex());
+ MatrixBlock left = _pbc.getMatrixBlock(1, (int)ixIn.getRowIndex());
//execute index preserving matrix multiplication
left.aggregateBinaryOperations(left, blkIn, blkOut, _op);
@@ -308,10 +300,10 @@ public class MapmmSPInstruction extends BinarySPInstruction
else //if( _type == CacheType.RIGHT )
{
//get the right hand side matrix
- MatrixBlock right = _pbc.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+ MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
//execute index preserving matrix multiplication
- blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op);
+ blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op);
}
return new Tuple2<MatrixIndexes,MatrixBlock>(ixIn, blkOut);
@@ -329,13 +321,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
private CacheType _type = null;
private AggregateBinaryOperator _op = null;
- private Broadcast<PartitionedMatrixBlock> _pbc = null;
+ private PartitionedBroadcastMatrix _pbc = null;
- public RDDFlatMapMMFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, int brlen, int bclen )
+ public RDDFlatMapMMFunction( CacheType type, PartitionedBroadcastMatrix binput )
{
_type = type;
-
- //partition vector for fast in memory lookup
_pbc = binput;
//created operator for reuse
@@ -348,7 +338,6 @@ public class MapmmSPInstruction extends BinarySPInstruction
throws Exception
{
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
- PartitionedMatrixBlock pm = _pbc.value();
MatrixIndexes ixIn = arg0._1();
MatrixBlock blkIn = arg0._2();
@@ -356,9 +345,10 @@ public class MapmmSPInstruction extends BinarySPInstruction
if( _type == CacheType.LEFT )
{
//for all matching left-hand-side blocks
- for( int i=1; i<=pm.getNumRowBlocks(); i++ )
+ int len = _pbc.getNumRowBlocks();
+ for( int i=1; i<=len; i++ )
{
- MatrixBlock left = pm.getMatrixBlock(i, (int)ixIn.getRowIndex());
+ MatrixBlock left = _pbc.getMatrixBlock(i, (int)ixIn.getRowIndex());
MatrixIndexes ixOut = new MatrixIndexes();
MatrixBlock blkOut = new MatrixBlock();
@@ -372,10 +362,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
else //if( _type == CacheType.RIGHT )
{
//for all matching right-hand-side blocks
- for( int j=1; j<=pm.getNumColumnBlocks(); j++ )
+ int len = _pbc.getNumColumnBlocks();
+ for( int j=1; j<=len; j++ )
{
//get the right hand side matrix
- MatrixBlock right = pm.getMatrixBlock((int)ixIn.getColumnIndex(), j);
+ MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), j);
MatrixIndexes ixOut = new MatrixIndexes();
MatrixBlock blkOut = new MatrixBlock();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 8c24b9a..08b5c5e 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
@@ -36,7 +35,7 @@ import com.ibm.bi.dml.runtime.instructions.Instruction;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.functions.IsBlockInRange;
import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
import com.ibm.bi.dml.runtime.instructions.spark.utils.SparkUtils;
@@ -181,7 +180,7 @@ public class MatrixIndexingSPInstruction extends UnarySPInstruction
else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex"))
{
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
- Broadcast<PartitionedMatrixBlock> broadcastIn2 = null;
+ PartitionedBroadcastMatrix broadcastIn2 = null;
JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null;
JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
@@ -386,13 +385,13 @@ public class MatrixIndexingSPInstruction extends UnarySPInstruction
{
private static final long serialVersionUID = 1757075506076838258L;
- private Broadcast<PartitionedMatrixBlock> _binput;
+ private PartitionedBroadcastMatrix _binput;
private IndexRange _ixrange;
private int _brlen;
private int _bclen;
- public LeftIndexPartitionFunction(Broadcast<PartitionedMatrixBlock> binput, IndexRange ixrange, MatrixCharacteristics mc)
+ public LeftIndexPartitionFunction(PartitionedBroadcastMatrix binput, IndexRange ixrange, MatrixCharacteristics mc)
{
_binput = binput;
_ixrange = ixrange;
@@ -437,8 +436,7 @@ public class MatrixIndexingSPInstruction extends UnarySPInstruction
long rhs_cu = rhs_cl + (lhs_cu - lhs_cl);
// Provide global zero-based index to sliceOperations
- PartitionedMatrixBlock rhsMatBlock = _binput.getValue();
- MatrixBlock slicedRHSMatBlock = rhsMatBlock.sliceOperations(rhs_rl, rhs_ru, rhs_cl, rhs_cu, new MatrixBlock());
+ MatrixBlock slicedRHSMatBlock = _binput.sliceOperations(rhs_rl, rhs_ru, rhs_cl, rhs_cu, new MatrixBlock());
// Provide local zero-based index to leftIndexingOperations
int lhs_lrl = UtilFunctions.cellInBlockCalculation(lhs_rl, _brlen);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 5ff00a3..439af56 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
@@ -43,7 +42,7 @@ import com.ibm.bi.dml.runtime.instructions.Instruction;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
import com.ibm.bi.dml.runtime.instructions.mr.GroupedAggregateInstruction;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.functions.ExtractGroup;
import com.ibm.bi.dml.runtime.instructions.spark.functions.ExtractGroupNWeights;
import com.ibm.bi.dml.runtime.instructions.spark.functions.PerformGroupByAggInCombiner;
@@ -251,7 +250,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
//get input rdd handle
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( rddInVar );
JavaPairRDD<MatrixIndexes,MatrixBlock> off;
- Broadcast<PartitionedMatrixBlock> broadcastOff;
+ PartitionedBroadcastMatrix broadcastOff;
MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(rddInVar);
boolean rows = sec.getScalarInput(params.get("margin"), ValueType.STRING, true).getStringValue().equals("rows");
long maxDim = sec.getScalarInput(params.get("maxdim"), ValueType.DOUBLE, false).getLongValue();
@@ -426,9 +425,9 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
private long _brlen;
private long _bclen;
- Broadcast<PartitionedMatrixBlock> _off = null;
+ private PartitionedBroadcastMatrix _off = null;
- public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, long brlen, long bclen,Broadcast<PartitionedMatrixBlock> off)
+ public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, long brlen, long bclen, PartitionedBroadcastMatrix off)
{
_rmRows = rmRows;
_len = len;
@@ -446,9 +445,9 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
//IndexedMatrixValue offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(),arg0._2()._2());
IndexedMatrixValue offsets = null;
if(_rmRows)
- offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.value().getMatrixBlock((int)arg0._1().getRowIndex(), 1));
+ offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getMatrixBlock((int)arg0._1().getRowIndex(), 1));
else
- offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.value().getMatrixBlock(1, (int)arg0._1().getColumnIndex()));
+ offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getMatrixBlock(1, (int)arg0._1().getColumnIndex()));
//execute remove empty operations
ArrayList<IndexedMatrixValue> out = new ArrayList<IndexedMatrixValue>();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
index 27a1d0a..1a6b2fb 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
@@ -37,7 +36,7 @@ import com.ibm.bi.dml.runtime.functionobjects.Multiply;
import com.ibm.bi.dml.runtime.functionobjects.Plus;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
@@ -106,7 +105,7 @@ public class PmmSPInstruction extends BinarySPInstruction
//get inputs
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar );
- Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar );
+ PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar );
//execute pmm instruction
JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
@@ -130,18 +129,15 @@ public class PmmSPInstruction extends BinarySPInstruction
{
private static final long serialVersionUID = -1696560050436469140L;
- private Broadcast<PartitionedMatrixBlock> _pmV = null;
+ private PartitionedBroadcastMatrix _pmV = null;
private long _rlen = -1;
private int _brlen = -1;
- public RDDPMMFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, long rlen, int brlen )
+ public RDDPMMFunction( CacheType type, PartitionedBroadcastMatrix binput, long rlen, int brlen )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
_brlen = brlen;
_rlen = rlen;
-
- //partition vector for fast in memory lookup (right now always CacheType.LEFT)
- //in-memory colblock partitioning (according to brlen of rdd)
_pmV = binput;
}
@@ -154,7 +150,7 @@ public class PmmSPInstruction extends BinarySPInstruction
MatrixBlock mb2 = arg0._2();
//get the right hand side matrix
- MatrixBlock mb1 = _pmV.value().getMatrixBlock((int)ixIn.getRowIndex(), 1);
+ MatrixBlock mb1 = _pmV.getMatrixBlock((int)ixIn.getRowIndex(), 1);
//compute target block indexes
long minPos = UtilFunctions.toLong( mb1.minNonZero() );
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
index c01af7d..e7effbb 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
@@ -27,7 +27,6 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
@@ -48,7 +47,7 @@ import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
import com.ibm.bi.dml.runtime.instructions.cp.DoubleObject;
import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
@@ -174,8 +173,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
|| WeightedDivMM.OPCODE.equalsIgnoreCase(getOpcode())
|| WeightedCrossEntropy.OPCODE.equalsIgnoreCase(getOpcode()) )
{
- Broadcast<PartitionedMatrixBlock> bc1 = sec.getBroadcastForVariable( input2.getName() );
- Broadcast<PartitionedMatrixBlock> bc2 = sec.getBroadcastForVariable( input3.getName() );
+ PartitionedBroadcastMatrix bc1 = sec.getBroadcastForVariable( input2.getName() );
+ PartitionedBroadcastMatrix bc2 = sec.getBroadcastForVariable( input3.getName() );
//partitioning-preserving mappartitions (key access required for broadcast lookup)
boolean noKeyChange = (qop.wtype3 == null || qop.wtype3.isBasic()); //only wsdivmm changes keys
@@ -188,8 +187,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
//reduce-side operation (two/three/four rdd inputs, zero/one/two broadcasts)
else
{
- Broadcast<PartitionedMatrixBlock> bc1 = _cacheU ? sec.getBroadcastForVariable( input2.getName() ) : null;
- Broadcast<PartitionedMatrixBlock> bc2 = _cacheV ? sec.getBroadcastForVariable( input3.getName() ) : null;
+ PartitionedBroadcastMatrix bc1 = _cacheU ? sec.getBroadcastForVariable( input2.getName() ) : null;
+ PartitionedBroadcastMatrix bc2 = _cacheV ? sec.getBroadcastForVariable( input3.getName() ) : null;
JavaPairRDD<MatrixIndexes,MatrixBlock> inU = (!_cacheU) ? sec.getBinaryBlockRDDHandleForVariable( input2.getName() ) : null;
JavaPairRDD<MatrixIndexes,MatrixBlock> inV = (!_cacheV) ? sec.getBinaryBlockRDDHandleForVariable( input3.getName() ) : null;
JavaPairRDD<MatrixIndexes,MatrixBlock> inW = (qop.wtype1!=null && qop.wtype1.hasFourInputs()) ?
@@ -295,10 +294,10 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
private static final long serialVersionUID = -3175397651350954930L;
protected QuaternaryOperator _qop = null;
- protected Broadcast<PartitionedMatrixBlock> _pmU = null;
- protected Broadcast<PartitionedMatrixBlock> _pmV = null;
+ protected PartitionedBroadcastMatrix _pmU = null;
+ protected PartitionedBroadcastMatrix _pmV = null;
- public RDDQuaternaryBaseFunction( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV ) {
+ public RDDQuaternaryBaseFunction( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) {
_qop = qop;
_pmU = bcU;
_pmV = bcV;
@@ -322,7 +321,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
{
private static final long serialVersionUID = -8209188316939435099L;
- public RDDQuaternaryFunction1( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV )
+ public RDDQuaternaryFunction1( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
super(qop, bcU, bcV);
@@ -349,8 +348,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
MatrixBlock blkIn = arg._2();
MatrixBlock blkOut = new MatrixBlock();
- MatrixBlock mbU = _pmU.value().getMatrixBlock((int)ixIn.getRowIndex(), 1);
- MatrixBlock mbV = _pmV.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+ MatrixBlock mbU = _pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1);
+ MatrixBlock mbV = _pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
//execute core operation
blkIn.quaternaryOperations(_qop, mbU, mbV, null, blkOut);
@@ -371,7 +370,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
{
private static final long serialVersionUID = 7493974462943080693L;
- public RDDQuaternaryFunction2( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV )
+ public RDDQuaternaryFunction2( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
super(qop, bcU, bcV);
@@ -386,8 +385,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
MatrixBlock blkIn2 = arg0._2()._2();
MatrixBlock blkOut = new MatrixBlock();
- MatrixBlock mbU = (_pmU!=null)?_pmU.value().getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
- MatrixBlock mbV = (_pmV!=null)?_pmV.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1) : blkIn2;
+ MatrixBlock mbU = (_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
+ MatrixBlock mbV = (_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) : blkIn2;
MatrixBlock mbW = (_qop.wtype1!=null && _qop.wtype1.hasFourInputs()) ? blkIn2 : null;
//execute core operation
@@ -407,7 +406,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
{
private static final long serialVersionUID = -2294086455843773095L;
- public RDDQuaternaryFunction3( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV )
+ public RDDQuaternaryFunction3( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
super(qop, bcU, bcV);
@@ -424,8 +423,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
MatrixBlock blkOut = new MatrixBlock();
- MatrixBlock mbU = (_pmU!=null)?_pmU.value().getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
- MatrixBlock mbV = (_pmV!=null)?_pmV.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1) :
+ MatrixBlock mbU = (_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
+ MatrixBlock mbV = (_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) :
(_pmU!=null)? blkIn2 : blkIn3;
MatrixBlock mbW = (_qop.wtype1!=null && _qop.wtype1.hasFourInputs())? blkIn3 : null;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
index 9d8ffb2..90b4348 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
@@ -41,7 +41,7 @@ import com.ibm.bi.dml.runtime.functionobjects.ReduceRow;
import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
@@ -157,7 +157,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
}
else
{
- Broadcast<PartitionedMatrixBlock> bv = sec.getBroadcastForVariable( bcastVar );
+ PartitionedBroadcastMatrix bv = sec.getBroadcastForVariable( bcastVar );
//partitioning-preserving map-to-pair (under constraints)
out = in1.mapPartitionsToPair( new RDDMapGenUAggOuterChainFunction(bv, _uaggOp, _aggOp, _bOp, mcIn), noKeyChange );
@@ -313,7 +313,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
{
private static final long serialVersionUID = 8197406787010296291L;
- private Broadcast<PartitionedMatrixBlock> _pbc = null;
+ private PartitionedBroadcastMatrix _pbc = null;
// Operators
private AggregateUnaryOperator _uaggOp = null;
@@ -326,10 +326,9 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
private MatrixValue _tmpVal1 = null;
private MatrixValue _tmpVal2 = null;
- public RDDMapGenUAggOuterChainFunction(Broadcast<PartitionedMatrixBlock> binput, AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp,
+ public RDDMapGenUAggOuterChainFunction(PartitionedBroadcastMatrix binput, AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp,
MatrixCharacteristics mc)
{
- //partition vector for fast in memory lookup
_pbc = binput;
// Operators
@@ -362,8 +361,6 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
throws Exception
{
- PartitionedMatrixBlock pm = _pbc.value();
-
MatrixIndexes in1Ix = arg._1();
MatrixBlock in1Val = arg._2();
@@ -372,11 +369,11 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
MatrixBlock corr = null;
- long in2_colBlocks = pm.getNumColumnBlocks();
+ long in2_colBlocks = _pbc.getNumColumnBlocks();
for(int bidx=1; bidx <= in2_colBlocks; bidx++)
{
- MatrixValue in2Val = pm.getMatrixBlock(1, bidx);
+ MatrixValue in2Val = _pbc.getMatrixBlock(1, bidx);
//outer block operation
OperationsOnMatrixValues.performBinaryIgnoreIndexes(in1Val, in2Val, _tmpVal1, _bOp);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
index b46b409..a24001a 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
@@ -21,10 +21,9 @@ import org.apache.spark.broadcast.Broadcast;
public class BroadcastObject extends LineageObject
{
-
- private Broadcast<PartitionedMatrixBlock> _bcHandle = null;
+ private PartitionedBroadcastMatrix _bcHandle = null;
- public BroadcastObject( Broadcast<PartitionedMatrixBlock> bvar, String varName )
+ public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
{
_bcHandle = bvar;
_varName = varName;
@@ -34,8 +33,21 @@ public class BroadcastObject extends LineageObject
*
* @return
*/
- public Broadcast<PartitionedMatrixBlock> getBroadcast()
+ public PartitionedBroadcastMatrix getBroadcast()
{
return _bcHandle;
}
+
+ /**
+ *
+ * @return
+ */
+ public boolean isValid()
+ {
+ Broadcast<PartitionedMatrixBlock>[] tmp = _bcHandle.getBroadcasts();
+ for( Broadcast<PartitionedMatrixBlock> bc : tmp )
+ if( !bc.isValid() )
+ return false;
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
new file mode 100644
index 0000000..0842e9b
--- /dev/null
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
@@ -0,0 +1,121 @@
+/**
+ * (C) Copyright IBM Corp. 2010, 2015
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.ibm.bi.dml.runtime.instructions.spark.data;
+
+import java.io.Serializable;
+
+import org.apache.spark.broadcast.Broadcast;
+
+import com.ibm.bi.dml.runtime.DMLRuntimeException;
+import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
+import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * This class is a wrapper around an array of broadcasts of partitioned matrix blocks,
+ * which is required due to 2GB limitations of Spark's broadcast handling. Without this
+ * partitioning of Broadcast<PartitionedMatrixBlock> into Broadcast<PartitionedMatrixBlock>[],
+ * we got a java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE).
+ * Despite various JIRAs, this issue still showed up in Spark 1.4/1.5.
+ *
+ */
+public class PartitionedBroadcastMatrix implements Serializable
+{
+ private static final long serialVersionUID = 1225135967889810877L;
+
+ private static long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB
+
+ private Broadcast<PartitionedMatrixBlock>[] _pbc = null;
+
+ public PartitionedBroadcastMatrix(Broadcast<PartitionedMatrixBlock>[] broadcasts)
+ {
+ _pbc = broadcasts;
+ }
+
+ public Broadcast<PartitionedMatrixBlock>[] getBroadcasts() {
+ return _pbc;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public int getNumRowBlocks() {
+ return _pbc[0].value().getNumRowBlocks();
+ }
+
+ public int getNumColumnBlocks() {
+ return _pbc[0].value().getNumColumnBlocks();
+ }
+
+ /**
+ *
+ * @param rowIndex
+ * @param colIndex
+ * @return
+ * @throws DMLRuntimeException
+ */
+ public MatrixBlock getMatrixBlock(int rowIndex, int colIndex)
+ throws DMLRuntimeException
+ {
+ if( _pbc.length > 1 ) {
+ //compute partition index
+ PartitionedMatrixBlock tmp = _pbc[0].value();
+ int numPerPart = computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(),
+ tmp.getNumRowsPerBlock(), tmp.getNumColumnsPerBlock());
+ int ix = (rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1);
+ int pix = ix / numPerPart;
+
+ //get matrix block from partition
+ return _pbc[pix].value().getMatrixBlock(rowIndex, colIndex);
+ }
+ else { //single partition
+ return _pbc[0].value().getMatrixBlock(rowIndex, colIndex);
+ }
+
+ }
+
+ public MatrixBlock sliceOperations(long rl, long ru, long cl, long cu, MatrixBlock matrixBlock)
+ throws DMLRuntimeException, DMLUnsupportedOperationException
+ {
+ MatrixBlock ret = null;
+
+ for( Broadcast<PartitionedMatrixBlock> bc : _pbc ) {
+ PartitionedMatrixBlock pm = bc.value();
+ MatrixBlock tmp = pm.sliceOperations(rl, ru, cl, cu, new MatrixBlock());
+ if( ret != null )
+ ret.merge(tmp, false);
+ else
+ ret = tmp;
+ }
+
+ return ret;
+ }
+
+ /**
+ *
+ * @param rlen
+ * @param clen
+ * @param brlen
+ * @param bclen
+ * @return
+ */
+ public static int computeBlocksPerPartition(long rlen, long clen, long brlen, long bclen) {
+ return (int) Math.floor( BROADCAST_PARTSIZE /
+ Math.min(rlen, brlen) / Math.min(clen, bclen));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
index 805feef..07f2248 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
@@ -57,6 +57,7 @@ public class PartitionedMatrixBlock implements Externalizable
private int _clen = -1;
private int _brlen = -1;
private int _bclen = -1;
+ private int _offset = 0;
public PartitionedMatrixBlock() {
//do nothing (required for Externalizable)
@@ -92,6 +93,8 @@ public class PartitionedMatrixBlock implements Externalizable
catch(Exception ex) {
throw new RuntimeException("Failed partitioning of broadcast variable input.", ex);
}
+
+ _offset = 0;
}
public PartitionedMatrixBlock(int rlen, int clen, int brlen, int bclen)
@@ -107,6 +110,22 @@ public class PartitionedMatrixBlock implements Externalizable
_partBlocks = new MatrixBlock[nrblks * ncblks];
}
+ public long getNumRows() {
+ return _rlen;
+ }
+
+ public long getNumCols() {
+ return _clen;
+ }
+
+ public long getNumRowsPerBlock() {
+ return _brlen;
+ }
+
+ public long getNumColumnsPerBlock() {
+ return _bclen;
+ }
+
/**
*
* @return
@@ -116,14 +135,6 @@ public class PartitionedMatrixBlock implements Externalizable
return (int)Math.ceil((double)_rlen/_brlen);
}
- public long getNumRows() {
- return _rlen;
- }
-
- public long getNumCols() {
- return _clen;
- }
-
/**
*
* @return
@@ -153,7 +164,8 @@ public class PartitionedMatrixBlock implements Externalizable
//get the requested matrix block
int rix = rowIndex - 1;
int cix = colIndex - 1;
- return _partBlocks[rix*ncblks + cix];
+ int ix = rix*ncblks+cix - _offset;
+ return _partBlocks[ ix ];
}
/**
@@ -176,7 +188,8 @@ public class PartitionedMatrixBlock implements Externalizable
//get the requested matrix block
int rix = rowIndex - 1;
int cix = colIndex - 1;
- _partBlocks[rix*ncblks + cix] = mb;
+ int ix = rix*ncblks+cix - _offset;
+ _partBlocks[ ix ] = mb;
}
@@ -186,7 +199,7 @@ public class PartitionedMatrixBlock implements Externalizable
*/
public long estimateSizeInMemory()
{
- long ret = 8; //header
+ long ret = 24; //header
ret += 32; //block array
if( _partBlocks != null )
@@ -202,7 +215,7 @@ public class PartitionedMatrixBlock implements Externalizable
*/
public long estimateSizeOnDisk()
{
- long ret = 8; //header
+ long ret = 24; //header
if( _partBlocks != null )
for( MatrixBlock mb : _partBlocks )
@@ -211,6 +224,26 @@ public class PartitionedMatrixBlock implements Externalizable
return ret;
}
+ /**
+ *
+ * @param offset
+ * @param numBlks
+ * @return
+ */
+ public PartitionedMatrixBlock createPartition( int offset, int numBlks )
+ {
+ PartitionedMatrixBlock ret = new PartitionedMatrixBlock();
+ ret._rlen = _rlen;
+ ret._clen = _clen;
+ ret._brlen = _brlen;
+ ret._bclen = _bclen;
+ ret._partBlocks = new MatrixBlock[numBlks];
+ ret._offset = offset;
+
+ System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks);
+
+ return ret;
+ }
/**
* Utility for slice operations over partitioned matrices, where the index range can cover
@@ -321,6 +354,8 @@ public class PartitionedMatrixBlock implements Externalizable
dos.writeInt(_clen);
dos.writeInt(_brlen);
dos.writeInt(_bclen);
+ dos.writeInt(_offset);
+ dos.writeInt(_partBlocks.length);
for( MatrixBlock mb : _partBlocks )
mb.write(dos);
}
@@ -337,11 +372,12 @@ public class PartitionedMatrixBlock implements Externalizable
_clen = dis.readInt();
_brlen = dis.readInt();
_bclen = dis.readInt();
- int nrblks = getNumRowBlocks();
- int ncblks = getNumColumnBlocks();
- _partBlocks = new MatrixBlock[nrblks * ncblks];
+ _offset = dis.readInt();
+
+ int len = dis.readInt();
+ _partBlocks = new MatrixBlock[len];
- for( int i=0; i<_partBlocks.length; i++ ){
+ for( int i=0; i<len; i++ ) {
_partBlocks[i] = new MatrixBlock();
_partBlocks[i].readFields(dis);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
index 28260f2..e810a84 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
@@ -20,13 +20,12 @@ package com.ibm.bi.dml.runtime.instructions.spark.functions;
import java.util.Iterator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
import com.ibm.bi.dml.lops.BinaryM.VectorType;
import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
import com.ibm.bi.dml.runtime.matrix.operators.BinaryOperator;
@@ -36,10 +35,10 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio
private static final long serialVersionUID = 9096091404578628534L;
private BinaryOperator _op = null;
- private Broadcast<PartitionedMatrixBlock> _pmV = null;
+ private PartitionedBroadcastMatrix _pmV = null;
private VectorType _vtype = null;
- public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, Broadcast<PartitionedMatrixBlock> binput, VectorType vtype )
+ public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, PartitionedBroadcastMatrix binput, VectorType vtype )
{
_op = op;
_pmV = binput;
@@ -75,7 +74,7 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio
//get the rhs block
int rix= (int)((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1);
int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex());
- MatrixBlock in2 = _pmV.value().getMatrixBlock(rix, cix);
+ MatrixBlock in2 = _pmV.getMatrixBlock(rix, cix);
//execute the binary operation
MatrixBlock ret = (MatrixBlock) (in1.binaryOperations (_op, in2, new MatrixBlock()));
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
index c9ec159..70b863c 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
@@ -20,24 +20,22 @@ package com.ibm.bi.dml.runtime.instructions.spark.functions;
import java.util.Iterator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
import com.ibm.bi.dml.runtime.matrix.operators.BinaryOperator;
public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes,MatrixBlock>
{
-
private static final long serialVersionUID = 1730704346934726826L;
private BinaryOperator _op;
- private Broadcast<PartitionedMatrixBlock> _pmV;
+ private PartitionedBroadcastMatrix _pmV;
- public OuterVectorBinaryOpFunction( BinaryOperator op, Broadcast<PartitionedMatrixBlock> binput )
+ public OuterVectorBinaryOpFunction( BinaryOperator op, PartitionedBroadcastMatrix binput )
{
_op = op;
_pmV = binput;
@@ -70,7 +68,7 @@ public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<M
@Override
public boolean hasNext() {
return (_currBlk != null
- && _currPos <= _pmV.value().getNumColumnBlocks());
+ && _currPos <= _pmV.getNumColumnBlocks());
}
@Override
@@ -84,7 +82,7 @@ public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<M
MatrixIndexes ix = _currBlk._1();
MatrixBlock in1 = _currBlk._2();
- MatrixBlock in2 = _pmV.value().getMatrixBlock(1, _currPos);
+ MatrixBlock in2 = _pmV.getMatrixBlock(1, _currPos);
MatrixBlock resultBlk = (MatrixBlock)in1.binaryOperations (_op, in2, new MatrixBlock());
resultBlk.examSparsity();
ret = new Tuple2<MatrixIndexes,MatrixBlock>(