You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/11/19 21:47:10 UTC

[28/50] [abbrv] incubator-systemml git commit: [SYSML-368] New partitioned broadcast matrix, incl. changes to instructions

[SYSML-368] New partitioned broadcast matrix, incl. changes to instructions

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/743f30c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/743f30c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/743f30c5

Branch: refs/heads/master
Commit: 743f30c5e838fe55d07c8866147749e1dc0c0719
Parents: 12920c6
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Nov 2 20:21:01 2015 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Nov 3 10:12:16 2015 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  42 +++++--
 .../spark/AppendMSPInstruction.java             |  26 ++--
 .../instructions/spark/BinarySPInstruction.java |   5 +-
 .../spark/MapmmChainSPInstruction.java          |  24 ++--
 .../instructions/spark/MapmmSPInstruction.java  |  57 ++++-----
 .../spark/MatrixIndexingSPInstruction.java      |  12 +-
 .../ParameterizedBuiltinSPInstruction.java      |  13 +-
 .../instructions/spark/PmmSPInstruction.java    |  14 +--
 .../spark/QuaternarySPInstruction.java          |  35 +++---
 .../spark/UaggOuterChainSPInstruction.java      |  15 +--
 .../spark/data/BroadcastObject.java             |  20 ++-
 .../spark/data/PartitionedBroadcastMatrix.java  | 121 +++++++++++++++++++
 .../spark/data/PartitionedMatrixBlock.java      |  68 ++++++++---
 .../MatrixVectorBinaryOpPartitionFunction.java  |   9 +-
 .../functions/OuterVectorBinaryOpFunction.java  |  12 +-
 15 files changed, 319 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
index 5fdef25..9c72cdb 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -45,6 +45,7 @@ import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import com.ibm.bi.dml.runtime.instructions.spark.SPInstruction;
 import com.ibm.bi.dml.runtime.instructions.spark.data.BroadcastObject;
 import com.ibm.bi.dml.runtime.instructions.spark.data.LineageObject;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
 import com.ibm.bi.dml.runtime.instructions.spark.data.RDDObject;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
@@ -328,30 +329,52 @@ public class SparkExecutionContext extends ExecutionContext
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	public Broadcast<PartitionedMatrixBlock> getBroadcastForVariable( String varname ) 
+	@SuppressWarnings("unchecked")
+	public PartitionedBroadcastMatrix getBroadcastForVariable( String varname ) 
 		throws DMLRuntimeException, DMLUnsupportedOperationException
 	{
 		MatrixObject mo = getMatrixObject(varname);
 		
-		Broadcast<PartitionedMatrixBlock> bret = null;
+		PartitionedBroadcastMatrix bret = null;
+		
 		if(    mo.getBroadcastHandle()!=null 
-			&& mo.getBroadcastHandle().getBroadcast().isValid() ) 
+			&& mo.getBroadcastHandle().isValid() ) 
 		{
 			//reuse existing broadcast handle
 			bret = mo.getBroadcastHandle().getBroadcast();
 		}
 		else 
 		{
+			//obtain meta data for matrix 
 			int brlen = (int) mo.getNumRowsPerBlock();
 			int bclen = (int) mo.getNumColumnsPerBlock();
 			
-			//read data into memory (no matter where it comes from)
+			//create partitioned matrix block and release memory consumed by input
 			MatrixBlock mb = mo.acquireRead();
 			PartitionedMatrixBlock pmb = new PartitionedMatrixBlock(mb, brlen, bclen);
-			bret = getSparkContext().broadcast(pmb);
+			mo.release();
+			
+			//determine coarse-grained partitioning
+			int numPerPart = PartitionedBroadcastMatrix.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen);
+			int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
+			Broadcast<PartitionedMatrixBlock>[] ret = new Broadcast[numParts];
+					
+			//create coarse-grained partitioned broadcasts
+			if( numParts > 1 ) {
+				for( int i=0; i<numParts; i++ ) {
+					int offset = i * numPerPart;
+					int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
+					PartitionedMatrixBlock tmp = pmb.createPartition(offset, numBlks);
+					ret[i] = getSparkContext().broadcast(tmp);
+				}
+			}
+			else { //single partition
+				ret[0] = getSparkContext().broadcast( pmb);
+			}
+		
+			bret = new PartitionedBroadcastMatrix(ret);
 			BroadcastObject bchandle = new BroadcastObject(bret, varname);
 			mo.setBroadcastHandle(bchandle);
-			mo.release();
 		}
 		
 		return bret;
@@ -818,8 +841,11 @@ public class SparkExecutionContext extends ExecutionContext
 		//cleanup current lineage object (from driver/executors)
 		if( lob instanceof RDDObject )
 			cleanupRDDVariable(((RDDObject)lob).getRDD());
-		else if( lob instanceof BroadcastObject )
-			cleanupBroadcastVariable(((BroadcastObject)lob).getBroadcast());
+		else if( lob instanceof BroadcastObject ) {
+			PartitionedBroadcastMatrix pbm = ((BroadcastObject)lob).getBroadcast();
+			for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() )
+				cleanupBroadcastVariable(bc);
+		}
 	
 		//recursively process lineage children
 		for( LineageObject c : lob.getLineageChilds() ){

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
index 66be84b..76f198c 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -35,7 +34,7 @@ import com.ibm.bi.dml.runtime.instructions.Instruction;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
 import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.SparkUtils;
 import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
@@ -92,7 +91,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
 		int bclen = mc1.getColsPerBlock();
 		
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-		Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() );
+		PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( input2.getName() );
 		long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue();
 		
 		//execute map-append operations (partitioning preserving if #in-blocks = #out-blocks)
@@ -139,14 +138,14 @@ public class AppendMSPInstruction extends BinarySPInstruction
 	{
 		private static final long serialVersionUID = 2738541014432173450L;
 		
-		private Broadcast<PartitionedMatrixBlock> _pm = null;
+		private PartitionedBroadcastMatrix _pm = null;
 		private boolean _cbind = true;
 		private long _offset; 
 		private int _brlen; 
 		private int _bclen;
 		private long _lastBlockColIndex;
 		
-		public MapSideAppendFunction(Broadcast<PartitionedMatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
+		public MapSideAppendFunction(PartitionedBroadcastMatrix binput, boolean cbind, long offset, int brlen, int bclen)  
 		{
 			_pm = binput;
 			_cbind = cbind;
@@ -185,12 +184,12 @@ public class AppendMSPInstruction extends BinarySPInstruction
 				if( _cbind ) {
 					ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
 							new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
-							_pm.getValue().getMatrixBlock((int)ix.getRowIndex(), 1)) );
+							_pm.getMatrixBlock((int)ix.getRowIndex(), 1)) );
 				}
 				else { //rbind
 					ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
 							new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
-							_pm.getValue().getMatrixBlock(1, (int)ix.getColumnIndex())) );	
+							_pm.getMatrixBlock(1, (int)ix.getColumnIndex())) );	
 				}
 			}
 			//case 3: append operation on boundary block
@@ -203,7 +202,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
 				
 				MatrixBlock value_in2 = null;
 				if( _cbind ) {
-					value_in2 = _pm.getValue().getMatrixBlock((int)ix.getRowIndex(), 1);
+					value_in2 = _pm.getMatrixBlock((int)ix.getRowIndex(), 1);
 					if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
 						IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
 						second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
@@ -211,7 +210,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
 					}
 				}
 				else { //rbind
-					value_in2 = _pm.getValue().getMatrixBlock(1, (int)ix.getColumnIndex());
+					value_in2 = _pm.getMatrixBlock(1, (int)ix.getColumnIndex());
 					if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
 						IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
 						second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
@@ -234,11 +233,11 @@ public class AppendMSPInstruction extends BinarySPInstruction
 	{
 		private static final long serialVersionUID = 5767240739761027220L;
 
-		private Broadcast<PartitionedMatrixBlock> _pm = null;
+		private PartitionedBroadcastMatrix _pm = null;
 		private boolean _cbind = true;
 		private long _lastBlockColIndex = -1;
 		
-		public MapSideAppendPartitionFunction(Broadcast<PartitionedMatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
+		public MapSideAppendPartitionFunction(PartitionedBroadcastMatrix binput, boolean cbind, long offset, int brlen, int bclen)  
 		{
 			_pm = binput;
 			_cbind = cbind;
@@ -270,9 +269,6 @@ public class AppendMSPInstruction extends BinarySPInstruction
 			protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
 				throws Exception
 			{
-				//get the broadcast once
-				PartitionedMatrixBlock pm = _pm.value();
-				
 				MatrixIndexes ix = arg._1();
 				MatrixBlock in1 = arg._2();
 				
@@ -284,7 +280,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
 				else {
 					int rowix = _cbind ? (int)ix.getRowIndex() : 1;
 					int colix = _cbind ? 1 : (int)ix.getColumnIndex();					
-					MatrixBlock in2 = pm.getMatrixBlock(rowix, colix);
+					MatrixBlock in2 = _pm.getMatrixBlock(rowix, colix);
 					MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind);
 					return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
 				}	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
index 8492601..0736b65 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/BinarySPInstruction.java
@@ -18,7 +18,6 @@
 package com.ibm.bi.dml.runtime.instructions.spark;
 
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.broadcast.Broadcast;
 
 import com.ibm.bi.dml.lops.BinaryM.VectorType;
 import com.ibm.bi.dml.parser.Expression.DataType;
@@ -29,7 +28,7 @@ import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
 import com.ibm.bi.dml.runtime.instructions.cp.ScalarObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.MatrixMatrixBinaryOpFunction;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.MatrixScalarUnaryFunction;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.MatrixVectorBinaryOpPartitionFunction;
@@ -156,7 +155,7 @@ public abstract class BinarySPInstruction extends ComputationSPInstruction
 		String rddVar = input1.getName(); 
 		String bcastVar = input2.getName();
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar );
-		Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar );
+		PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar );
 		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(rddVar);
 		MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(bcastVar);
 		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
index 28744b3..637a6c0 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmChainSPInstruction.java
@@ -21,7 +21,6 @@ package com.ibm.bi.dml.runtime.instructions.spark;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -33,7 +32,7 @@ import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
 import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
@@ -127,7 +126,7 @@ public class MapmmChainSPInstruction extends SPInstruction
 		
 		//get rdd and broadcast inputs
 		JavaPairRDD<MatrixIndexes,MatrixBlock> inX = sec.getBinaryBlockRDDHandleForVariable( _input1.getName() );
-		Broadcast<PartitionedMatrixBlock> inV = sec.getBroadcastForVariable( _input2.getName() );
+		PartitionedBroadcastMatrix inV = sec.getBroadcastForVariable( _input2.getName() );
 		
 		//execute mapmmchain (guaranteed to have single output block)
 		MatrixBlock out = null;
@@ -137,7 +136,7 @@ public class MapmmChainSPInstruction extends SPInstruction
 			out = RDDAggregateUtils.sumStable(tmp);		
 		}
 		else { // ChainType.XtwXv
-			Broadcast<PartitionedMatrixBlock> inW = sec.getBroadcastForVariable( _input3.getName() );
+			PartitionedBroadcastMatrix inW = sec.getBroadcastForVariable( _input3.getName() );
 			RDDMapMMChainFunction2 fmmc = new RDDMapMMChainFunction2(inV, inW);
 			JavaPairRDD<MatrixIndexes,MatrixBlock> tmp = inX.mapToPair(fmmc);
 			out = RDDAggregateUtils.sumStable(tmp);		
@@ -157,9 +156,9 @@ public class MapmmChainSPInstruction extends SPInstruction
 	{
 		private static final long serialVersionUID = 8197406787010296291L;
 
-		private Broadcast<PartitionedMatrixBlock> _pmV = null;
+		private PartitionedBroadcastMatrix _pmV = null;
 		
-		public RDDMapMMChainFunction( Broadcast<PartitionedMatrixBlock> bV) 
+		public RDDMapMMChainFunction( PartitionedBroadcastMatrix bV) 
 			throws DMLRuntimeException, DMLUnsupportedOperationException
 		{			
 			//get first broadcast vector (always single block)
@@ -170,7 +169,7 @@ public class MapmmChainSPInstruction extends SPInstruction
 		public MatrixBlock call( MatrixBlock arg0 ) 
 			throws Exception 
 		{
-			MatrixBlock pmV = _pmV.value().getMatrixBlock(1, 1);
+			MatrixBlock pmV = _pmV.getMatrixBlock(1, 1);
 			
 			//execute mapmmchain operation
 			MatrixBlock out = new MatrixBlock();
@@ -186,10 +185,10 @@ public class MapmmChainSPInstruction extends SPInstruction
 	{
 		private static final long serialVersionUID = -7926980450209760212L;
 
-		private Broadcast<PartitionedMatrixBlock> _pmV = null;
-		private Broadcast<PartitionedMatrixBlock> _pmW = null;
+		private PartitionedBroadcastMatrix _pmV = null;
+		private PartitionedBroadcastMatrix _pmW = null;
 		
-		public RDDMapMMChainFunction2( Broadcast<PartitionedMatrixBlock> bV, Broadcast<PartitionedMatrixBlock> bW) 
+		public RDDMapMMChainFunction2( PartitionedBroadcastMatrix bV, PartitionedBroadcastMatrix bW) 
 			throws DMLRuntimeException, DMLUnsupportedOperationException
 		{			
 			//get both broadcast vectors (first always single block)
@@ -201,7 +200,7 @@ public class MapmmChainSPInstruction extends SPInstruction
 		public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
 			throws Exception 
 		{
-			MatrixBlock pmV = _pmV.value().getMatrixBlock(1, 1);
+			MatrixBlock pmV = _pmV.getMatrixBlock(1, 1);
 			
 			MatrixIndexes ixIn = arg0._1();
 			MatrixBlock blkIn = arg0._2();
@@ -211,8 +210,7 @@ public class MapmmChainSPInstruction extends SPInstruction
 			MatrixBlock blkOut = new MatrixBlock();
 			
 			//execute mapmmchain operation
-			PartitionedMatrixBlock pmW = _pmW.value();
-			blkIn.chainMatrixMultOperations(pmV, pmW.getMatrixBlock(rowIx,1), blkOut, ChainType.XtwXv);
+			blkIn.chainMatrixMultOperations(pmV, _pmW.getMatrixBlock(rowIx,1), blkOut, ChainType.XtwXv);
 				
 			//output new tuple
 			return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
index 3ffcc5c..f9e0acc 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -40,7 +39,7 @@ import com.ibm.bi.dml.runtime.functionobjects.Plus;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
 import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.FilterNonEmptyBlocksFunction;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
@@ -112,26 +111,25 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		
 		String rddVar = (_type==CacheType.LEFT) ? input2.getName() : input1.getName();
 		String bcastVar = (_type==CacheType.LEFT) ? input1.getName() : input2.getName();
-		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
 		MatrixCharacteristics mcRdd = sec.getMatrixCharacteristics(rddVar);
 		MatrixCharacteristics mcBc = sec.getMatrixCharacteristics(bcastVar);
 		
 		//get inputs
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar );
-		Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar ); 
-				
+		PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar ); 
+		
 		//empty input block filter
 		if( !_outputEmpty )
 			in1 = in1.filter(new FilterNonEmptyBlocksFunction());
-			
+		
 		//execute mapmult instruction
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
 		if( requiresFlatMapFunction(_type, mcBc) ) 
-			out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock()) );
+			out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
 		else if( preservesPartitioning(mcRdd, _type) )
-			out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(_type, in2, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock()), true);
+			out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(_type, in2), true);
 		else
-			out = in1.mapToPair( new RDDMapMMFunction(_type, in2, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock()) );
+			out = in1.mapToPair( new RDDMapMMFunction(_type, in2) );
 		
 		//empty output block filter
 		if( !_outputEmpty )
@@ -197,13 +195,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
 
 		private CacheType _type = null;
 		private AggregateBinaryOperator _op = null;
-		private Broadcast<PartitionedMatrixBlock> _pbc = null;
+		private PartitionedBroadcastMatrix _pbc = null;
 		
-		public RDDMapMMFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, int brlen, int bclen )
+		public RDDMapMMFunction( CacheType type, PartitionedBroadcastMatrix binput )
 		{
 			_type = type;
-			
-			//partition vector for fast in memory lookup
 			_pbc = binput;
 			
 			//created operator for reuse
@@ -215,8 +211,6 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
 			throws Exception 
 		{
-			PartitionedMatrixBlock pm = _pbc.value();
-			
 			MatrixIndexes ixIn = arg0._1();
 			MatrixBlock blkIn = arg0._2();
 
@@ -226,7 +220,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
 			if( _type == CacheType.LEFT )
 			{
 				//get the right hand side matrix
-				MatrixBlock left = pm.getMatrixBlock(1, (int)ixIn.getRowIndex());
+				MatrixBlock left = _pbc.getMatrixBlock(1, (int)ixIn.getRowIndex());
 				
 				//execute matrix-vector mult
 				OperationsOnMatrixValues.performAggregateBinary( 
@@ -235,7 +229,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
 			else //if( _type == CacheType.RIGHT )
 			{
 				//get the right hand side matrix
-				MatrixBlock right = pm.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+				MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
 				
 				//execute matrix-vector mult
 				OperationsOnMatrixValues.performAggregateBinary(
@@ -257,13 +251,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
 	
 		private CacheType _type = null;
 		private AggregateBinaryOperator _op = null;
-		private Broadcast<PartitionedMatrixBlock> _pbc = null;
+		private PartitionedBroadcastMatrix _pbc = null;
 		
-		public RDDMapMMPartitionFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, int brlen, int bclen )
+		public RDDMapMMPartitionFunction( CacheType type, PartitionedBroadcastMatrix binput )
 		{
 			_type = type;
-			
-			//partition vector for fast in memory lookup
 			_pbc = binput;
 			
 			//created operator for reuse
@@ -300,7 +292,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
 				if( _type == CacheType.LEFT )
 				{
 					//get the right hand side matrix
-					MatrixBlock left = _pbc.value().getMatrixBlock(1, (int)ixIn.getRowIndex());
+					MatrixBlock left = _pbc.getMatrixBlock(1, (int)ixIn.getRowIndex());
 					
 					//execute index preserving matrix multiplication
 					left.aggregateBinaryOperations(left, blkIn, blkOut, _op);						
@@ -308,10 +300,10 @@ public class MapmmSPInstruction extends BinarySPInstruction
 				else //if( _type == CacheType.RIGHT )
 				{
 					//get the right hand side matrix
-					MatrixBlock right = _pbc.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+					MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
 
 					//execute index preserving matrix multiplication
-					blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op);					
+					blkIn.aggregateBinaryOperations(blkIn, right, blkOut, _op);	
 				}
 			
 				return new Tuple2<MatrixIndexes,MatrixBlock>(ixIn, blkOut);
@@ -329,13 +321,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		
 		private CacheType _type = null;
 		private AggregateBinaryOperator _op = null;
-		private Broadcast<PartitionedMatrixBlock> _pbc = null;
+		private PartitionedBroadcastMatrix _pbc = null;
 		
-		public RDDFlatMapMMFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, int brlen, int bclen )
+		public RDDFlatMapMMFunction( CacheType type, PartitionedBroadcastMatrix binput )
 		{
 			_type = type;
-			
-			//partition vector for fast in memory lookup
 			_pbc = binput;
 			
 			//created operator for reuse
@@ -348,7 +338,6 @@ public class MapmmSPInstruction extends BinarySPInstruction
 			throws Exception 
 		{
 			ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
-			PartitionedMatrixBlock pm = _pbc.value();
 			
 			MatrixIndexes ixIn = arg0._1();
 			MatrixBlock blkIn = arg0._2();
@@ -356,9 +345,10 @@ public class MapmmSPInstruction extends BinarySPInstruction
 			if( _type == CacheType.LEFT )
 			{
 				//for all matching left-hand-side blocks
-				for( int i=1; i<=pm.getNumRowBlocks(); i++ ) 
+				int len = _pbc.getNumRowBlocks();
+				for( int i=1; i<=len; i++ ) 
 				{
-					MatrixBlock left = pm.getMatrixBlock(i, (int)ixIn.getRowIndex());
+					MatrixBlock left = _pbc.getMatrixBlock(i, (int)ixIn.getRowIndex());
 					MatrixIndexes ixOut = new MatrixIndexes();
 					MatrixBlock blkOut = new MatrixBlock();
 					
@@ -372,10 +362,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
 			else //if( _type == CacheType.RIGHT )
 			{
 				//for all matching right-hand-side blocks
-				for( int j=1; j<=pm.getNumColumnBlocks(); j++ ) 
+				int len = _pbc.getNumColumnBlocks();
+				for( int j=1; j<=len; j++ ) 
 				{
 					//get the right hand side matrix
-					MatrixBlock right = pm.getMatrixBlock((int)ixIn.getColumnIndex(), j);
+					MatrixBlock right = _pbc.getMatrixBlock((int)ixIn.getColumnIndex(), j);
 					MatrixIndexes ixOut = new MatrixIndexes();
 					MatrixBlock blkOut = new MatrixBlock();
 					

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 8c24b9a..08b5c5e 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -36,7 +35,7 @@ import com.ibm.bi.dml.runtime.instructions.Instruction;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
 import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.IsBlockInRange;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.SparkUtils;
@@ -181,7 +180,7 @@ public class MatrixIndexingSPInstruction  extends UnarySPInstruction
 		else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex"))
 		{
 			JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-			Broadcast<PartitionedMatrixBlock> broadcastIn2 = null;
+			PartitionedBroadcastMatrix broadcastIn2 = null;
 			JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null;
 			JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
 			
@@ -386,13 +385,13 @@ public class MatrixIndexingSPInstruction  extends UnarySPInstruction
 	{
 		private static final long serialVersionUID = 1757075506076838258L;
 		
-		private Broadcast<PartitionedMatrixBlock> _binput;
+		private PartitionedBroadcastMatrix _binput;
 		private IndexRange _ixrange;
 		private int _brlen;
 		private int _bclen;
 		
 		
-		public LeftIndexPartitionFunction(Broadcast<PartitionedMatrixBlock> binput, IndexRange ixrange, MatrixCharacteristics mc) 
+		public LeftIndexPartitionFunction(PartitionedBroadcastMatrix binput, IndexRange ixrange, MatrixCharacteristics mc) 
 		{
 			_binput = binput;
 			_ixrange = ixrange;
@@ -437,8 +436,7 @@ public class MatrixIndexingSPInstruction  extends UnarySPInstruction
 				long rhs_cu = rhs_cl + (lhs_cu - lhs_cl);
 				
 				// Provide global zero-based index to sliceOperations
-				PartitionedMatrixBlock rhsMatBlock = _binput.getValue();
-				MatrixBlock slicedRHSMatBlock = rhsMatBlock.sliceOperations(rhs_rl, rhs_ru, rhs_cl, rhs_cu, new MatrixBlock());
+				MatrixBlock slicedRHSMatBlock = _binput.sliceOperations(rhs_rl, rhs_ru, rhs_cl, rhs_cu, new MatrixBlock());
 				
 				// Provide local zero-based index to leftIndexingOperations
 				int lhs_lrl = UtilFunctions.cellInBlockCalculation(lhs_rl, _brlen);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 5ff00a3..439af56 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -43,7 +42,7 @@ import com.ibm.bi.dml.runtime.instructions.Instruction;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
 import com.ibm.bi.dml.runtime.instructions.mr.GroupedAggregateInstruction;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.ExtractGroup;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.ExtractGroupNWeights;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.PerformGroupByAggInCombiner;
@@ -251,7 +250,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 			//get input rdd handle
 			JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( rddInVar );
 			JavaPairRDD<MatrixIndexes,MatrixBlock> off;
-			Broadcast<PartitionedMatrixBlock> broadcastOff;
+			PartitionedBroadcastMatrix broadcastOff;
 			MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(rddInVar);
 			boolean rows = sec.getScalarInput(params.get("margin"), ValueType.STRING, true).getStringValue().equals("rows");
 			long maxDim = sec.getScalarInput(params.get("maxdim"), ValueType.DOUBLE, false).getLongValue();
@@ -426,9 +425,9 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 		private long _brlen;
 		private long _bclen;
 		
-		Broadcast<PartitionedMatrixBlock> _off = null;
+		private PartitionedBroadcastMatrix _off = null;
 				
-		public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, long brlen, long bclen,Broadcast<PartitionedMatrixBlock> off) 
+		public RDDRemoveEmptyFunctionInMem(boolean rmRows, long len, long brlen, long bclen, PartitionedBroadcastMatrix off) 
 		{
 			_rmRows = rmRows;
 			_len = len;
@@ -446,9 +445,9 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 			//IndexedMatrixValue offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(),arg0._2()._2());
 			IndexedMatrixValue offsets = null;
 			if(_rmRows)
-				offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.value().getMatrixBlock((int)arg0._1().getRowIndex(), 1));
+				offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getMatrixBlock((int)arg0._1().getRowIndex(), 1));
 			else
-				offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.value().getMatrixBlock(1, (int)arg0._1().getColumnIndex()));
+				offsets = SparkUtils.toIndexedMatrixBlock(arg0._1(), _off.getMatrixBlock(1, (int)arg0._1().getColumnIndex()));
 			
 			//execute remove empty operations
 			ArrayList<IndexedMatrixValue> out = new ArrayList<IndexedMatrixValue>();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
index 27a1d0a..1a6b2fb 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/PmmSPInstruction.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -37,7 +36,7 @@ import com.ibm.bi.dml.runtime.functionobjects.Multiply;
 import com.ibm.bi.dml.runtime.functionobjects.Plus;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
@@ -106,7 +105,7 @@ public class PmmSPInstruction extends BinarySPInstruction
 		
 		//get inputs
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar );
-		Broadcast<PartitionedMatrixBlock> in2 = sec.getBroadcastForVariable( bcastVar ); 
+		PartitionedBroadcastMatrix in2 = sec.getBroadcastForVariable( bcastVar ); 
 		
 		//execute pmm instruction
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
@@ -130,18 +129,15 @@ public class PmmSPInstruction extends BinarySPInstruction
 	{
 		private static final long serialVersionUID = -1696560050436469140L;
 		
-		private Broadcast<PartitionedMatrixBlock> _pmV = null;
+		private PartitionedBroadcastMatrix _pmV = null;
 		private long _rlen = -1;
 		private int _brlen = -1;
 		
-		public RDDPMMFunction( CacheType type, Broadcast<PartitionedMatrixBlock> binput, long rlen, int brlen ) 
+		public RDDPMMFunction( CacheType type, PartitionedBroadcastMatrix binput, long rlen, int brlen ) 
 			throws DMLRuntimeException, DMLUnsupportedOperationException
 		{
 			_brlen = brlen;
 			_rlen = rlen;
-			
-			//partition vector for fast in memory lookup (right now always CacheType.LEFT) 
-			//in-memory colblock partitioning (according to brlen of rdd)
 			_pmV = binput;
 		}
 		
@@ -154,7 +150,7 @@ public class PmmSPInstruction extends BinarySPInstruction
 			MatrixBlock mb2 = arg0._2();
 			
 			//get the right hand side matrix
-			MatrixBlock mb1 = _pmV.value().getMatrixBlock((int)ixIn.getRowIndex(), 1);
+			MatrixBlock mb1 = _pmV.getMatrixBlock((int)ixIn.getRowIndex(), 1);
 			
 			//compute target block indexes
 			long minPos = UtilFunctions.toLong( mb1.minNonZero() );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
index c01af7d..e7effbb 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/QuaternarySPInstruction.java
@@ -27,7 +27,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -48,7 +47,7 @@ import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
 import com.ibm.bi.dml.runtime.instructions.cp.DoubleObject;
 import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
@@ -174,8 +173,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 			|| WeightedDivMM.OPCODE.equalsIgnoreCase(getOpcode()) 
 			|| WeightedCrossEntropy.OPCODE.equalsIgnoreCase(getOpcode()) ) 
 		{
-			Broadcast<PartitionedMatrixBlock> bc1 = sec.getBroadcastForVariable( input2.getName() );
-			Broadcast<PartitionedMatrixBlock> bc2 = sec.getBroadcastForVariable( input3.getName() );
+			PartitionedBroadcastMatrix bc1 = sec.getBroadcastForVariable( input2.getName() );
+			PartitionedBroadcastMatrix bc2 = sec.getBroadcastForVariable( input3.getName() );
 			
 			//partitioning-preserving mappartitions (key access required for broadcast loopkup)
 			boolean noKeyChange = (qop.wtype3 == null || qop.wtype3.isBasic()); //only wsdivmm changes keys
@@ -188,8 +187,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 		//reduce-side operation (two/three/four rdd inputs, zero/one/two broadcasts)
 		else 
 		{
-			Broadcast<PartitionedMatrixBlock> bc1 = _cacheU ? sec.getBroadcastForVariable( input2.getName() ) : null;
-			Broadcast<PartitionedMatrixBlock> bc2 = _cacheV ? sec.getBroadcastForVariable( input3.getName() ) : null;
+			PartitionedBroadcastMatrix bc1 = _cacheU ? sec.getBroadcastForVariable( input2.getName() ) : null;
+			PartitionedBroadcastMatrix bc2 = _cacheV ? sec.getBroadcastForVariable( input3.getName() ) : null;
 			JavaPairRDD<MatrixIndexes,MatrixBlock> inU = (!_cacheU) ? sec.getBinaryBlockRDDHandleForVariable( input2.getName() ) : null;
 			JavaPairRDD<MatrixIndexes,MatrixBlock> inV = (!_cacheV) ? sec.getBinaryBlockRDDHandleForVariable( input3.getName() ) : null;
 			JavaPairRDD<MatrixIndexes,MatrixBlock> inW = (qop.wtype1!=null && qop.wtype1.hasFourInputs()) ? 
@@ -295,10 +294,10 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 		private static final long serialVersionUID = -3175397651350954930L;
 		
 		protected QuaternaryOperator _qop = null;
-		protected Broadcast<PartitionedMatrixBlock> _pmU = null;
-		protected Broadcast<PartitionedMatrixBlock> _pmV = null;
+		protected PartitionedBroadcastMatrix _pmU = null;
+		protected PartitionedBroadcastMatrix _pmV = null;
 		
-		public RDDQuaternaryBaseFunction( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV ) {
+		public RDDQuaternaryBaseFunction( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) {
 			_qop = qop;		
 			_pmU = bcU;
 			_pmV = bcV;
@@ -322,7 +321,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 	{
 		private static final long serialVersionUID = -8209188316939435099L;
 		
-		public RDDQuaternaryFunction1( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV ) 
+		public RDDQuaternaryFunction1( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) 
 			throws DMLRuntimeException, DMLUnsupportedOperationException
 		{
 			super(qop, bcU, bcV);
@@ -349,8 +348,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 				MatrixBlock blkIn = arg._2();
 				MatrixBlock blkOut = new MatrixBlock();
 				
-				MatrixBlock mbU = _pmU.value().getMatrixBlock((int)ixIn.getRowIndex(), 1);
-				MatrixBlock mbV = _pmV.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1);
+				MatrixBlock mbU = _pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1);
+				MatrixBlock mbV = _pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1);
 				
 				//execute core operation
 				blkIn.quaternaryOperations(_qop, mbU, mbV, null, blkOut);
@@ -371,7 +370,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 	{
 		private static final long serialVersionUID = 7493974462943080693L;
 		
-		public RDDQuaternaryFunction2( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV ) 
+		public RDDQuaternaryFunction2( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) 
 			throws DMLRuntimeException, DMLUnsupportedOperationException
 		{
 			super(qop, bcU, bcV);
@@ -386,8 +385,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 			MatrixBlock blkIn2 = arg0._2()._2();
 			MatrixBlock blkOut = new MatrixBlock();
 			
-			MatrixBlock mbU = (_pmU!=null)?_pmU.value().getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
-			MatrixBlock mbV = (_pmV!=null)?_pmV.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1) : blkIn2;
+			MatrixBlock mbU = (_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
+			MatrixBlock mbV = (_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) : blkIn2;
 			MatrixBlock mbW = (_qop.wtype1!=null && _qop.wtype1.hasFourInputs()) ? blkIn2 : null;
 			
 			//execute core operation
@@ -407,7 +406,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 	{
 		private static final long serialVersionUID = -2294086455843773095L;
 		
-		public RDDQuaternaryFunction3( QuaternaryOperator qop, Broadcast<PartitionedMatrixBlock> bcU, Broadcast<PartitionedMatrixBlock> bcV ) 
+		public RDDQuaternaryFunction3( QuaternaryOperator qop, PartitionedBroadcastMatrix bcU, PartitionedBroadcastMatrix bcV ) 
 			throws DMLRuntimeException, DMLUnsupportedOperationException
 		{
 			super(qop, bcU, bcV);
@@ -424,8 +423,8 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
 			
 			MatrixBlock blkOut = new MatrixBlock();
 			
-			MatrixBlock mbU = (_pmU!=null)?_pmU.value().getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
-			MatrixBlock mbV = (_pmV!=null)?_pmV.value().getMatrixBlock((int)ixIn.getColumnIndex(), 1) : 
+			MatrixBlock mbU = (_pmU!=null)?_pmU.getMatrixBlock((int)ixIn.getRowIndex(), 1) : blkIn2;
+			MatrixBlock mbV = (_pmV!=null)?_pmV.getMatrixBlock((int)ixIn.getColumnIndex(), 1) : 
 				              (_pmU!=null)? blkIn2 : blkIn3;
 			MatrixBlock mbW = (_qop.wtype1!=null && _qop.wtype1.hasFourInputs())? blkIn3 : null;
 			

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
index 9d8ffb2..90b4348 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/UaggOuterChainSPInstruction.java
@@ -41,7 +41,7 @@ import com.ibm.bi.dml.runtime.functionobjects.ReduceRow;
 import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
 import com.ibm.bi.dml.runtime.instructions.cp.CPOperand;
 import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.instructions.spark.functions.AggregateDropCorrectionFunction;
 import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
@@ -157,7 +157,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
 		}
 		else
 		{
-			Broadcast<PartitionedMatrixBlock> bv = sec.getBroadcastForVariable( bcastVar ); 
+			PartitionedBroadcastMatrix bv = sec.getBroadcastForVariable( bcastVar ); 
 			
 			//partitioning-preserving map-to-pair (under constraints)
 			out = in1.mapPartitionsToPair( new RDDMapGenUAggOuterChainFunction(bv, _uaggOp, _aggOp, _bOp, mcIn), noKeyChange );	
@@ -313,7 +313,7 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
 	{
 		private static final long serialVersionUID = 8197406787010296291L;
 
-		private Broadcast<PartitionedMatrixBlock> _pbc = null;
+		private PartitionedBroadcastMatrix _pbc = null;
 		
 		// Operators
 		private AggregateUnaryOperator _uaggOp = null;
@@ -326,10 +326,9 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
 		private MatrixValue _tmpVal1 = null;
 		private MatrixValue _tmpVal2 = null;
 
-		public RDDMapGenUAggOuterChainFunction(Broadcast<PartitionedMatrixBlock> binput, AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp, 
+		public RDDMapGenUAggOuterChainFunction(PartitionedBroadcastMatrix binput, AggregateUnaryOperator uaggOp, AggregateOperator aggOp, BinaryOperator bOp, 
 				MatrixCharacteristics mc)
 		{
-			//partition vector for fast in memory lookup
 			_pbc = binput;
 			
 			// Operators
@@ -362,8 +361,6 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
 			protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
 				throws Exception
 			{
-				PartitionedMatrixBlock pm = _pbc.value();
-				
 				MatrixIndexes in1Ix = arg._1();
 				MatrixBlock in1Val  = arg._2();
 
@@ -372,11 +369,11 @@ public class UaggOuterChainSPInstruction extends BinarySPInstruction
 				MatrixBlock corr = null;
 				
 					
-				long  in2_colBlocks = pm.getNumColumnBlocks();
+				long  in2_colBlocks = _pbc.getNumColumnBlocks();
 				
 				for(int bidx=1; bidx <= in2_colBlocks; bidx++) 
 				{
-					MatrixValue in2Val = pm.getMatrixBlock(1, bidx);
+					MatrixValue in2Val = _pbc.getMatrixBlock(1, bidx);
 					
 					//outer block operation
 					OperationsOnMatrixValues.performBinaryIgnoreIndexes(in1Val, in2Val, _tmpVal1, _bOp);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
index b46b409..a24001a 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/BroadcastObject.java
@@ -21,10 +21,9 @@ import org.apache.spark.broadcast.Broadcast;
 
 public class BroadcastObject extends LineageObject
 {
-
-	private Broadcast<PartitionedMatrixBlock> _bcHandle = null;
+	private PartitionedBroadcastMatrix _bcHandle = null;
 	
-	public BroadcastObject( Broadcast<PartitionedMatrixBlock> bvar, String varName )
+	public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
 	{
 		_bcHandle = bvar;
 		_varName = varName;
@@ -34,8 +33,21 @@ public class BroadcastObject extends LineageObject
 	 * 
 	 * @return
 	 */
-	public Broadcast<PartitionedMatrixBlock> getBroadcast()
+	public PartitionedBroadcastMatrix getBroadcast()
 	{
 		return _bcHandle;
 	}
+	
+	/**
+	 * 
+	 * @return
+	 */
+	public boolean isValid() 
+	{
+		Broadcast<PartitionedMatrixBlock>[] tmp = _bcHandle.getBroadcasts();
+		for( Broadcast<PartitionedMatrixBlock> bc : tmp )
+			if( !bc.isValid() )
+				return false;		
+		return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
new file mode 100644
index 0000000..0842e9b
--- /dev/null
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedBroadcastMatrix.java
@@ -0,0 +1,121 @@
+/**
+ * (C) Copyright IBM Corp. 2010, 2015
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ */
+
+package com.ibm.bi.dml.runtime.instructions.spark.data;
+
+import java.io.Serializable;
+
+import org.apache.spark.broadcast.Broadcast;
+
+import com.ibm.bi.dml.runtime.DMLRuntimeException;
+import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
+import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Wrapper around an array of broadcasts of partitioned matrix blocks, which is
+ * required due to the 2GB limitation of Spark's broadcast handling. Without splitting
+ * a single Broadcast<PartitionedMatrixBlock> into Broadcast<PartitionedMatrixBlock>[],
+ * broadcasting failed with java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE.
+ * Despite various jiras, this issue still showed up in Spark 1.4/1.5.
+ * Block lookups are routed to the partition that holds the requested block.
+ */
+public class PartitionedBroadcastMatrix implements Serializable
+{
+	private static final long serialVersionUID = 1225135967889810877L;
+
+	private static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB
+	
+	private Broadcast<PartitionedMatrixBlock>[] _pbc = null;
+	/** Wraps the given partition broadcasts; the array is stored as is (not copied). */
+	public PartitionedBroadcastMatrix(Broadcast<PartitionedMatrixBlock>[] broadcasts)
+	{
+		_pbc = broadcasts;
+	}
+	/** Returns the underlying partition broadcasts (the internal array, not a copy). */
+	public Broadcast<PartitionedMatrixBlock>[] getBroadcasts() {
+		return _pbc;
+	}
+	
+	/**
+	 * Returns the number of row blocks, as reported by the first partition.
+	 * @return number of row blocks
+	 */
+	public int getNumRowBlocks() {
+		return _pbc[0].value().getNumRowBlocks();
+	}
+	/** Returns the number of column blocks, as reported by the first partition. */
+	public int getNumColumnBlocks() {
+		return _pbc[0].value().getNumColumnBlocks();
+	}
+	
+	/**
+	 * Returns the requested block, looked up in the partition that holds it.
+	 * @param rowIndex 1-based row block index
+	 * @param colIndex 1-based column block index
+	 * @return matrix block at the given block position
+	 * @throws DMLRuntimeException propagated from the partition lookup
+	 */
+	public MatrixBlock getMatrixBlock(int rowIndex, int colIndex) 
+		throws DMLRuntimeException 
+	{
+		if( _pbc.length > 1 ) { 
+			//compute partition index from the row-major linearized block position
+			PartitionedMatrixBlock tmp = _pbc[0].value();
+			int numPerPart = computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(), 
+					tmp.getNumRowsPerBlock(), tmp.getNumColumnsPerBlock());
+			int ix = (rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1);
+			int pix = ix / numPerPart; //numPerPart is guaranteed to be >= 1
+			
+			//get matrix block from partition (still addressed by its global block indexes)
+			return _pbc[pix].value().getMatrixBlock(rowIndex, colIndex);	
+		}
+		else { //single partition
+			return _pbc[0].value().getMatrixBlock(rowIndex, colIndex);
+		}
+		
+	}
+	/** Applies sliceOperations(rl, ru, cl, cu) to every partition and merges the partial results; the matrixBlock argument is currently unused. */
+	public MatrixBlock sliceOperations(long rl, long ru, long cl, long cu, MatrixBlock matrixBlock) 
+		throws DMLRuntimeException, DMLUnsupportedOperationException 
+	{
+		MatrixBlock ret = null; //stays null if there are no partitions
+		
+		for( Broadcast<PartitionedMatrixBlock> bc : _pbc ) {
+			PartitionedMatrixBlock pm = bc.value();
+			MatrixBlock tmp = pm.sliceOperations(rl, ru, cl, cu, new MatrixBlock());
+			if( ret != null )
+				ret.merge(tmp, false);
+			else
+				ret = tmp; //first partial result becomes the merge target
+		}
+		
+		return ret;
+	}
+	
+	/**
+	 * Computes how many blocks fit into one broadcast partition of BROADCAST_PARTSIZE cells.
+	 * @param rlen number of rows of the matrix
+	 * @param clen number of columns of the matrix
+	 * @param brlen number of rows per block
+	 * @param bclen number of columns per block
+	 * @return blocks per partition, clamped to [1, Integer.MAX_VALUE] so callers can safely divide by it
+	 */
+	public static int computeBlocksPerPartition(long rlen, long clen, long brlen, long bclen) {
+		long cellsPerBlock = Math.max(1, Math.min(rlen, brlen) * Math.min(clen, bclen)); //block dims clipped to matrix dims; >=1 avoids division by zero
+		return (int) Math.max(1, Math.min(Integer.MAX_VALUE, BROADCAST_PARTSIZE / cellsPerBlock)); //at least one block per partition
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
index 805feef..07f2248 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/data/PartitionedMatrixBlock.java
@@ -57,6 +57,7 @@ public class PartitionedMatrixBlock implements Externalizable
 	private int _clen = -1;
 	private int _brlen = -1;
 	private int _bclen = -1;
+	private int _offset = 0;
 	
 	public PartitionedMatrixBlock() {
 		//do nothing (required for Externalizable)
@@ -92,6 +93,8 @@ public class PartitionedMatrixBlock implements Externalizable
 		catch(Exception ex) {
 			throw new RuntimeException("Failed partitioning of broadcast variable input.", ex);
 		}
+		
+		_offset = 0;
 	}
 	
 	public PartitionedMatrixBlock(int rlen, int clen, int brlen, int bclen) 
@@ -107,6 +110,22 @@ public class PartitionedMatrixBlock implements Externalizable
 		_partBlocks = new MatrixBlock[nrblks * ncblks];		
 	}
 	
+	public long getNumRows() { //row dimension (_rlen)
+		return _rlen;
+	}
+	
+	public long getNumCols() { //column dimension (_clen, stored as int)
+		return _clen;
+	}
+	
+	public long getNumRowsPerBlock() { //rows per block (_brlen, stored as int)
+		return _brlen;
+	}
+	
+	public long getNumColumnsPerBlock() { //columns per block (_bclen, stored as int)
+		return _bclen;
+	}
+	
 	/**
 	 * 
 	 * @return
@@ -116,14 +135,6 @@ public class PartitionedMatrixBlock implements Externalizable
 		return (int)Math.ceil((double)_rlen/_brlen);
 	}
 	
-	public long getNumRows() {
-		return _rlen;
-	}
-	
-	public long getNumCols() {
-		return _clen;
-	}
-	
 	/**
 	 * 
 	 * @return
@@ -153,7 +164,8 @@ public class PartitionedMatrixBlock implements Externalizable
 		//get the requested matrix block
 		int rix = rowIndex - 1;
 		int cix = colIndex - 1;
-		return _partBlocks[rix*ncblks + cix];
+		int ix = rix*ncblks+cix - _offset;
+		return _partBlocks[ ix ];
 	}
 	
 	/**
@@ -176,7 +188,8 @@ public class PartitionedMatrixBlock implements Externalizable
 		//get the requested matrix block
 		int rix = rowIndex - 1;
 		int cix = colIndex - 1;
-		_partBlocks[rix*ncblks + cix] = mb;
+		int ix = rix*ncblks+cix - _offset;
+		_partBlocks[ ix ] = mb;
 		
 	}
 	
@@ -186,7 +199,7 @@ public class PartitionedMatrixBlock implements Externalizable
 	 */
 	public long estimateSizeInMemory()
 	{
-		long ret = 8; //header
+		long ret = 24; //header
 		ret += 32;    //block array
 		
 		if( _partBlocks != null )
@@ -202,7 +215,7 @@ public class PartitionedMatrixBlock implements Externalizable
 	 */
 	public long estimateSizeOnDisk()
 	{
-		long ret = 8; //header
+		long ret = 24; //header
 		
 		if( _partBlocks != null )
 			for( MatrixBlock mb : _partBlocks )
@@ -211,6 +224,26 @@ public class PartitionedMatrixBlock implements Externalizable
 		return ret;
 	}
 	
+	/**
+	 * 
+	 * @param offset
+	 * @param numBlks
+	 * @return
+	 */
+	public PartitionedMatrixBlock createPartition( int offset, int numBlks )
+	{
+		PartitionedMatrixBlock ret = new PartitionedMatrixBlock();
+		ret._rlen = _rlen;
+		ret._clen = _clen;
+		ret._brlen = _brlen;
+		ret._bclen = _bclen;
+		ret._partBlocks = new MatrixBlock[numBlks];
+		ret._offset = offset;
+		
+		System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks);
+		
+		return ret;
+	}
 
 	/**
 	 * Utility for slice operations over partitioned matrices, where the index range can cover
@@ -321,6 +354,8 @@ public class PartitionedMatrixBlock implements Externalizable
 		dos.writeInt(_clen);
 		dos.writeInt(_brlen);
 		dos.writeInt(_bclen);
+		dos.writeInt(_offset);
+		dos.writeInt(_partBlocks.length);
 		for( MatrixBlock mb : _partBlocks )
 			mb.write(dos);
 	}
@@ -337,11 +372,12 @@ public class PartitionedMatrixBlock implements Externalizable
 		_clen = dis.readInt();
 		_brlen = dis.readInt();
 		_bclen = dis.readInt();
-		int nrblks = getNumRowBlocks();
-		int ncblks = getNumColumnBlocks();
-		_partBlocks = new MatrixBlock[nrblks * ncblks];
+		_offset = dis.readInt();
+		
+		int len = dis.readInt();
+		_partBlocks = new MatrixBlock[len];
 		
-		for( int i=0; i<_partBlocks.length; i++ ){
+		for( int i=0; i<len; i++ ) {
 			_partBlocks[i] = new MatrixBlock();
 			_partBlocks[i].readFields(dis);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
index 28260f2..e810a84 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/MatrixVectorBinaryOpPartitionFunction.java
@@ -20,13 +20,12 @@ package com.ibm.bi.dml.runtime.instructions.spark.functions;
 import java.util.Iterator;
 
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
 import com.ibm.bi.dml.lops.BinaryM.VectorType;
 import com.ibm.bi.dml.runtime.instructions.spark.data.LazyIterableIterator;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
 import com.ibm.bi.dml.runtime.matrix.operators.BinaryOperator;
@@ -36,10 +35,10 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio
 	private static final long serialVersionUID = 9096091404578628534L;
 	
 	private BinaryOperator _op = null;
-	private Broadcast<PartitionedMatrixBlock> _pmV = null;
+	private PartitionedBroadcastMatrix _pmV = null;
 	private VectorType _vtype = null;
 	
-	public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, Broadcast<PartitionedMatrixBlock> binput, VectorType vtype ) 
+	public MatrixVectorBinaryOpPartitionFunction( BinaryOperator op, PartitionedBroadcastMatrix binput, VectorType vtype ) 
 	{
 		_op = op;
 		_pmV = binput;
@@ -75,7 +74,7 @@ public class MatrixVectorBinaryOpPartitionFunction implements PairFlatMapFunctio
 			//get the rhs block 
 			int rix= (int)((_vtype==VectorType.COL_VECTOR) ? ix.getRowIndex() : 1);
 			int cix= (int)((_vtype==VectorType.COL_VECTOR) ? 1 : ix.getColumnIndex());
-			MatrixBlock in2 = _pmV.value().getMatrixBlock(rix, cix);
+			MatrixBlock in2 = _pmV.getMatrixBlock(rix, cix);
 				
 			//execute the binary operation
 			MatrixBlock ret = (MatrixBlock) (in1.binaryOperations (_op, in2, new MatrixBlock()));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/743f30c5/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
index c9ec159..70b863c 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/instructions/spark/functions/OuterVectorBinaryOpFunction.java
@@ -20,24 +20,22 @@ package com.ibm.bi.dml.runtime.instructions.spark.functions;
 import java.util.Iterator;
 
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
+import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
 import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
 import com.ibm.bi.dml.runtime.matrix.operators.BinaryOperator;
 
 public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes,MatrixBlock>
 {
-	
 	private static final long serialVersionUID = 1730704346934726826L;
 	
 	private BinaryOperator _op;
-	private Broadcast<PartitionedMatrixBlock> _pmV;
+	private PartitionedBroadcastMatrix _pmV;
 	
-	public OuterVectorBinaryOpFunction( BinaryOperator op, Broadcast<PartitionedMatrixBlock> binput ) 
+	public OuterVectorBinaryOpFunction( BinaryOperator op, PartitionedBroadcastMatrix binput ) 
 	{
 		_op = op;
 		_pmV = binput;
@@ -70,7 +68,7 @@ public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<M
 		@Override
 		public boolean hasNext() {
 			return (_currBlk != null 
-				&& _currPos <= _pmV.value().getNumColumnBlocks());
+				&& _currPos <= _pmV.getNumColumnBlocks());
 		}
 		
 		@Override
@@ -84,7 +82,7 @@ public class OuterVectorBinaryOpFunction implements PairFlatMapFunction<Tuple2<M
 				MatrixIndexes ix = _currBlk._1();
 				MatrixBlock in1 = _currBlk._2();
 				
-				MatrixBlock in2 = _pmV.value().getMatrixBlock(1, _currPos);
+				MatrixBlock in2 = _pmV.getMatrixBlock(1, _currPos);
 				MatrixBlock resultBlk = (MatrixBlock)in1.binaryOperations (_op, in2, new MatrixBlock());
 				resultBlk.examSparsity(); 
 				ret = new Tuple2<MatrixIndexes,MatrixBlock>(