You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/17 07:26:36 UTC

[5/5] incubator-systemml git commit: [SYSTEMML-928] Fix spark append compilation for frames (only m/rappend)

[SYSTEMML-928] Fix spark append compilation for frames (only m/rappend)

For the exec type spark, we support mappend, rappend, galignedappend,
and gappend for matrices but only mappend and rappend for frames. This
patch makes the related operator selection aware of this restriction.
Furthermore, it includes various minor runtime cleanups of the append
instructions and frame converters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8d5f3cea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8d5f3cea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8d5f3cea

Branch: refs/heads/master
Commit: 8d5f3ceab60beb225f2f62882b772968c1ef29c5
Parents: 8631a14
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 16 23:17:40 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:28 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/BinaryOp.java    |  7 ++-
 .../instructions/SPInstructionParser.java       | 26 ++++-------
 .../spark/AppendMSPInstruction.java             | 30 +++++++++++++
 .../spark/AppendRSPInstruction.java             | 30 +++++++++++++
 .../spark/FrameAppendMSPInstruction.java        | 24 -----------
 .../spark/FrameAppendRSPInstruction.java        | 23 ----------
 .../spark/MatrixAppendMSPInstruction.java       | 24 -----------
 .../spark/MatrixAppendRSPInstruction.java       | 23 ----------
 .../spark/utils/FrameRDDConverterUtils.java     | 45 +++++++++-----------
 9 files changed, 94 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/hops/BinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java b/src/main/java/org/apache/sysml/hops/BinaryOp.java
index edc327d..ab0315b 100644
--- a/src/main/java/org/apache/sysml/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java
@@ -1302,12 +1302,15 @@ public class BinaryOp extends Hop
 		if( cbind && m1_dim2 >= 1 && m2_dim2 >= 0  //column dims known
 			&& m1_dim2+m2_dim2 <= m1_cpb   //output has one column block
 		  ||!cbind && m1_dim1 >= 1 && m2_dim1 >= 0 //row dims known
-			&& m1_dim1+m2_dim1 <= m1_rpb ) //output has one column block
+			&& m1_dim1+m2_dim1 <= m1_rpb   //output has one column block
+		  || dt == DataType.FRAME ) 
 		{
 			return AppendMethod.MR_RAPPEND;
 		}
 		
-		// if(mc1.getCols() % mc1.getColsPerBlock() == 0) {
+		//note: below append methods are only supported for matrix, not frame
+		
+		//special case of block-aligned append line
 		if( cbind && m1_dim2 % m1_cpb == 0 
 		   || !cbind && m1_dim1 % m1_rpb == 0 ) 
 		{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index c74b44e..a244288 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -34,12 +34,13 @@ import org.apache.sysml.lops.WeightedSquaredLoss;
 import org.apache.sysml.lops.WeightedSquaredLossR;
 import org.apache.sysml.lops.WeightedUnaryMM;
 import org.apache.sysml.lops.WeightedUnaryMMR;
-import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction;
@@ -53,13 +54,9 @@ import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
@@ -81,7 +78,6 @@ import org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 
 public class SPInstructionParser extends InstructionParser 
@@ -392,24 +388,18 @@ public class SPInstructionParser extends InstructionParser
 			case MatrixReshape:
 				return MatrixReshapeSPInstruction.parseInstruction(str);
 				
-			case MAppend:
-				if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX)
-					return MatrixAppendMSPInstruction.parseInstruction(str);
-				else
-					return FrameAppendMSPInstruction.parseInstruction(str);
+			case MAppend: //matrix/frame
+				return AppendMSPInstruction.parseInstruction(str);
+				
+			case RAppend: //matrix/frame
+				return AppendRSPInstruction.parseInstruction(str);
 			
-			case GAppend:
+			case GAppend: 
 				return AppendGSPInstruction.parseInstruction(str);
 			
 			case GAlignedAppend:
 				return AppendGAlignedSPInstruction.parseInstruction(str);
 				
-			case RAppend:
-				if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX)
-					return MatrixAppendRSPInstruction.parseInstruction(str);
-				else
-					return FrameAppendRSPInstruction.parseInstruction(str);
-				
 			case Rand:
 				return RandSPInstruction.parseInstruction(str);
 				

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index eaf23d5..ab70af2 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -20,8 +20,12 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public abstract class AppendMSPInstruction extends BinarySPInstruction
 {
@@ -36,4 +40,30 @@ public abstract class AppendMSPInstruction extends BinarySPInstruction
 		_cbind = cbind;
 	}
 
+	public static AppendMSPInstruction parseInstruction( String str ) 
+		throws DMLRuntimeException 
+	{
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 5);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand offset = new CPOperand(parts[3]);
+		CPOperand out = new CPOperand(parts[4]);
+		boolean cbind = Boolean.parseBoolean(parts[5]);
+		
+		if(!opcode.equalsIgnoreCase("mappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a AppendMSPInstruction: " + str);
+		
+		//construct matrix/frame appendm instruction
+		if( in1.getDataType().isMatrix() ) {
+			return new MatrixAppendMSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, offset, out, cbind, opcode, str);
+		}
+		else { //frame			
+			return new FrameAppendMSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, offset, out, cbind, opcode, str);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 6d3cf5e..b56b7d7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -19,8 +19,12 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 
 public abstract class AppendRSPInstruction extends BinarySPInstruction
@@ -33,5 +37,31 @@ public abstract class AppendRSPInstruction extends BinarySPInstruction
 		_sptype = SPINSTRUCTION_TYPE.RAppend;
 		_cbind = cbind;
 	}
+
+	public static AppendRSPInstruction parseInstruction ( String str ) 
+		throws DMLRuntimeException 
+	{	
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 4);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand out = new CPOperand(parts[3]);
+		boolean cbind = Boolean.parseBoolean(parts[4]);
+		
+		if(!opcode.equalsIgnoreCase("rappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str);
+		
+		if( in1.getDataType().isMatrix() ) {
+			return new MatrixAppendRSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, out, cbind, opcode, str);
+		}
+		else { //frame
+
+			return new FrameAppendRSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, out, cbind, opcode, str);
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
index b67f364..7aad0bf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -30,14 +30,11 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class FrameAppendMSPInstruction extends AppendMSPInstruction
 {
@@ -46,27 +43,6 @@ public class FrameAppendMSPInstruction extends AppendMSPInstruction
 		super(op, in1, in2, offset, out, cbind, opcode, istr);
 	}
 	
-	public static FrameAppendMSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 5);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand offset = new CPOperand(parts[3]);
-		CPOperand out = new CPOperand(parts[4]);
-		boolean cbind = Boolean.parseBoolean(parts[5]);
-		
-		if(!opcode.equalsIgnoreCase("mappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendMSPInstruction: " + str);
-		
-		return new FrameAppendMSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, offset, out, cbind, opcode, str);
-	}
-	
 	@Override
 	public void processInstruction(ExecutionContext ec)
 		throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index c627e40..067769d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -29,13 +29,10 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class FrameAppendRSPInstruction extends AppendRSPInstruction
 {
@@ -43,26 +40,6 @@ public class FrameAppendRSPInstruction extends AppendRSPInstruction
 	{
 		super(op, in1, in2, out, cbind, opcode, istr);
 	}
-	
-	public static FrameAppendRSPInstruction parseInstruction ( String str ) 
-			throws DMLRuntimeException 
-	{	
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 4);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand out = new CPOperand(parts[3]);
-		boolean cbind = Boolean.parseBoolean(parts[4]);
-		
-		if(!opcode.equalsIgnoreCase("rappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendRSPInstruction: " + str);
-		
-		return new FrameAppendRSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, out, cbind, opcode, str);
-	}
 		
 	@Override
 	public void processInstruction(ExecutionContext ec)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
index 9e557bb..c4cd548 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -30,8 +30,6 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -42,7 +40,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class MatrixAppendMSPInstruction extends AppendMSPInstruction
 {
@@ -51,27 +48,6 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction
 		super(op, in1, in2, offset, out, cbind, opcode, istr);
 	}
 	
-	public static MatrixAppendMSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 5);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand offset = new CPOperand(parts[3]);
-		CPOperand out = new CPOperand(parts[4]);
-		boolean cbind = Boolean.parseBoolean(parts[5]);
-		
-		if(!opcode.equalsIgnoreCase("mappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendMSPInstruction: " + str);
-		
-		return new MatrixAppendMSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, offset, out, cbind, opcode, str);
-	}
-	
 	@Override
 	public void processInstruction(ExecutionContext ec)
 		throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
index 644fcd2..779e5ab 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
@@ -27,13 +27,10 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class MatrixAppendRSPInstruction extends AppendRSPInstruction
 {
@@ -42,26 +39,6 @@ public class MatrixAppendRSPInstruction extends AppendRSPInstruction
 		super(op, in1, in2, out, cbind, opcode, istr);
 	}
 	
-	public static MatrixAppendRSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{	
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 4);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand out = new CPOperand(parts[3]);
-		boolean cbind = Boolean.parseBoolean(parts[4]);
-		
-		if(!opcode.equalsIgnoreCase("rappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str);
-		
-		return new MatrixAppendRSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, out, cbind, opcode, str);
-	}
-	
 	@Override
 	public void processInstruction(ExecutionContext ec)
 		throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3ac1daf..351d559 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -160,7 +160,8 @@ public class FrameRDDConverterUtils
 	 * @param strict
 	 * @return
 	 */
-	public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict)
+	public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, 
+			MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict)
 	{
 		JavaPairRDD<Long,FrameBlock> input = in;
 		
@@ -193,16 +194,12 @@ public class FrameRDDConverterUtils
 			JavaPairRDD<LongWritable, Text> in, MatrixCharacteristics mcOut, List<ValueType> schema ) 
 		throws DMLRuntimeException  
 	{
-		//replicate schema entry if necessary
-		List<ValueType> lschema = (schema.size()==1 && mcOut.getCols()>1) ?
-				Collections.nCopies((int)mcOut.getCols(), schema.get(0)) : schema;
-		
 		//convert input rdd to serializable long/frame block
 		JavaPairRDD<Long,Text> input = 
 				in.mapToPair(new LongWritableTextToLongTextFunction());
 		
 		//do actual conversion
-		return textCellToBinaryBlockLongIndex(sc, input, mcOut, lschema);
+		return textCellToBinaryBlockLongIndex(sc, input, mcOut, schema);
 	}
 
 	/**
@@ -215,12 +212,18 @@ public class FrameRDDConverterUtils
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<Long, FrameBlock> textCellToBinaryBlockLongIndex(JavaSparkContext sc,
-			JavaPairRDD<Long, Text> input, MatrixCharacteristics mcOut, List<ValueType> schema ) 
+			JavaPairRDD<Long, Text> input, MatrixCharacteristics mc, List<ValueType> schema ) 
 		throws DMLRuntimeException  
 	{
+		//prepare default schema if needed
+		if( schema == null || schema.size()==1 ) {
+			schema = Collections.nCopies((int)mc.getCols(), 
+				(schema!=null) ? schema.get(0) : ValueType.STRING);
+		}
 		
  		//convert textcell rdd to binary block rdd (w/ partial blocks)
-		JavaPairRDD<Long, FrameBlock> output = input.values().mapPartitionsToPair(new TextToBinaryBlockFunction( mcOut, schema ));
+		JavaPairRDD<Long, FrameBlock> output = input.values()
+				.mapPartitionsToPair(new TextToBinaryBlockFunction( mc, schema ));
 		
 		//aggregate partial matrix blocks
 		JavaPairRDD<Long,FrameBlock> out = 
@@ -259,14 +262,9 @@ public class FrameRDDConverterUtils
 			JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn)
 		throws DMLRuntimeException 
 	{
-		//Do actual conversion
-		JavaPairRDD<Long, FrameBlock> output = matrixBlockToBinaryBlockLongIndex(sc,input, mcIn);
-		
-		//convert input rdd to serializable LongWritable/frame block
-		JavaPairRDD<LongWritable,FrameBlock> out = 
-				output.mapToPair(new LongFrameToLongWritableFrameFunction());
-		
-		return out;
+		//convert and map to serializable LongWritable/frame block
+		return matrixBlockToBinaryBlockLongIndex(sc,input, mcIn)
+			.mapToPair(new LongFrameToLongWritableFrameFunction());
 	}
 	
 
@@ -285,16 +283,17 @@ public class FrameRDDConverterUtils
 		JavaPairRDD<Long, FrameBlock> out = null;
 		
 		if(mcIn.getCols() > mcIn.getColsPerBlock()) {
-			
+			//convert matrix binary block to frame binary block
 			out = input.flatMapToPair(new MatrixToBinaryBlockFunction(mcIn));
 			
 			//aggregate partial frame blocks
-			if(mcIn.getCols() > mcIn.getColsPerBlock())
-				out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out );
+			out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out );
 		}
-		else
+		else {
+			//convert single matrix binary block to frame binary block (w/o shuffle)
 			out = input.mapToPair(new MatrixToBinaryBlockOneColumnBlockFunction(mcIn));
-		
+		}
+			
 		return out;
 	}
 	
@@ -725,10 +724,8 @@ public class FrameRDDConverterUtils
 		private void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
 			throws DMLRuntimeException
 		{			
-			if( fb != null && fb.getNumRows()>0 ) {
-				fb.setSchema(_schema); //use shared schema
+			if( fb != null && fb.getNumRows()>0 )
 				ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
-			}
 		}
 	}