You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/07/19 04:10:41 UTC

incubator-systemml git commit: [SYSTEMML-562] Frame Append operation

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 09477a7b0 -> c22f239e3


[SYSTEMML-562] Frame Append operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c22f239e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c22f239e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c22f239e

Branch: refs/heads/master
Commit: c22f239e3fd3b526190812919960684bfcf1a715
Parents: 09477a7
Author: Arvind Surve <ac...@yahoo.com>
Authored: Mon Jul 18 21:10:08 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Mon Jul 18 21:10:09 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/BinaryOp.java    |   7 +-
 .../controlprogram/caching/CacheableData.java   |  39 +++
 .../controlprogram/caching/MatrixObject.java    |  44 ---
 .../instructions/SPInstructionParser.java       |  18 +-
 .../instructions/cp/VariableCPInstruction.java  |  18 +-
 .../spark/AppendMSPInstruction.java             | 257 +----------------
 .../spark/AppendRSPInstruction.java             |  85 +-----
 .../spark/FrameAppendMSPInstruction.java        | 157 ++++++++++
 .../spark/FrameAppendRSPInstruction.java        | 170 +++++++++++
 .../spark/MatrixAppendMSPInstruction.java       | 284 +++++++++++++++++++
 .../spark/MatrixAppendRSPInstruction.java       | 112 ++++++++
 .../sysml/runtime/matrix/data/FrameBlock.java   |   2 +
 .../sysml/runtime/util/UtilFunctions.java       |  19 ++
 .../functions/frame/FrameAppendDistTest.java    | 226 +++++++++++++++
 src/test/scripts/functions/frame/FrameAppend.R  |  33 +++
 .../scripts/functions/frame/FrameAppend.dml     |  29 ++
 16 files changed, 1108 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/hops/BinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java b/src/main/java/org/apache/sysml/hops/BinaryOp.java
index 94de0e7..65e9232 100644
--- a/src/main/java/org/apache/sysml/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java
@@ -1125,7 +1125,7 @@ public class BinaryOp extends Hop
 		
 		Lop offset = createOffsetLop( left, cbind ); //offset 1st input
 		AppendMethod am = optFindAppendSPMethod(left.getDim1(), left.getDim2(), right.getDim1(), right.getDim2(), 
-				right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(), cbind);
+				right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(), cbind, dt);
 	
 		switch( am )
 		{
@@ -1281,16 +1281,17 @@ public class BinaryOp extends Hop
 		return AppendMethod.MR_GAPPEND; 	
 	}
 	
-	private static AppendMethod optFindAppendSPMethod( long m1_dim1, long m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, boolean cbind )
+	private static AppendMethod optFindAppendSPMethod( long m1_dim1, long m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, boolean cbind, DataType dt )
 	{
 		if(FORCED_APPEND_METHOD != null) {
 			return FORCED_APPEND_METHOD;
 		}
 		
 		//check for best case (map-only w/o shuffle)		
-		if(    m2_dim1 >= 1 && m2_dim2 >= 1   //rhs dims known 				
+		if((    m2_dim1 >= 1 && m2_dim2 >= 1   //rhs dims known 				
 			&& (cbind && m2_dim2 <= m1_cpb    //rhs is smaller than column block 
 			|| !cbind && m2_dim1 <= m1_rpb) ) //rhs is smaller than row block
+			&& ((dt == DataType.MATRIX) || (dt == DataType.FRAME && cbind)))
 		{
 			if( OptimizerUtils.checkSparkBroadcastMemoryBudget(m2_dim1, m2_dim2, m1_rpb, m1_cpb, m2_nnz) ) {
 				return AppendMethod.MR_MAPPEND;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index d60c607..2b45ddd 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -1422,4 +1422,43 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 	public static synchronized void enableCaching() {
 		_activeFlag = true;
 	}
+
+	/**
+	 * Moves the data of this object to the given HDFS file, either by exporting it or by renaming its current HDFS file.
+	 * @param fName name of the target file on HDFS
+	 * @param outputFormat requested output format, compared against this object via isEqualOutputFormat
+	 * @return true if the data was exported or renamed; false if neither branch applied
+	 * @throws CacheException wrapping any exception raised while exporting, renaming, or writing the meta data
+	 */
+	public synchronized boolean moveData(String fName, String outputFormat) 
+		throws CacheException 
+	{	
+		boolean ret = false;
+		
+		try
+		{
+			//export if dirty, if the format differs and isEmpty(true) holds, or if an RDD handle exists but the hdfs file does not; otherwise rename
+			if( (isDirty() || (!isEqualOutputFormat(outputFormat) && isEmpty(true))) ||
+				(getRDDHandle() != null && !MapReduceTool.existsFileOnHDFS(_hdfsFileName)))
+			{
+				exportData(fName, outputFormat);
+				ret = true;
+			}
+			else if( isEqualOutputFormat(outputFormat) ) //same format: existing hdfs file can be renamed instead of exported
+			{
+				MapReduceTool.deleteFileIfExistOnHDFS(fName); //remove a stale target file before the rename
+				MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd"); //remove the stale ".mtd" file of the target as well
+				MapReduceTool.renameFileOnHDFS( _hdfsFileName, fName );
+				writeMetaData( fName, outputFormat, null ); //write meta data for the renamed target
+				ret = true;
+			}				
+		}
+		catch (Exception e)
+		{
+			throw new CacheException ("Move to " + fName + " failed.", e); //keep the original exception as cause
+		}
+		
+		return ret; //still false if nothing was exported and the output format differs
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 5d5f41f..4148545 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -282,50 +282,6 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 		
 		return str.toString();
 	}
-
-	/**
-	 * 
-	 * @param fName
-	 * @param outputFormat
-	 * @return
-	 * @throws CacheException
-	 */
-	public synchronized boolean moveData(String fName, String outputFormat) 
-		throws CacheException 
-	{	
-		boolean ret = false;
-		
-		try
-		{
-			//ensure input file is persistent on hdfs (pending RDD operations), 
-			//file might have been written during export or collect via write/read
-			if( getRDDHandle() != null && !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { 
-				writeBlobFromRDDtoHDFS(getRDDHandle(), _hdfsFileName, outputFormat);
-			}
-			
-			//export or rename to target file on hdfs
-			if( isDirty() || (!isEqualOutputFormat(outputFormat) && isEmpty(true))) 
-			{
-				exportData(fName, outputFormat);
-				ret = true;
-			}
-			else if( isEqualOutputFormat(outputFormat) )
-			{
-				MapReduceTool.deleteFileIfExistOnHDFS(fName);
-				MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd");
-				MapReduceTool.renameFileOnHDFS( _hdfsFileName, fName );
-				writeMetaData( fName, outputFormat, null );
-				ret = true;
-			}				
-		}
-		catch (Exception e)
-		{
-			throw new CacheException ("Move to " + fName + " failed.", e);
-		}
-		
-		return ret;
-	}
-	
 	
 	// *********************************************
 	// ***                                       ***

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index e8437fb..c74b44e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -34,13 +34,12 @@ import org.apache.sysml.lops.WeightedSquaredLoss;
 import org.apache.sysml.lops.WeightedSquaredLossR;
 import org.apache.sysml.lops.WeightedUnaryMM;
 import org.apache.sysml.lops.WeightedUnaryMMR;
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction;
@@ -54,9 +53,13 @@ import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
@@ -78,6 +81,7 @@ import org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 
 public class SPInstructionParser extends InstructionParser 
@@ -389,7 +393,10 @@ public class SPInstructionParser extends InstructionParser
 				return MatrixReshapeSPInstruction.parseInstruction(str);
 				
 			case MAppend:
-				return AppendMSPInstruction.parseInstruction(str);
+				if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX)
+					return MatrixAppendMSPInstruction.parseInstruction(str);
+				else
+					return FrameAppendMSPInstruction.parseInstruction(str);
 			
 			case GAppend:
 				return AppendGSPInstruction.parseInstruction(str);
@@ -398,7 +405,10 @@ public class SPInstructionParser extends InstructionParser
 				return AppendGAlignedSPInstruction.parseInstruction(str);
 				
 			case RAppend:
-				return AppendRSPInstruction.parseInstruction(str);
+				if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX)
+					return MatrixAppendRSPInstruction.parseInstruction(str);
+				else
+					return FrameAppendRSPInstruction.parseInstruction(str);
 				
 			case Rand:
 				return RandSPInstruction.parseInstruction(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
index 1fae8fc..ee86b59 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
@@ -28,6 +28,7 @@ import org.apache.sysml.lops.UnaryCP;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -655,6 +656,7 @@ public class VariableCPInstruction extends CPInstruction
 	 * @param ec
 	 * @throws DMLRuntimeException
 	 */
+	@SuppressWarnings("rawtypes")
 	private void processMoveInstruction(ExecutionContext ec) throws DMLRuntimeException {
 		
 		if ( input3 == null ) {
@@ -683,16 +685,22 @@ public class VariableCPInstruction extends CPInstruction
 			if ( ec.getVariable(input1.getName()) == null ) 
 				throw new DMLRuntimeException("Unexpected error: could not find a data object for variable name:" + input1.getName() + ", while processing instruction " +this.toString());
 			
-			MatrixObject mo = (MatrixObject) ec.getVariable(input1.getName());
+			Object object = ec.getVariable(input1.getName());
+			
 			if ( input3.getName().equalsIgnoreCase("binaryblock") ) {
-				boolean success = mo.moveData(input2.getName(), input3.getName());
+				boolean success = false;
+				success = ((CacheableData)object).moveData(input2.getName(), input3.getName());
 				if (!success) {
 					throw new DMLRuntimeException("Failed to move var " + input1.getName() + " to file " + input2.getName() + ".");
 				}
 			}
-			else 
-				throw new DMLRuntimeException("Unexpected formats while copying: from blocks [" 
-							+ mo.getNumRowsPerBlock() + "," + mo.getNumColumnsPerBlock() + "] to " + input3.getName());
+			else
+				if(object instanceof MatrixObject)
+					throw new DMLRuntimeException("Unexpected formats while copying: from matrix blocks [" 
+							+ ((MatrixObject)object).getNumRowsPerBlock() + "," + ((MatrixObject)object).getNumColumnsPerBlock() + "] to " + input3.getName());
+				else if (object instanceof FrameObject)
+					throw new DMLRuntimeException("Unexpected formats while copying: from fram object [" 
+							+ ((FrameObject)object).getNumColumns() + "," + ((FrameObject)object).getNumColumns() + "] to " + input3.getName());
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index 26be3eb..eaf23d5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -19,35 +19,14 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
-import java.util.ArrayList;
-import java.util.Iterator;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
-import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
-public class AppendMSPInstruction extends BinarySPInstruction
+public abstract class AppendMSPInstruction extends BinarySPInstruction
 {
-	private CPOperand _offset = null;
-	private boolean _cbind = true;
+	protected CPOperand _offset = null;
+	protected boolean _cbind = true;
 	
 	public AppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
 	{
@@ -56,235 +35,5 @@ public class AppendMSPInstruction extends BinarySPInstruction
 		_offset = offset;
 		_cbind = cbind;
 	}
-	
-	public static AppendMSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 5);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand offset = new CPOperand(parts[3]);
-		CPOperand out = new CPOperand(parts[4]);
-		boolean cbind = Boolean.parseBoolean(parts[5]);
-		
-		if(!opcode.equalsIgnoreCase("mappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a AppendMSPInstruction: " + str);
-		
-		return new AppendMSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, offset, out, cbind, opcode, str);
-	}
-	
-	@Override
-	public void processInstruction(ExecutionContext ec)
-		throws DMLRuntimeException 
-	{
-		// map-only append (rhs must be vector and fit in mapper mem)
-		SparkExecutionContext sec = (SparkExecutionContext)ec;
-		checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
-		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
-		MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName());
-		int brlen = mc1.getRowsPerBlock();
-		int bclen = mc1.getColsPerBlock();
-		
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-		PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() );
-		long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue();
-		
-		//execute map-append operations (partitioning preserving if #in-blocks = #out-blocks)
-		JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-		if( preservesPartitioning(mc1, mc2, _cbind) ) {
-			out = in1.mapPartitionsToPair(
-					new MapSideAppendPartitionFunction(in2, _cbind, off, brlen, bclen), true);
-		}
-		else {
-			out = in1.flatMapToPair(
-					new MapSideAppendFunction(in2, _cbind, off, brlen, bclen));
-		}
-		
-		//put output RDD handle into symbol table
-		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
-		sec.setRDDHandleForVariable(output.getName(), out);
-		sec.addLineageRDD(output.getName(), input1.getName());
-		sec.addLineageBroadcast(output.getName(), input2.getName());
-	}
-	
-	/**
-	 * 
-	 * @param mcIn1
-	 * @param mcIn2
-	 * @return
-	 */
-	private boolean preservesPartitioning( MatrixCharacteristics mcIn1, MatrixCharacteristics mcIn2, boolean cbind )
-	{
-		long ncblksIn1 = cbind ?
-				(long)Math.ceil((double)mcIn1.getCols()/mcIn1.getColsPerBlock()) : 
-				(long)Math.ceil((double)mcIn1.getRows()/mcIn1.getRowsPerBlock());
-		long ncblksOut = cbind ? 
-				(long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getColsPerBlock()) : 
-				(long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getRowsPerBlock());
-		
-		//mappend is partitioning-preserving if in-block append (e.g., common case of colvector append)
-		return (ncblksIn1 == ncblksOut);
-	}
-	
-	/**
-	 * 
-	 */
-	private static class MapSideAppendFunction implements  PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> 
-	{
-		private static final long serialVersionUID = 2738541014432173450L;
-		
-		private PartitionedBroadcast<MatrixBlock> _pm = null;
-		private boolean _cbind = true;
-		private long _offset; 
-		private int _brlen; 
-		private int _bclen;
-		private long _lastBlockColIndex;
-		
-		public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
-		{
-			_pm = binput;
-			_cbind = cbind;
-			
-			_offset = offset;
-			_brlen = brlen;
-			_bclen = bclen;
-			
-			//check for boundary block
-			int blen = cbind ? bclen : brlen;
-			_lastBlockColIndex = (long)Math.ceil((double)_offset/blen);			
-		}
-		
-		@Override
-		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
-			throws Exception 
-		{
-			ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
-			
-			IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv);
-			MatrixIndexes ix = in1.getIndexes();
-			
-			//case 1: pass through of non-boundary blocks
-			if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex ) 
-			{
-				ret.add( kv );
-			}
-			//case 2: pass through full input block and rhs block 
-			else if( _cbind && in1.getValue().getNumColumns() == _bclen 
-					|| !_cbind && in1.getValue().getNumRows() == _brlen) 
-			{				
-				//output lhs block
-				ret.add( kv );
-				
-				//output shallow copy of rhs block
-				if( _cbind ) {
-					ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
-							new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
-							_pm.getBlock((int)ix.getRowIndex(), 1)) );
-				}
-				else { //rbind
-					ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
-							new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
-							_pm.getBlock(1, (int)ix.getColumnIndex())) );	
-				}
-			}
-			//case 3: append operation on boundary block
-			else 
-			{
-				//allocate space for the output value
-				ArrayList<IndexedMatrixValue> outlist=new ArrayList<IndexedMatrixValue>(2);
-				IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock());
-				outlist.add(first);
-				
-				MatrixBlock value_in2 = null;
-				if( _cbind ) {
-					value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1);
-					if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
-						IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
-						second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
-						outlist.add(second);
-					}
-				}
-				else { //rbind
-					value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex());
-					if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
-						IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
-						second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
-						outlist.add(second);
-					}
-				}
-	
-				OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _brlen, _bclen, _cbind, true, 0);	
-				ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
-			}
-			
-			return ret;
-		}
-	}
-	
-	/**
-	 * 
-	 */
-	private static class MapSideAppendPartitionFunction implements  PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock> 
-	{
-		private static final long serialVersionUID = 5767240739761027220L;
-
-		private PartitionedBroadcast<MatrixBlock> _pm = null;
-		private boolean _cbind = true;
-		private long _lastBlockColIndex = -1;
-		
-		public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
-		{
-			_pm = binput;
-			_cbind = cbind;
-			
-			//check for boundary block
-			int blen = cbind ? bclen : brlen;
-			_lastBlockColIndex = (long)Math.ceil((double)offset/blen);			
-		}
 
-		@Override
-		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
-			throws Exception 
-		{
-			return new MapAppendPartitionIterator(arg0);
-		}
-		
-		/**
-		 * Lazy mappend iterator to prevent materialization of entire partition output in-memory.
-		 * The implementation via mapPartitions is required to preserve partitioning information,
-		 * which is important for performance. 
-		 */
-		private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
-		{
-			public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
-				super(in);
-			}
-
-			@Override
-			protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
-				throws Exception
-			{
-				MatrixIndexes ix = arg._1();
-				MatrixBlock in1 = arg._2();
-				
-				//case 1: pass through of non-boundary blocks
-				if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) {
-					return arg;
-				}
-				//case 3: append operation on boundary block
-				else {
-					int rowix = _cbind ? (int)ix.getRowIndex() : 1;
-					int colix = _cbind ? 1 : (int)ix.getColumnIndex();					
-					MatrixBlock in2 = _pm.getBlock(rowix, colix);
-					MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind);
-					return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
-				}	
-			}			
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 93fc7aa..6d3cf5e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -19,25 +19,13 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
-public class AppendRSPInstruction extends BinarySPInstruction
+
+public abstract class AppendRSPInstruction extends BinarySPInstruction
 {
-	private boolean _cbind = true;
+	protected boolean _cbind = true;
 	
 	public AppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr)
 	{
@@ -45,72 +33,5 @@ public class AppendRSPInstruction extends BinarySPInstruction
 		_sptype = SPINSTRUCTION_TYPE.RAppend;
 		_cbind = cbind;
 	}
-	
-	public static AppendRSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{	
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 4);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand out = new CPOperand(parts[3]);
-		boolean cbind = Boolean.parseBoolean(parts[4]);
-		
-		if(!opcode.equalsIgnoreCase("rappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a AppendRSPInstruction: " + str);
-		
-		return new AppendRSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, out, cbind, opcode, str);
-	}
-	
-	@Override
-	public void processInstruction(ExecutionContext ec)
-		throws DMLRuntimeException 
-	{
-		// reduce-only append (output must have at most one column block)
-		SparkExecutionContext sec = (SparkExecutionContext)ec;
-		checkBinaryAppendInputCharacteristics(sec, _cbind, true, false);
-		
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-		
-		//execute reduce-append operations (partitioning preserving)
-		JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
-				.join(in2)
-				.mapValues(new ReduceSideAppendFunction(_cbind));
-
-		//put output RDD handle into symbol table
-		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
-		sec.setRDDHandleForVariable(output.getName(), out);
-		sec.addLineageRDD(output.getName(), input1.getName());
-		sec.addLineageRDD(output.getName(), input2.getName());		
-	}
-	
-	/**
-	 * 
-	 */
-	private static class ReduceSideAppendFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> 
-	{
-		private static final long serialVersionUID = -6763904972560309095L;
-
-		private boolean _cbind = true;
-				
-		public ReduceSideAppendFunction(boolean cbind) {
-			_cbind = cbind;
-		}
-		
-		@Override
-		public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
-			throws Exception 
-		{
-			MatrixBlock left = arg0._1();
-			MatrixBlock right = arg0._2();
-			
-			return left.appendOperations(right, new MatrixBlock(), _cbind);
-		}
-	}
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
new file mode 100644
index 0000000..b67f364
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.Iterator;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+/**
+ * Spark map-side append ("mappend") instruction for frames.
+ * <p>
+ * The right-hand-side frame is obtained as a broadcast and column-bound to each
+ * block of the left-hand-side frame RDD; the keys of the left-hand-side blocks
+ * are kept. Only cbind is implemented here. An rbind request fails with a
+ * {@link DMLRuntimeException} whose message refers the caller to rappend.
+ */
+public class FrameAppendMSPInstruction extends AppendMSPInstruction
+{
+	/**
+	 * Creates the instruction; every argument is forwarded unchanged to the superclass.
+	 * 
+	 * @param op operator object handed to the superclass
+	 * @param in1 left-hand-side operand
+	 * @param in2 right-hand-side operand
+	 * @param offset offset operand
+	 * @param out output operand
+	 * @param cbind true for column-wise append, false for row-wise append
+	 * @param opcode instruction opcode
+	 * @param istr instruction string
+	 */
+	public FrameAppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
+	{
+		super(op, in1, in2, offset, out, cbind, opcode, istr);
+	}
+	
+	/**
+	 * Parses an instruction string with five fields after the opcode:
+	 * input 1, input 2, offset, output and the cbind flag.
+	 * 
+	 * @param str instruction string
+	 * @return the parsed instruction
+	 * @throws DMLRuntimeException if the opcode is not "mappend" (case-insensitive)
+	 */
+	public static FrameAppendMSPInstruction parseInstruction ( String str ) 
+		throws DMLRuntimeException 
+	{
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 5);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand offset = new CPOperand(parts[3]);
+		CPOperand out = new CPOperand(parts[4]);
+		boolean cbind = Boolean.parseBoolean(parts[5]);
+		
+		if(!opcode.equalsIgnoreCase("mappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendMSPInstruction: " + str);
+		
+		return new FrameAppendMSPInstruction(
+				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
+				in1, in2, offset, out, cbind, opcode, str);
+	}
+	
+	/**
+	 * Executes the map-side cbind: appends the broadcast right-hand side to every
+	 * block of the left-hand side and registers the result RDD and its lineage
+	 * under the output variable.
+	 * 
+	 * @param ec execution context; cast to {@link SparkExecutionContext}
+	 * @throws DMLRuntimeException if rbind is requested
+	 */
+	@Override
+	public void processInstruction(ExecutionContext ec)
+		throws DMLRuntimeException 
+	{
+		// map-only append (rhs is consumed as a broadcast)
+		SparkExecutionContext sec = (SparkExecutionContext)ec;
+		checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
+		
+		//fail fast: reject rbind before the input RDD and the broadcast are requested
+		if( !preservesPartitioning(_cbind) )
+			throw new DMLRuntimeException("Append type rbind not supported for frame mappend, instead use rappend");
+		
+		JavaPairRDD<Long,FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() );
+		PartitionedBroadcast<FrameBlock> in2 = sec.getBroadcastForFrameVariable( input2.getName() );
+		
+		//execute map-append operations (partitioning preserving because block keys do not change)
+		JavaPairRDD<Long,FrameBlock> out = in1.mapPartitionsToPair(
+				new MapSideAppendPartitionFunction(in2), true);
+		
+		//put output RDD handle into symbol table
+		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageBroadcast(output.getName(), input2.getName());
+	}
+	
+	/** 
+	 * Indicates whether the append keeps the keys of the first input unchanged.
+	 * 
+	 * @param cbind true for column-wise append, false for row-wise append
+	 * @return the value of {@code cbind}
+	 */
+	private boolean preservesPartitioning( boolean cbind )
+	{
+		//Partitions for input1 will be preserved in case of cbind, 
+		// whereas in case of rbind partitions will not be preserved.
+		return cbind;
+	}
+	
+	/**
+	 * Partition-wise function that column-binds the matching broadcast block
+	 * to every frame block of a partition without changing the block key.
+	 */
+	private static class MapSideAppendPartitionFunction implements  PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock> 
+	{
+		private static final long serialVersionUID = -3997051891171313830L;
+
+		private PartitionedBroadcast<FrameBlock> _pm = null;
+		
+		public MapSideAppendPartitionFunction(PartitionedBroadcast<FrameBlock> binput)  
+		{
+			_pm = binput;
+		}
+
+		@Override
+		public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Long, FrameBlock>> arg0)
+			throws Exception 
+		{
+			return new MapAppendPartitionIterator(arg0);
+		}
+		
+		/**
+		 * Lazy mappend iterator to prevent materialization of entire partition output in-memory.
+		 * The implementation via mapPartitions is required to preserve partitioning information,
+		 * which is important for performance. 
+		 */
+		private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<Long, FrameBlock>>
+		{
+			public MapAppendPartitionIterator(Iterator<Tuple2<Long, FrameBlock>> in) {
+				super(in);
+			}
+
+			@Override
+			protected Tuple2<Long, FrameBlock> computeNext(Tuple2<Long, FrameBlock> arg)
+				throws Exception
+			{
+				Long ix = arg._1();
+				FrameBlock in1 = arg._2();
+			
+				//derive the 1-based row-block index in long arithmetic and narrow the result;
+				//narrowing the key itself first would truncate keys above Integer.MAX_VALUE
+				int rowix = (int)((ix-1)/OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1);
+				int colix = 1;
+				
+				FrameBlock in2 = _pm.getBlock(rowix, colix);
+				FrameBlock out = in1.appendOperations(in2, new FrameBlock(), true); //cbind
+				return new Tuple2<Long,FrameBlock>(ix, out);
+			}			
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
new file mode 100644
index 0000000..c627e40
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+/**
+ * Spark reduce-side append ("rappend") instruction for frames.
+ * <p>
+ * For cbind both inputs are re-keyed to block-aligned row keys, merged per key,
+ * joined on the key and column-bound block by block. For rbind the keys of the
+ * second input are shifted by the number of rows of the first input and the two
+ * RDDs are unioned.
+ */
+public class FrameAppendRSPInstruction extends AppendRSPInstruction
+{
+	/**
+	 * Creates the instruction; every argument is forwarded unchanged to the superclass.
+	 * 
+	 * @param op operator object handed to the superclass
+	 * @param in1 left-hand-side operand
+	 * @param in2 right-hand-side operand
+	 * @param out output operand
+	 * @param cbind true for column-wise append, false for row-wise append
+	 * @param opcode instruction opcode
+	 * @param istr instruction string
+	 */
+	public FrameAppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr)
+	{
+		super(op, in1, in2, out, cbind, opcode, istr);
+	}
+	
+	/**
+	 * Parses an instruction string with four fields after the opcode:
+	 * input 1, input 2, output and the cbind flag.
+	 * 
+	 * @param str instruction string
+	 * @return the parsed instruction
+	 * @throws DMLRuntimeException if the opcode is not "rappend" (case-insensitive)
+	 */
+	public static FrameAppendRSPInstruction parseInstruction ( String str ) 
+			throws DMLRuntimeException 
+	{	
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 4);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand out = new CPOperand(parts[3]);
+		boolean cbind = Boolean.parseBoolean(parts[4]);
+		
+		if(!opcode.equalsIgnoreCase("rappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendRSPInstruction: " + str);
+		
+		return new FrameAppendRSPInstruction(
+				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
+				in1, in2, out, cbind, opcode, str);
+	}
+		
+	/**
+	 * Executes the reduce-side append and registers the result RDD and its
+	 * lineage under the output variable.
+	 * 
+	 * @param ec execution context; cast to {@link SparkExecutionContext}
+	 * @throws DMLRuntimeException declared by the overridden method and the called helpers
+	 */
+	@Override
+	public void processInstruction(ExecutionContext ec)
+		throws DMLRuntimeException 
+	{
+		SparkExecutionContext sec = (SparkExecutionContext)ec;
+		JavaPairRDD<Long,FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() );
+		JavaPairRDD<Long,FrameBlock> in2 = sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() );
+		JavaPairRDD<Long,FrameBlock> out = null;
+		long leftRows = sec.getMatrixCharacteristics(input1.getName()).getRows();
+		
+		if(_cbind) {
+			//re-key both sides to block-aligned row keys and merge blocks that share a key,
+			//so that the join below pairs blocks of both inputs by key
+			JavaPairRDD<Long,FrameBlock> in1Aligned = in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));			
+			in1Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in1Aligned);			
+			JavaPairRDD<Long,FrameBlock> in2Aligned = in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
+			in2Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in2Aligned);			
+			
+			out = in1Aligned.join(in2Aligned).mapValues(new ReduceSideColumnsFunction(_cbind));
+		} else {	//rbind
+			//shift the row keys of the second input behind the rows of the first input
+			JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new ReduceSideAppendRowsFunction(leftRows));
+			out = in1.union(right);
+		}
+		
+		//put output RDD handle into symbol table
+		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageRDD(output.getName(), input2.getName());		
+	}
+	
+	/**
+	 * Appends the right block of a joined pair to the left block.
+	 */
+	private static class ReduceSideColumnsFunction implements Function<Tuple2<FrameBlock, FrameBlock>, FrameBlock> 
+	{
+		private static final long serialVersionUID = -97824903649667646L;
+
+		private boolean _cbind = true;
+				
+		public ReduceSideColumnsFunction(boolean cbind) {
+			_cbind = cbind;
+		}
+		
+		@Override
+		public FrameBlock call(Tuple2<FrameBlock, FrameBlock> arg0)
+			throws Exception 
+		{
+			FrameBlock left = arg0._1();
+			FrameBlock right = arg0._2();
+			
+			return left.appendOperations(right, new FrameBlock(), _cbind);
+		}
+	}
+
+	/**
+	 * Shifts the row key of a block by a fixed offset and passes the block through unchanged.
+	 */
+	private static class ReduceSideAppendRowsFunction implements PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
+	{
+		private static final long serialVersionUID = 1723795153048336791L;
+
+		private long _offset;
+				
+		public ReduceSideAppendRowsFunction(long offset) {
+			_offset = offset;
+		}
+		
+		@Override
+		public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> arg0)
+			throws Exception 
+		{
+			return new Tuple2<Long, FrameBlock>(arg0._1()+_offset, arg0._2());
+		}
+	}
+
+	/**
+	 * Re-keys a frame block to the first row key of the default-sized frame block
+	 * that contains it and copies its content into a newly allocated block.
+	 */
+	private static class ReduceSideAppendAlignFunction implements PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
+	{
+		private static final long serialVersionUID = 5850400295183766409L;
+
+		private long _rows;
+				
+		public ReduceSideAppendAlignFunction(long rows) {
+			_rows = rows;
+		}
+		
+		@Override
+		public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> arg0)
+			throws Exception 
+		{
+			final int blen = OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE;
+			FrameBlock in = arg0._2();
+			
+			//aligned keys have the form k*blen+1, so subtract one before the integer division;
+			//without it a key that is an exact multiple of blen is assigned to the following block
+			long index = ((arg0._1()-1)/blen)*blen+1;
+			
+			//the last aligned block may hold fewer than blen rows
+			int maxRows = (int)Math.min(blen, _rows-index+1);
+
+			//NOTE(review): the input is written at row offset 0 over maxRows rows, which looks like
+			//it assumes the input block starts on a block boundary -- confirm against
+			//FrameBlock.leftIndexingOperations and RDDAggregateUtils.mergeByFrameKey
+			FrameBlock resultBlock = new FrameBlock(in.getSchema());
+			resultBlock.ensureAllocatedColumns(maxRows);
+			resultBlock = resultBlock.leftIndexingOperations(in, 0, maxRows-1, 0, in.getNumColumns()-1, new FrameBlock());
+			
+			return new Tuple2<Long, FrameBlock>(index, resultBlock);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
new file mode 100644
index 0000000..9e557bb
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
+import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+public class MatrixAppendMSPInstruction extends AppendMSPInstruction
+{
+	public MatrixAppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
+	{
+		super(op, in1, in2, offset, out, cbind, opcode, istr);
+	}
+	
+	public static MatrixAppendMSPInstruction parseInstruction ( String str ) 
+		throws DMLRuntimeException 
+	{
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 5);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand offset = new CPOperand(parts[3]);
+		CPOperand out = new CPOperand(parts[4]);
+		boolean cbind = Boolean.parseBoolean(parts[5]);
+		
+		if(!opcode.equalsIgnoreCase("mappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendMSPInstruction: " + str);
+		
+		return new MatrixAppendMSPInstruction(
+				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
+				in1, in2, offset, out, cbind, opcode, str);
+	}
+	
+	@Override
+	public void processInstruction(ExecutionContext ec)
+		throws DMLRuntimeException 
+	{
+		// map-only append (rhs must be vector and fit in mapper mem)
+		SparkExecutionContext sec = (SparkExecutionContext)ec;
+		checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
+		MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
+		MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName());
+		int brlen = mc1.getRowsPerBlock();
+		int bclen = mc1.getColsPerBlock();
+		
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+		PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() );
+		long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue();
+		
+		//execute map-append operations (partitioning preserving if #in-blocks = #out-blocks)
+		JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
+		if( preservesPartitioning(mc1, mc2, _cbind) ) {
+			out = in1.mapPartitionsToPair(
+					new MapSideAppendPartitionFunction(in2, _cbind, off, brlen, bclen), true);
+		}
+		else {
+			out = in1.flatMapToPair(
+					new MapSideAppendFunction(in2, _cbind, off, brlen, bclen));
+		}
+		
+		//put output RDD handle into symbol table
+		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageBroadcast(output.getName(), input2.getName());
+	}
+	
+	/**
+	 * Decides whether the append leaves the number of blocks along the append
+	 * dimension unchanged. Both block counts are computed as ceil(dimension / block size)
+	 * with the block size of the first input; the output dimension is the sum of
+	 * the two input dimensions.
+	 * 
+	 * @param mcIn1 characteristics of the first input
+	 * @param mcIn2 characteristics of the second input
+	 * @param cbind true to compare column-block counts (cbind), false to compare row-block counts (rbind)
+	 * @return true if the first input and the output have the same number of blocks along the append dimension
+	 */
+	private boolean preservesPartitioning( MatrixCharacteristics mcIn1, MatrixCharacteristics mcIn2, boolean cbind )
+	{
+		long ncblksIn1 = cbind ?
+				(long)Math.ceil((double)mcIn1.getCols()/mcIn1.getColsPerBlock()) : 
+				(long)Math.ceil((double)mcIn1.getRows()/mcIn1.getRowsPerBlock());
+		long ncblksOut = cbind ? 
+				(long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getColsPerBlock()) : 
+				(long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getRowsPerBlock());
+		
+		//mappend is partitioning-preserving if in-block append (e.g., common case of colvector append)
+		return (ncblksIn1 == ncblksOut);
+	}
+	
+	/**
+	 * Block-wise flat-map function used when the append may create additional blocks.
+	 * For each input block it emits one or two output blocks:
+	 * blocks that are not on the boundary index are passed through; a full boundary
+	 * block is passed through together with the broadcast block under the next index;
+	 * a partially filled boundary block is combined with the broadcast block via
+	 * OperationsOnMatrixValues.performAppend.
+	 */
+	private static class MapSideAppendFunction implements  PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> 
+	{
+		private static final long serialVersionUID = 2738541014432173450L;
+		
+		private PartitionedBroadcast<MatrixBlock> _pm = null;
+		private boolean _cbind = true;
+		private long _offset; 
+		private int _brlen; 
+		private int _bclen;
+		private long _lastBlockColIndex;
+		
+		public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
+		{
+			_pm = binput;
+			_cbind = cbind;
+			
+			_offset = offset;
+			_brlen = brlen;
+			_bclen = bclen;
+			
+			//index of the boundary block along the append dimension: ceil(offset / block length)
+			int blen = cbind ? bclen : brlen;
+			_lastBlockColIndex = (long)Math.ceil((double)_offset/blen);			
+		}
+		
+		@Override
+		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
+			throws Exception 
+		{
+			ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
+			
+			IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv);
+			MatrixIndexes ix = in1.getIndexes();
+			
+			//case 1: pass through of non-boundary blocks
+			if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex ) 
+			{
+				ret.add( kv );
+			}
+			//case 2: pass through full input block and rhs block 
+			else if( _cbind && in1.getValue().getNumColumns() == _bclen 
+					|| !_cbind && in1.getValue().getNumRows() == _brlen) 
+			{				
+				//output lhs block
+				ret.add( kv );
+				
+				//output shallow copy of rhs block
+				if( _cbind ) {
+					ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
+							new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
+							_pm.getBlock((int)ix.getRowIndex(), 1)) );
+				}
+				else { //rbind
+					ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
+							new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
+							_pm.getBlock(1, (int)ix.getColumnIndex())) );	
+				}
+			}
+			//case 3: append operation on boundary block
+			else 
+			{
+				//allocate space for the output value
+				ArrayList<IndexedMatrixValue> outlist=new ArrayList<IndexedMatrixValue>(2);
+				IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock());
+				outlist.add(first);
+				
+				MatrixBlock value_in2 = null;
+				if( _cbind ) {
+					value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1);
+					if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
+						IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
+						second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
+						outlist.add(second);
+					}
+				}
+				else { //rbind
+					value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex());
+					if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
+						IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
+						second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
+						outlist.add(second);
+					}
+				}
+	
+				OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _brlen, _bclen, _cbind, true, 0);	
+				ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
+			}
+			
+			return ret;
+		}
+	}
+	
+	/**
+	 * Partition-wise function used when the append creates no additional blocks.
+	 * Blocks that are not on the boundary index are returned as is; a boundary
+	 * block is appended with the matching broadcast block and returned under its
+	 * original index.
+	 */
+	private static class MapSideAppendPartitionFunction implements  PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock> 
+	{
+		private static final long serialVersionUID = 5767240739761027220L;
+
+		private PartitionedBroadcast<MatrixBlock> _pm = null;
+		private boolean _cbind = true;
+		private long _lastBlockColIndex = -1;
+		
+		public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
+		{
+			_pm = binput;
+			_cbind = cbind;
+			
+			//index of the boundary block along the append dimension: ceil(offset / block length)
+			int blen = cbind ? bclen : brlen;
+			_lastBlockColIndex = (long)Math.ceil((double)offset/blen);			
+		}
+
+		@Override
+		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
+			throws Exception 
+		{
+			return new MapAppendPartitionIterator(arg0);
+		}
+		
+		/**
+		 * Lazy mappend iterator to prevent materialization of entire partition output in-memory.
+		 * The implementation via mapPartitions is required to preserve partitioning information,
+		 * which is important for performance. 
+		 */
+		private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
+		{
+			public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
+				super(in);
+			}
+
+			@Override
+			protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
+				throws Exception
+			{
+				MatrixIndexes ix = arg._1();
+				MatrixBlock in1 = arg._2();
+				
+				//case 1: pass through of non-boundary blocks
+				if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) {
+					return arg;
+				}
+				//case 3: append operation on boundary block (no case 2 here: this function never emits an extra block)
+				else {
+					int rowix = _cbind ? (int)ix.getRowIndex() : 1;
+					int colix = _cbind ? 1 : (int)ix.getColumnIndex();					
+					MatrixBlock in2 = _pm.getBlock(rowix, colix);
+					MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind);
+					return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
+				}	
+			}			
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
new file mode 100644
index 0000000..644fcd2
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+public class MatrixAppendRSPInstruction extends AppendRSPInstruction
+{
+	public MatrixAppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr)
+	{
+		super(op, in1, in2, out, cbind, opcode, istr);
+	}
+	
+	public static MatrixAppendRSPInstruction parseInstruction ( String str ) 
+		throws DMLRuntimeException 
+	{	
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 4);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand out = new CPOperand(parts[3]);
+		boolean cbind = Boolean.parseBoolean(parts[4]);
+		
+		if(!opcode.equalsIgnoreCase("rappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str);
+		
+		return new MatrixAppendRSPInstruction(
+				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
+				in1, in2, out, cbind, opcode, str);
+	}
+	
+	@Override
+	public void processInstruction(ExecutionContext ec)
+		throws DMLRuntimeException 
+	{
+		// reduce-only append (output must have at most one column block)
+		SparkExecutionContext sec = (SparkExecutionContext)ec;
+		checkBinaryAppendInputCharacteristics(sec, _cbind, true, false);
+		
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
+		
+		//execute reduce-append operations (partitioning preserving)
+		JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
+				.join(in2)
+				.mapValues(new ReduceSideAppendFunction(_cbind));
+
+		//put output RDD handle into symbol table
+		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageRDD(output.getName(), input2.getName());		
+	}
+	
+	/**
+	 * 
+	 */
+	private static class ReduceSideAppendFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> 
+	{
+		private static final long serialVersionUID = -6763904972560309095L;
+
+		private boolean _cbind = true;
+				
+		public ReduceSideAppendFunction(boolean cbind) {
+			_cbind = cbind;
+		}
+		
+		@Override
+		public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
+			throws Exception 
+		{
+			MatrixBlock left = arg0._1();
+			MatrixBlock right = arg0._2();
+			
+			return left.appendOperations(right, new MatrixBlock(), _cbind);
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 64bc6fe..a4f826c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -777,6 +777,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			ret._schema.addAll(that._schema);
 			ret._colnames = new ArrayList<String>(_colnames);
 			ret._colnames.addAll(that._colnames);
+			ret._colmeta = new ArrayList<ColumnMetadata>(_colmeta);
+			ret._colmeta.addAll(that._colmeta);
 			
 			//concatenate column data (w/ deep copy to prevent side effects)
 			for( Array tmp : _coldata )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 6bce4ff..fa17fcd 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,7 +23,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData;
@@ -596,4 +599,20 @@ public class UtilFunctions
 		
 		return Arrays.asList(schema);
 	}
+	
+	
+	/**
+	 * Returns the data type of one operand of an instruction string.
+	 * The string is split with InstructionUtils.getInstructionPartsWithValueType
+	 * and the part at position {@code index} is parsed as a CPOperand.
+	 * 
+	 * @param str instruction string to inspect
+	 * @param index position of the operand within the split instruction parts
+	 * @return data type reported by the operand parsed at {@code index}
+	 */
+	public static DataType getDataType(String str, int index)
+	{
+		String operand = InstructionUtils.getInstructionPartsWithValueType(str)[index];
+		return new CPOperand(operand).getDataType();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
new file mode 100644
index 0000000..20c4a27
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.BinaryOp;
+import org.apache.sysml.hops.BinaryOp.AppendMethod;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class FrameAppendDistTest extends AutomatedTestBase
+{
+	private final static String TEST_NAME = "FrameAppend";
+	private final static String TEST_DIR = "functions/frame/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FrameAppendDistTest.class.getSimpleName() + "/";
+
+	private final static double epsilon=0.0000000001;
+	private final static int min=1;
+	private final static int max=100;
+	
+	private final static int rows1 = 1692;
+	private final static int rows2 = 1192;
+	//usecase a: inblock single
+	private final static int cols1a = 375;
+	private final static int cols2a = 92;
+	//usecase b: inblock multiple
+	private final static int cols1b = 1059;
+	//usecase d: outblock blocksize 
+	private final static int cols1d = 1460;
+	private final static int cols3d = 990;
+	
+		
+	private final static double sparsity1 = 0.5;
+	private final static double sparsity2 = 0.01;
+	
+	@Override
+	public void setUp() { // registers the configuration that commonAppendTest loads by TEST_NAME
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, 
+				new String[] {"C"})); // presumably the output file name(s) of the test — confirm against TestConfiguration
+	}
+
+	@Test
+	public void testAppendInBlock1DenseSP() { // cbind, sparsity1 data, forced MR_RAPPEND
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, false, AppendMethod.MR_RAPPEND, false);
+	}   
+	
+	@Test
+	public void testAppendInBlock1SparseSP() { // cbind, sparsity2 data, forced MR_RAPPEND
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, true, AppendMethod.MR_RAPPEND, false);
+	}   
+	
+	@Test
+	public void testAppendInBlock1DenseRBindSP() { // rbind of rows1 and rows2 rows, both inputs with cols1a columns
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows2, cols1a, cols1a, false, AppendMethod.MR_RAPPEND, true);
+	}   
+	
+	@Test
+	public void testAppendInBlock1SparseRBindSP() { // rbind, sparsity2 data; NOTE(review): passes rows1 twice, unlike the dense variant (rows1, rows2) — confirm intended
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols1a, true, AppendMethod.MR_RAPPEND, true);
+	}   
+	
+	//NOTE: mappend only applied for m2_cols<=blocksize
+	@Test
+	public void testMapAppendInBlock2DenseSP() { // cbind of cols1b and cols2a columns, sparsity1 data, forced MR_MAPPEND
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, false, AppendMethod.MR_MAPPEND, false);
+	}
+	
+	@Test
+	public void testMapAppendInBlock2SparseSP() { // cbind of cols1b and cols2a columns, sparsity2 data, forced MR_MAPPEND
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, true, AppendMethod.MR_MAPPEND, false);
+	}
+	
+	@Test
+	public void testMapAppendOutBlock2DenseSP() { // cbind of cols1d and cols3d columns, sparsity1 data, forced MR_MAPPEND
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, false, AppendMethod.MR_MAPPEND, false);
+	}
+	
+	@Test
+	public void testMapAppendOutBlock2SparseSP() { // cbind of cols1d and cols3d columns, sparsity2 data, forced MR_MAPPEND
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, true, AppendMethod.MR_MAPPEND, false);
+	}
+	
+	/**
+	 * Runs FrameAppend.dml and FrameAppend.R on two random frames A (rows1 x cols1) and B (rows2 x cols2)
+	 * and compares the DML result with the R result cell by cell.
+	 * @param platform runtime platform to execute the DML script on
+	 * @param sparse if true the data is generated with sparsity2, otherwise with sparsity1
+	 * @param forcedAppendMethod if non-null, assigned to BinaryOp.FORCED_APPEND_METHOD for this run
+	 * @param rbind if true the scripts are invoked with "rbind", otherwise with "cbind"
+	 */
+	public void commonAppendTest(RUNTIME_PLATFORM platform, int rows1, int rows2, int cols1, int cols2, boolean sparse, AppendMethod forcedAppendMethod, boolean rbind)
+	{
+		TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME);
+	    
+		RUNTIME_PLATFORM prevPlfm=rtplatform; // remembered so the finally block can restore it
+		
+		double sparsity = (sparse) ? sparsity2 : sparsity1; 
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; // remembered so the finally block can restore it
+		
+		try
+		{
+			if(forcedAppendMethod != null) {
+				BinaryOp.FORCED_APPEND_METHOD = forcedAppendMethod;
+			}
+			rtplatform = platform;
+			if( rtplatform == RUNTIME_PLATFORM.SPARK )
+				DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+	
+			config.addVariable("rows", rows1);
+			config.addVariable("cols", cols1);
+	          
+			/* This is for running the junit test the new way, i.e., construct the arguments directly */
+			String RI_HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = RI_HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{"-explain","-args",  input("A"), 
+					                             Long.toString(rows1), 
+					                             Long.toString(cols1),
+								                 input("B"),
+					                             Long.toString(rows2), 
+								                 Long.toString(cols2),
+		                                         output("C"),
+		                                         (rbind? "rbind": "cbind")}; // the eight values after "-args" are read by FrameAppend.dml as $1..$8
+			fullRScriptName = RI_HOME + TEST_NAME + ".R";
+			rCmd = "Rscript" + " " + fullRScriptName + " " + 
+			       inputDir() + " " + expectedDir() + " " + (rbind? "rbind": "cbind");
+	
+			//initialize the frame data.
+			List<ValueType> lschemaA = Arrays.asList(genMixSchema(cols1));
+			double[][] A = getRandomMatrix(rows1, cols1, min, max, sparsity, 1111 /*\\System.currentTimeMillis()*/);
+			writeInputFrameWithMTD("A", A, true, lschemaA, OutputInfo.BinaryBlockOutputInfo);	        
+	        
+			List<ValueType> lschemaB = Arrays.asList(genMixSchema(cols2));
+			double[][] B = getRandomMatrix(rows2, cols2, min, max, sparsity, 2345 /*\\System.currentTimeMillis()*/);
+			writeInputFrameWithMTD("B", B, true, lschemaB, OutputInfo.BinaryBlockOutputInfo);	        
+	        	        
+			boolean exceptionExpected = false;
+			int expectedNumberOfJobs = -1; // presumably -1 disables the job-count check — confirm in AutomatedTestBase
+			runTest(true, exceptionExpected, null, expectedNumberOfJobs);
+			runRScript(true);
+
+			List<ValueType> lschemaAB = new ArrayList<ValueType>(lschemaA);
+			lschemaAB.addAll(lschemaB); // NOTE(review): concatenated schema is also used for rbind; presumably only its first cols1 entries are consulted there — confirm
+			
+			for(String file: config.getOutputFiles())
+			{
+				FrameBlock frameBlock = readDMLFrameFromHDFS(file, InputInfo.BinaryBlockInputInfo);
+				MatrixCharacteristics md = new MatrixCharacteristics(frameBlock.getNumRows(), frameBlock.getNumColumns(), -1, -1);
+				FrameBlock frameRBlock = readRFrameFromHDFS(file+".csv", InputInfo.CSVInputInfo, md); // R result is read using the dimensions of the DML result
+				verifyFrameData(frameBlock, frameRBlock, (ValueType[]) lschemaAB.toArray(new ValueType[0]));
+				System.out.println("File processed is " + file);
+			}
+		}
+		catch(Exception ex) {
+			ex.printStackTrace();
+			throw new RuntimeException(ex);
+		}
+		finally
+		{
+			//reset execution platform
+			rtplatform = prevPlfm;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			BinaryOp.FORCED_APPEND_METHOD = null; // always cleared, whatever its value was before the test
+		}
+	}
+	
+	ValueType[] genMixSchema(int cols) // builds a mixed-type schema with exactly cols entries
+	{
+		List<ValueType> schemaMixedLargeListStr = Collections.nCopies(cols/4, ValueType.STRING);
+		List<ValueType> schemaMixedLargeListDble  = Collections.nCopies(cols/4, ValueType.DOUBLE);
+		List<ValueType> schemaMixedLargeListInt  = Collections.nCopies(cols/4, ValueType.INT);
+		List<ValueType> schemaMixedLargeListBool  = Collections.nCopies(cols-(cols/4)*3, ValueType.BOOLEAN); // remainder, so the four parts sum to cols
+		// nCopies lists are immutable, so the parts are collected into a fresh ArrayList in the order STRING, DOUBLE, INT, BOOLEAN
+		final List<ValueType> schemaMixedLargeList = new ArrayList<ValueType>(schemaMixedLargeListStr);
+		schemaMixedLargeList.addAll(schemaMixedLargeListDble);
+		schemaMixedLargeList.addAll(schemaMixedLargeListInt);
+		schemaMixedLargeList.addAll(schemaMixedLargeListBool);
+		ValueType[] schemaMixedLarge = new ValueType[schemaMixedLargeList.size()];
+		schemaMixedLarge = (ValueType[]) schemaMixedLargeList.toArray(schemaMixedLarge);
+		
+		return schemaMixedLarge;
+	}
+	
+	private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) { // fails the test at the first cell where frame1 (DML) and frame2 (R) differ
+		for ( int i=0; i<frame1.getNumRows(); ++i ) // iteration bounds are taken from frame1 only
+			for( int j=0; j<frame1.getNumColumns(); j++ )	{
+				Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j))); // round-trip through a string so both values are parsed as schema[j]
+				Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j)));
+				if( TestUtils.compareToR(schema[j], val1, val2, epsilon) != 0)
+					Assert.fail("The DML data for cell ("+ i + "," + j + ") is " + val1 + 
+							", not same as the R value " + val2);
+			}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameAppend.R b/src/test/scripts/functions/frame/FrameAppend.R
new file mode 100644
index 0000000..f97916d
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameAppend.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE) # args[1]: input directory, args[2]: output directory, args[3]: "rbind" or anything else for cbind
+options(digits=22)
+library("Matrix")
+
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE, stringsAsFactors=FALSE) # headerless CSV, strings are not converted to factors
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+if(args[3] == "rbind") {
+	C=rbind(A, B)
+} else {
+	C=cbind2(A, B)
+}
+write.csv(C, paste(args[2], "C.csv", sep=""), row.names = FALSE, quote = FALSE) # result written as C.csv without row names or quoting

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameAppend.dml b/src/test/scripts/functions/frame/FrameAppend.dml
new file mode 100644
index 0000000..eea118e
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameAppend.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary") # first input frame: path $1, dimensions $2 x $3
+B=read($4, data_type="frame", rows=$5, cols=$6, format="binary") # second input frame: path $4, dimensions $5 x $6
+if ($8 == "rbind") { # $8 selects the append direction; any other value means cbind
+	C=rbind(A, B)
+} else {
+	C=cbind(A, B)
+}
+write(C, $7, format="binary") # $7: output path