You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/17 07:26:32 UTC

[1/5] incubator-systemml git commit: [SYSTEMML-925] Performance frame block serialization w/ default metadata

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 4481346f9 -> 8d5f3ceab


[SYSTEMML-925] Performance frame block serialization w/ default metadata

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0e8f1e18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0e8f1e18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0e8f1e18

Branch: refs/heads/master
Commit: 0e8f1e1863aa53f77220254051b75cc8c3ce670c
Parents: 4481346
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 16 23:17:47 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:16 2016 -0700

----------------------------------------------------------------------
 .../sysml/runtime/matrix/data/FrameBlock.java   | 44 ++++++++++++++++----
 1 file changed, 35 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0e8f1e18/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 1c8ffd0..8e68014 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -305,6 +305,26 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	
 	/**
 	 * 
+	 * @return
+	 */
+	public boolean isColNamesDefault() {
+		boolean ret = true;
+		for( int j=0; j<getNumColumns() && ret; j++ )
+			ret &= isColNameDefault(j);
+		return ret;	
+	}
+	
+	/**
+	 * 
+	 * @param i
+	 * @return
+	 */
+	public boolean isColNameDefault(int i) {
+		return _colnames.get(i).equals("C"+i);
+	}
+	
+	/**
+	 * 
 	 */
 	public void recomputeColumnCardinality() {
 		for( int j=0; j<getNumColumns(); j++ ) {
@@ -510,16 +530,21 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	
 	@Override
 	public void write(DataOutput out) throws IOException {
-		//write header (rows, cols)
+		boolean isDefaultMeta = isColNamesDefault()
+				&& isColumnMetadataDefault();
+		//write header (rows, cols, default)
 		out.writeInt(getNumRows());
 		out.writeInt(getNumColumns());
+		out.writeBoolean(isDefaultMeta);
 		//write columns (value type, data)
 		for( int j=0; j<getNumColumns(); j++ ) {
 			out.writeByte(_schema.get(j).ordinal());
-			out.writeUTF(_colnames.get(j));
-			out.writeLong(_colmeta.get(j).getNumDistinct());
-			out.writeUTF( (_colmeta.get(j).getMvValue()!=null) ? 
-					_colmeta.get(j).getMvValue() : "" );
+			if( !isDefaultMeta ) {
+				out.writeUTF(_colnames.get(j));
+				out.writeLong(_colmeta.get(j).getNumDistinct());
+				out.writeUTF( (_colmeta.get(j).getMvValue()!=null) ? 
+						_colmeta.get(j).getMvValue() : "" );
+			}
 			_coldata.get(j).write(out);
 		}
 	}
@@ -529,15 +554,16 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		//read head (rows, cols)
 		_numRows = in.readInt();
 		int numCols = in.readInt();
+		boolean isDefaultMeta = in.readBoolean();
 		//read columns (value type, meta, data)
 		_schema.clear();
 		_colmeta.clear();
 		_coldata.clear();
 		for( int j=0; j<numCols; j++ ) {
 			ValueType vt = ValueType.values()[in.readByte()];
-			String name = in.readUTF();
-			long ndistinct = in.readLong();
-			String mvvalue = in.readUTF();
+			String name = isDefaultMeta ? createColName(j) : in.readUTF();
+			long ndistinct = isDefaultMeta ? 0 : in.readLong();
+			String mvvalue = isDefaultMeta ? null : in.readUTF();
 			Array arr = null;
 			switch( vt ) {
 				case STRING:  arr = new StringArray(new String[_numRows]); break;
@@ -550,7 +576,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			_schema.add(vt);
 			_colnames.add(name);
 			_colmeta.add(new ColumnMetadata(ndistinct, 
-					mvvalue.isEmpty() ? null : mvvalue));
+					(mvvalue==null || mvvalue.isEmpty()) ? null : mvvalue));
 			_coldata.add(arr);
 		}
 	}


[5/5] incubator-systemml git commit: [SYSTEMML-928] Fix spark append compilation for frames (only m/rappend)

Posted by mb...@apache.org.
[SYSTEMML-928] Fix spark append compilation for frames (only m/rappend)

For the Spark execution type, we support mappend, rappend, galignedappend,
and gappend for matrices, but only mappend and rappend for frames. This
patch makes the related operator selection aware of this restriction.
Furthermore, it includes various minor runtime cleanups of append
instructions and frame converters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8d5f3cea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8d5f3cea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8d5f3cea

Branch: refs/heads/master
Commit: 8d5f3ceab60beb225f2f62882b772968c1ef29c5
Parents: 8631a14
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 16 23:17:40 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:28 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/BinaryOp.java    |  7 ++-
 .../instructions/SPInstructionParser.java       | 26 ++++-------
 .../spark/AppendMSPInstruction.java             | 30 +++++++++++++
 .../spark/AppendRSPInstruction.java             | 30 +++++++++++++
 .../spark/FrameAppendMSPInstruction.java        | 24 -----------
 .../spark/FrameAppendRSPInstruction.java        | 23 ----------
 .../spark/MatrixAppendMSPInstruction.java       | 24 -----------
 .../spark/MatrixAppendRSPInstruction.java       | 23 ----------
 .../spark/utils/FrameRDDConverterUtils.java     | 45 +++++++++-----------
 9 files changed, 94 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/hops/BinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java b/src/main/java/org/apache/sysml/hops/BinaryOp.java
index edc327d..ab0315b 100644
--- a/src/main/java/org/apache/sysml/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java
@@ -1302,12 +1302,15 @@ public class BinaryOp extends Hop
 		if( cbind && m1_dim2 >= 1 && m2_dim2 >= 0  //column dims known
 			&& m1_dim2+m2_dim2 <= m1_cpb   //output has one column block
 		  ||!cbind && m1_dim1 >= 1 && m2_dim1 >= 0 //row dims known
-			&& m1_dim1+m2_dim1 <= m1_rpb ) //output has one column block
+			&& m1_dim1+m2_dim1 <= m1_rpb   //output has one column block
+		  || dt == DataType.FRAME ) 
 		{
 			return AppendMethod.MR_RAPPEND;
 		}
 		
-		// if(mc1.getCols() % mc1.getColsPerBlock() == 0) {
+		//note: below append methods are only supported for matrix, not frame
+		
+		//special case of block-aligned append line
 		if( cbind && m1_dim2 % m1_cpb == 0 
 		   || !cbind && m1_dim1 % m1_rpb == 0 ) 
 		{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index c74b44e..a244288 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -34,12 +34,13 @@ import org.apache.sysml.lops.WeightedSquaredLoss;
 import org.apache.sysml.lops.WeightedSquaredLossR;
 import org.apache.sysml.lops.WeightedUnaryMM;
 import org.apache.sysml.lops.WeightedUnaryMMR;
-import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction;
@@ -53,13 +54,9 @@ import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
@@ -81,7 +78,6 @@ import org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 
 public class SPInstructionParser extends InstructionParser 
@@ -392,24 +388,18 @@ public class SPInstructionParser extends InstructionParser
 			case MatrixReshape:
 				return MatrixReshapeSPInstruction.parseInstruction(str);
 				
-			case MAppend:
-				if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX)
-					return MatrixAppendMSPInstruction.parseInstruction(str);
-				else
-					return FrameAppendMSPInstruction.parseInstruction(str);
+			case MAppend: //matrix/frame
+				return AppendMSPInstruction.parseInstruction(str);
+				
+			case RAppend: //matrix/frame
+				return AppendRSPInstruction.parseInstruction(str);
 			
-			case GAppend:
+			case GAppend: 
 				return AppendGSPInstruction.parseInstruction(str);
 			
 			case GAlignedAppend:
 				return AppendGAlignedSPInstruction.parseInstruction(str);
 				
-			case RAppend:
-				if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX)
-					return MatrixAppendRSPInstruction.parseInstruction(str);
-				else
-					return FrameAppendRSPInstruction.parseInstruction(str);
-				
 			case Rand:
 				return RandSPInstruction.parseInstruction(str);
 				

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index eaf23d5..ab70af2 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -20,8 +20,12 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public abstract class AppendMSPInstruction extends BinarySPInstruction
 {
@@ -36,4 +40,30 @@ public abstract class AppendMSPInstruction extends BinarySPInstruction
 		_cbind = cbind;
 	}
 
+	public static AppendMSPInstruction parseInstruction( String str ) 
+		throws DMLRuntimeException 
+	{
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 5);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand offset = new CPOperand(parts[3]);
+		CPOperand out = new CPOperand(parts[4]);
+		boolean cbind = Boolean.parseBoolean(parts[5]);
+		
+		if(!opcode.equalsIgnoreCase("mappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a AppendMSPInstruction: " + str);
+		
+		//construct matrix/frame appendm instruction
+		if( in1.getDataType().isMatrix() ) {
+			return new MatrixAppendMSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, offset, out, cbind, opcode, str);
+		}
+		else { //frame			
+			return new FrameAppendMSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, offset, out, cbind, opcode, str);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 6d3cf5e..b56b7d7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -19,8 +19,12 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 
 public abstract class AppendRSPInstruction extends BinarySPInstruction
@@ -33,5 +37,31 @@ public abstract class AppendRSPInstruction extends BinarySPInstruction
 		_sptype = SPINSTRUCTION_TYPE.RAppend;
 		_cbind = cbind;
 	}
+
+	public static AppendRSPInstruction parseInstruction ( String str ) 
+		throws DMLRuntimeException 
+	{	
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 4);
+		
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand out = new CPOperand(parts[3]);
+		boolean cbind = Boolean.parseBoolean(parts[4]);
+		
+		if(!opcode.equalsIgnoreCase("rappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str);
+		
+		if( in1.getDataType().isMatrix() ) {
+			return new MatrixAppendRSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, out, cbind, opcode, str);
+		}
+		else { //frame
+
+			return new FrameAppendRSPInstruction(new ReorgOperator(OffsetColumnIndex
+					.getOffsetColumnIndexFnObject(-1)), in1, in2, out, cbind, opcode, str);
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
index b67f364..7aad0bf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -30,14 +30,11 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class FrameAppendMSPInstruction extends AppendMSPInstruction
 {
@@ -46,27 +43,6 @@ public class FrameAppendMSPInstruction extends AppendMSPInstruction
 		super(op, in1, in2, offset, out, cbind, opcode, istr);
 	}
 	
-	public static FrameAppendMSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 5);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand offset = new CPOperand(parts[3]);
-		CPOperand out = new CPOperand(parts[4]);
-		boolean cbind = Boolean.parseBoolean(parts[5]);
-		
-		if(!opcode.equalsIgnoreCase("mappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendMSPInstruction: " + str);
-		
-		return new FrameAppendMSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, offset, out, cbind, opcode, str);
-	}
-	
 	@Override
 	public void processInstruction(ExecutionContext ec)
 		throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index c627e40..067769d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -29,13 +29,10 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class FrameAppendRSPInstruction extends AppendRSPInstruction
 {
@@ -43,26 +40,6 @@ public class FrameAppendRSPInstruction extends AppendRSPInstruction
 	{
 		super(op, in1, in2, out, cbind, opcode, istr);
 	}
-	
-	public static FrameAppendRSPInstruction parseInstruction ( String str ) 
-			throws DMLRuntimeException 
-	{	
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 4);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand out = new CPOperand(parts[3]);
-		boolean cbind = Boolean.parseBoolean(parts[4]);
-		
-		if(!opcode.equalsIgnoreCase("rappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendRSPInstruction: " + str);
-		
-		return new FrameAppendRSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, out, cbind, opcode, str);
-	}
 		
 	@Override
 	public void processInstruction(ExecutionContext ec)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
index 9e557bb..c4cd548 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -30,8 +30,6 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -42,7 +40,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class MatrixAppendMSPInstruction extends AppendMSPInstruction
 {
@@ -51,27 +48,6 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction
 		super(op, in1, in2, offset, out, cbind, opcode, istr);
 	}
 	
-	public static MatrixAppendMSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 5);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand offset = new CPOperand(parts[3]);
-		CPOperand out = new CPOperand(parts[4]);
-		boolean cbind = Boolean.parseBoolean(parts[5]);
-		
-		if(!opcode.equalsIgnoreCase("mappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendMSPInstruction: " + str);
-		
-		return new MatrixAppendMSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, offset, out, cbind, opcode, str);
-	}
-	
 	@Override
 	public void processInstruction(ExecutionContext ec)
 		throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
index 644fcd2..779e5ab 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
@@ -27,13 +27,10 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class MatrixAppendRSPInstruction extends AppendRSPInstruction
 {
@@ -42,26 +39,6 @@ public class MatrixAppendRSPInstruction extends AppendRSPInstruction
 		super(op, in1, in2, out, cbind, opcode, istr);
 	}
 	
-	public static MatrixAppendRSPInstruction parseInstruction ( String str ) 
-		throws DMLRuntimeException 
-	{	
-		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
-		InstructionUtils.checkNumFields (parts, 4);
-		
-		String opcode = parts[0];
-		CPOperand in1 = new CPOperand(parts[1]);
-		CPOperand in2 = new CPOperand(parts[2]);
-		CPOperand out = new CPOperand(parts[3]);
-		boolean cbind = Boolean.parseBoolean(parts[4]);
-		
-		if(!opcode.equalsIgnoreCase("rappend"))
-			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str);
-		
-		return new MatrixAppendRSPInstruction(
-				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-				in1, in2, out, cbind, opcode, str);
-	}
-	
 	@Override
 	public void processInstruction(ExecutionContext ec)
 		throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3ac1daf..351d559 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -160,7 +160,8 @@ public class FrameRDDConverterUtils
 	 * @param strict
 	 * @return
 	 */
-	public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict)
+	public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, 
+			MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict)
 	{
 		JavaPairRDD<Long,FrameBlock> input = in;
 		
@@ -193,16 +194,12 @@ public class FrameRDDConverterUtils
 			JavaPairRDD<LongWritable, Text> in, MatrixCharacteristics mcOut, List<ValueType> schema ) 
 		throws DMLRuntimeException  
 	{
-		//replicate schema entry if necessary
-		List<ValueType> lschema = (schema.size()==1 && mcOut.getCols()>1) ?
-				Collections.nCopies((int)mcOut.getCols(), schema.get(0)) : schema;
-		
 		//convert input rdd to serializable long/frame block
 		JavaPairRDD<Long,Text> input = 
 				in.mapToPair(new LongWritableTextToLongTextFunction());
 		
 		//do actual conversion
-		return textCellToBinaryBlockLongIndex(sc, input, mcOut, lschema);
+		return textCellToBinaryBlockLongIndex(sc, input, mcOut, schema);
 	}
 
 	/**
@@ -215,12 +212,18 @@ public class FrameRDDConverterUtils
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<Long, FrameBlock> textCellToBinaryBlockLongIndex(JavaSparkContext sc,
-			JavaPairRDD<Long, Text> input, MatrixCharacteristics mcOut, List<ValueType> schema ) 
+			JavaPairRDD<Long, Text> input, MatrixCharacteristics mc, List<ValueType> schema ) 
 		throws DMLRuntimeException  
 	{
+		//prepare default schema if needed
+		if( schema == null || schema.size()==1 ) {
+			schema = Collections.nCopies((int)mc.getCols(), 
+				(schema!=null) ? schema.get(0) : ValueType.STRING);
+		}
 		
  		//convert textcell rdd to binary block rdd (w/ partial blocks)
-		JavaPairRDD<Long, FrameBlock> output = input.values().mapPartitionsToPair(new TextToBinaryBlockFunction( mcOut, schema ));
+		JavaPairRDD<Long, FrameBlock> output = input.values()
+				.mapPartitionsToPair(new TextToBinaryBlockFunction( mc, schema ));
 		
 		//aggregate partial matrix blocks
 		JavaPairRDD<Long,FrameBlock> out = 
@@ -259,14 +262,9 @@ public class FrameRDDConverterUtils
 			JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn)
 		throws DMLRuntimeException 
 	{
-		//Do actual conversion
-		JavaPairRDD<Long, FrameBlock> output = matrixBlockToBinaryBlockLongIndex(sc,input, mcIn);
-		
-		//convert input rdd to serializable LongWritable/frame block
-		JavaPairRDD<LongWritable,FrameBlock> out = 
-				output.mapToPair(new LongFrameToLongWritableFrameFunction());
-		
-		return out;
+		//convert and map to serializable LongWritable/frame block
+		return matrixBlockToBinaryBlockLongIndex(sc,input, mcIn)
+			.mapToPair(new LongFrameToLongWritableFrameFunction());
 	}
 	
 
@@ -285,16 +283,17 @@ public class FrameRDDConverterUtils
 		JavaPairRDD<Long, FrameBlock> out = null;
 		
 		if(mcIn.getCols() > mcIn.getColsPerBlock()) {
-			
+			//convert matrix binary block to frame binary block
 			out = input.flatMapToPair(new MatrixToBinaryBlockFunction(mcIn));
 			
 			//aggregate partial frame blocks
-			if(mcIn.getCols() > mcIn.getColsPerBlock())
-				out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out );
+			out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out );
 		}
-		else
+		else {
+			//convert single matrix binary block to frame binary block (w/o shuffle)
 			out = input.mapToPair(new MatrixToBinaryBlockOneColumnBlockFunction(mcIn));
-		
+		}
+			
 		return out;
 	}
 	
@@ -725,10 +724,8 @@ public class FrameRDDConverterUtils
 		private void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
 			throws DMLRuntimeException
 		{			
-			if( fb != null && fb.getNumRows()>0 ) {
-				fb.setSchema(_schema); //use shared schema
+			if( fb != null && fb.getNumRows()>0 )
 				ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
-			}
 		}
 	}
 	


[4/5] incubator-systemml git commit: [SYSTEMML-925] Performance csv-to-binary frame conversion (schema, alloc)

Posted by mb...@apache.org.
[SYSTEMML-925] Performance csv-to-binary frame conversion (schema, alloc)

This patch makes the following performance improvements to csv-to-binary
frame conversion:

(1) Avoid an unnecessary scan when the dimensions are known but the number
of non-zeros (nnz) is unknown, because nnz is irrelevant for frames.

(2) Propagate schema information, which has a large performance impact
because frame blocks can then be represented with native data types
instead of bloated string objects.

(3) Avoid unnecessary memory allocation to reduce garbage collection
overheads via (a) pre-allocation of frame block column arrays, (b) token
array reuse on string splitting, and (c) a shared schema reference for
better memory efficiency of cached outputs.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8631a149
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8631a149
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8631a149

Branch: refs/heads/master
Commit: 8631a149dedf2723322a8c65a07d4d31d089bd24
Parents: 81d2b64
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Sep 17 07:12:20 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:25 2016 -0700

----------------------------------------------------------------------
 .../api/mlcontext/MLContextConversionUtil.java  |   3 +-
 .../spark/CSVReblockSPInstruction.java          |  12 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 121 ++++++++++---------
 .../sysml/runtime/io/IOUtilFunctions.java       |  46 +++++++
 .../sysml/runtime/matrix/data/FrameBlock.java   |  38 ++++--
 .../matrix/mapred/FrameReblockBuffer.java       |   2 +-
 .../functions/frame/FrameConverterTest.java     |   2 +-
 7 files changed, 151 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index e74dc53..1adc089 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -601,7 +601,8 @@ public class MLContextConversionUtil {
 				new MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo));
 		JavaPairRDD<Long, FrameBlock> rdd;
 		try {
-			rdd = FrameRDDConverterUtils.csvToBinaryBlock(jsc, javaPairRDDText, mc, false, ",", false, -1);
+			rdd = FrameRDDConverterUtils.csvToBinaryBlock(jsc, javaPairRDDText, mc, 
+					frameObject.getSchema(), false, ",", false, -1);
 		} catch (DMLRuntimeException e) {
 			e.printStackTrace();
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
index 44a076d..e11f505 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -19,13 +19,17 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.util.List;
+
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysml.hops.recompile.Recompiler;
 import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -123,8 +127,8 @@ public class CSVReblockSPInstruction extends UnarySPInstruction
 		if( input1.getDataType() == DataType.MATRIX )
 			out = processMatrixCSVReblockInstruction(sec, mcOut);
 		else if( input1.getDataType() == DataType.FRAME )
-			out = processFrameCSVReblockInstruction(sec, mcOut);
-			
+			out = processFrameCSVReblockInstruction(sec, mcOut, ((FrameObject)obj).getSchema());
+		
 		// put output RDD handle into symbol table
 		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
@@ -159,7 +163,7 @@ public class CSVReblockSPInstruction extends UnarySPInstruction
 	 * @throws DMLRuntimeException
 	 */
 	@SuppressWarnings("unchecked")
-	protected JavaPairRDD<Long,FrameBlock> processFrameCSVReblockInstruction(SparkExecutionContext sec, MatrixCharacteristics mcOut) 
+	protected JavaPairRDD<Long,FrameBlock> processFrameCSVReblockInstruction(SparkExecutionContext sec, MatrixCharacteristics mcOut, List<ValueType> schema) 
 		throws DMLRuntimeException
 	{
 		//get input rdd (needs to be longwritable/text for consistency with meta data, in case of
@@ -169,6 +173,6 @@ public class CSVReblockSPInstruction extends UnarySPInstruction
 		
 		//reblock csv to binary block
 		return FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), 
-				in, mcOut, _hasHeader, _delim, _fill, _fillValue);
+				in, mcOut, schema, _hasHeader, _delim, _fill, _fillValue);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index b541242..3ac1daf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -82,22 +82,23 @@ public class FrameRDDConverterUtils
 	 * 
 	 * @param sc
 	 * @param input
-	 * @param mcOut
+	 * @param mc
+	 * @param schema
 	 * @param hasHeader
 	 * @param delim
 	 * @param fill
-	 * @param missingValue
+	 * @param fillValue
 	 * @return
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc,
-			JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mcOut, 
+			JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mc, List<ValueType> schema,
 			boolean hasHeader, String delim, boolean fill, double fillValue) 
 		throws DMLRuntimeException 
 	{
 		//determine unknown dimensions and sparsity if required
-		if( !mcOut.dimsKnown(true) ) {
-			JavaRDD<String> tmp = input.values()
+		if( !mc.dimsKnown() ) { //nnz irrelevant here
+ 			JavaRDD<String> tmp = input.values()
 					.map(new TextToStringFunction());
 			String tmpStr = tmp.first();
 			boolean metaHeader = tmpStr.startsWith(TfUtils.TXMTD_MVPREFIX) 
@@ -105,24 +106,32 @@ public class FrameRDDConverterUtils
 			tmpStr = (metaHeader) ? tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr;
 			long rlen = tmp.count() - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0);
 			long clen = IOUtilFunctions.splitCSV(tmpStr, delim).length;
-			mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+			mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
 		}
 		
 		//prepare csv w/ row indexes (sorted by filenames)
 		JavaPairRDD<Text,Long> prepinput = input.values()
 				.zipWithIndex(); //zip row index
 		
+		//prepare default schema if needed
+		if( schema == null || schema.size()==1 ) {
+			schema = Collections.nCopies((int)mc.getCols(), 
+				(schema!=null) ? schema.get(0) : ValueType.STRING);
+		}
+			
 		//convert csv rdd to binary block rdd (w/ partial blocks)
-		JavaPairRDD<Long, FrameBlock> out = prepinput
-				.mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill));
+		JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair(
+				new CSVToBinaryBlockFunction(mc, schema, hasHeader, delim));
 		
 		return out;
 	}
 	
 	/**
-	 * @param sc 
+	 * 
+	 * @param sc
 	 * @param input
 	 * @param mcOut
+	 * @param schema
 	 * @param hasHeader
 	 * @param delim
 	 * @param fill
@@ -131,7 +140,7 @@ public class FrameRDDConverterUtils
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc,
-			JavaRDD<String> input, MatrixCharacteristics mcOut, 
+			JavaRDD<String> input, MatrixCharacteristics mcOut, List<ValueType> schema,
 			boolean hasHeader, String delim, boolean fill, double fillValue) 
 		throws DMLRuntimeException 
 	{
@@ -140,7 +149,7 @@ public class FrameRDDConverterUtils
 				input.mapToPair(new StringToSerTextFunction());
 		
 		//convert to binary block
-		return csvToBinaryBlock(sc, prepinput, mcOut, hasHeader, delim, fill, fillValue);
+		return csvToBinaryBlock(sc, prepinput, mcOut, schema, hasHeader, delim, fill, fillValue);
 	}
 	
 	/**
@@ -620,17 +629,17 @@ public class FrameRDDConverterUtils
 		private long _clen = -1;
 		private boolean _hasHeader = false;
 		private String _delim = null;
-		private boolean _fill = false;
 		private int _maxRowsPerBlock = -1; 
+		private List<ValueType> _schema = null;
 		private List<String> _colnames = null;
 		private List<String> _mvMeta = null; //missing value meta data
 		private List<String> _ndMeta = null; //num distinct meta data
 		
-		public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean hasHeader, String delim, boolean fill) {
+		public CSVToBinaryBlockFunction(MatrixCharacteristics mc, List<ValueType> schema, boolean hasHeader, String delim) {
 			_clen = mc.getCols();
+			_schema = schema;
 			_hasHeader = hasHeader;
 			_delim = delim;
-			_fill = fill;
 			_maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
 		}
 
@@ -640,9 +649,9 @@ public class FrameRDDConverterUtils
 		{
 			ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
 
-			Long[] ix = new Long[1];
-			FrameBlock[] mb = new FrameBlock[1];
-			int iRowsInBlock = 0;
+			long ix = -1;
+			FrameBlock fb = null;
+			String[] tmprow = new String[(int)_clen]; 
 			
 			while( arg0.hasNext() )
 			{
@@ -665,43 +674,61 @@ public class FrameRDDConverterUtils
 				//adjust row index for header and meta data
 				rowix += (_hasHeader ? 0 : 1) - ((_mvMeta == null) ? 0 : 2);
 				
-				if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) {
-					if( iRowsInBlock == _maxRowsPerBlock )
-						flushBlocksToList(ix, mb, ret);
-					createBlocks(rowix, ix, mb);
-					iRowsInBlock = 0;
+				if( fb == null || fb.getNumRows() == _maxRowsPerBlock) {
+					if( fb != null )
+						flushBlocksToList(ix, fb, ret);
+					ix = rowix;
+					fb = createFrameBlock();
 				}
 				
-				//process row data
-				String[] parts = IOUtilFunctions.splitCSV(row, _delim);
-				boolean emptyFound = false;
-				mb[0].appendRow(parts);
-				iRowsInBlock++;
-		
-				//sanity check empty cells filled w/ values
-				IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(row, _fill, emptyFound);
+				//split and process row data 
+				fb.appendRow(IOUtilFunctions.splitCSV(row, _delim, tmprow));
 			}
 		
 			//flush last blocks
-			flushBlocksToList(ix, mb, ret);
+			flushBlocksToList(ix, fb, ret);
 		
 			return ret;
 		}
 		
 		// Creates new state of empty column blocks for current global row index.
-		private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb)
+		private FrameBlock createFrameBlock()
 		{
-			//compute row block index and number of column blocks
-			ix[0] = rowix;
-			mb[0] = new FrameBlock((int)_clen, ValueType.STRING);
+			//frame block with given schema
+			FrameBlock fb = new FrameBlock(_schema);
+			
+			//preallocate physical columns (to avoid re-allocations)
+			fb.ensureAllocatedColumns(_maxRowsPerBlock);
+			fb.reset(0, false); //reset data but keep schema
+			fb.setNumRows(0);   //reset num rows to allow for append
+			
+			//handle meta data
 			if( _colnames != null )
-				mb[0].setColumnNames(_colnames);
+				fb.setColumnNames(_colnames);
 			if( _mvMeta != null )
 				for( int j=0; j<_clen; j++ )
-					mb[0].getColumnMetadata(j).setMvValue(_mvMeta.get(j));
+					fb.getColumnMetadata(j).setMvValue(_mvMeta.get(j));
 			if( _ndMeta != null )
 				for( int j=0; j<_clen; j++ )
-					mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
+					fb.getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
+		
+			return fb;
+		}
+		
+		/**
+		 * 
+		 * @param ix
+		 * @param fb
+		 * @param ret
+		 * @throws DMLRuntimeException
+		 */
+		private void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+			throws DMLRuntimeException
+		{			
+			if( fb != null && fb.getNumRows()>0 ) {
+				fb.setSchema(_schema); //use shared schema
+				ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
+			}
 		}
 	}
 	
@@ -1126,22 +1153,4 @@ public class FrameRDDConverterUtils
 		}
 	}
 	
-	//////////////////////////////////////
-	// Common functions
-	
-	/**
-	 * Flushes current state of filled column blocks to output list.
-	 * 
-	 * @param ix
-	 * @param fb
-	 * @param ret
-	 * @throws DMLRuntimeException
-	 */
-	private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
-		throws DMLRuntimeException
-	{			
-		for( int i=0; i<ix.length; i++ )
-			if( fb[i] != null && fb[0].getNumRows()>0 )
-				ret.add(new Tuple2<Long,FrameBlock>(ix[i], fb[i]));
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 0ec3534..b615a59 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -192,6 +192,52 @@ public class IOUtilFunctions
 	}
 	
 	/**
+	 * 
+	 * @param str
+	 * @param delim
+	 * @param tokens
+	 * @return
+	 */
+	public static String[] splitCSV(String str, String delim, String[] tokens)
+	{
+		// check for empty input
+		if( str == null || str.isEmpty() )
+			return new String[]{""};
+		
+		// scan string and create individual tokens
+		int from = 0, to = 0; 
+		int len = str.length();
+		int pos = 0;
+		while( from < len  ) { // for all tokens
+			if( str.charAt(from) == CSV_QUOTE_CHAR ) {
+				to = str.indexOf(CSV_QUOTE_CHAR, from+1);
+				// handle escaped inner quotes, e.g. "aa""a"
+				while( to+1 < len && str.charAt(to+1)==CSV_QUOTE_CHAR )
+					to = str.indexOf(CSV_QUOTE_CHAR, to+2); // to + ""
+				to += 1; // last "
+			}
+			else if(str.regionMatches(from, delim, 0, delim.length())) {
+				to = from; // empty string
+			}
+			else { // default: unquoted non-empty
+				to = str.indexOf(delim, from+1);
+			}
+			
+			// slice out token and advance position
+			to = (to >= 0) ? to : len;
+			tokens[pos++] = str.substring(from, to);
+			from = to + delim.length();
+		}
+		
+		// handle empty string at end
+		if( from == len )
+			tokens[pos] = "";
+			
+		// return tokens
+		return tokens;
+	}
+	
+	/**
 	 * Counts the number of tokens defined by the given delimiter, respecting 
 	 * the rules for quotes and escapes defined in RFC4180.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 8e68014..cbd231e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -134,6 +134,14 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	}
 	
 	/**
+	 * 
+	 * @param numRows
+	 */
+	public void setNumRows(int numRows) {
+		_numRows = numRows;
+	}
+	
+	/**
 	 * Get the number of columns of the frame block, that is
 	 * the number of columns defined in the schema.
 	 * 
@@ -361,13 +369,20 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		_coldata.get(c).set(r, UtilFunctions.objectToObject(_schema.get(c), val));
 	}
 
-	public void reset(int nrow)  {
-		getSchema().clear();
-		getColumnNames().clear();
-		if( _colmeta != null ) {
-			for( int i=0; i<_colmeta.size(); i++ )
-				if( !isColumnMetadataDefault(i) )
-					_colmeta.set(i, new ColumnMetadata(0));
+	/**
+	 * 
+	 * @param nrow
+	 * @param clearMeta
+	 */
+	public void reset(int nrow, boolean clearMeta) {
+		if( clearMeta ) {
+			getSchema().clear();
+			getColumnNames().clear();
+			if( _colmeta != null ) {
+				for( int i=0; i<_colmeta.size(); i++ )
+					if( !isColumnMetadataDefault(i) )
+						_colmeta.set(i, new ColumnMetadata(0));
+			}
 		}
 		if(_coldata != null) {
 			for( int i=0; i < _coldata.size(); i++ )
@@ -375,8 +390,11 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		}
 	}
 
+	/**
+	 * 
+	 */
 	public void reset() {
-		reset(0);
+		reset(0, true);
 	}
 	
 
@@ -716,7 +734,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		if( ret == null )
 			ret = new FrameBlock();
 		else
-			ret.reset(ru-rl+1);
+			ret.reset(ru-rl+1, true);
 		
 		//copy output schema and colnames
 		for( int j=cl; j<=cu; j++ ) {
@@ -973,7 +991,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			result=new FrameBlock(getSchema());
 		else 
 		{
-			result.reset(0);
+			result.reset(0, true);
 			result.setSchema(getSchema());
 		}
 		result.ensureAllocatedColumns(brlen);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
index 9fd9e36..e844be1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
@@ -181,7 +181,7 @@ public class FrameReblockBuffer
 				cbi = bi;
 				cbj = bj;					
 				tmpIx = bi;
-				tmpBlock.reset(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen)));
+				tmpBlock.reset(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen)), true);
 			}
 			
 			int ci = UtilFunctions.computeCellInBlock(_buff[i].getRow(), _brlen);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index f0c17eb..e8c3c51 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -464,7 +464,7 @@ public class FrameConverterTest extends AutomatedTestBase
 				OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
 				JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass);
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.csvToBinaryBlock(sc, rddIn, mc, false, separator, false, 0)
+						.csvToBinaryBlock(sc, rddIn, mc, null, false, separator, false, 0)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
 				break;


[3/5] incubator-systemml git commit: [SYSTEMML-927] Fix frame schema handling in spark cast/write instruction

Posted by mb...@apache.org.
[SYSTEMML-927] Fix frame schema handling in spark cast/write instruction

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/81d2b641
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/81d2b641
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/81d2b641

Branch: refs/heads/master
Commit: 81d2b641d99743ab54528a214659a5166e65aabe
Parents: 69a7858
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Sep 17 05:39:56 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:22 2016 -0700

----------------------------------------------------------------------
 .../sysml/runtime/instructions/spark/CastSPInstruction.java | 9 +++++++++
 .../runtime/instructions/spark/WriteSPInstruction.java      | 9 ++++++---
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81d2b641/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
index d869f11..4487b20 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
@@ -19,9 +19,12 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.util.Collections;
+
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.UnaryCP;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -88,5 +91,11 @@ public class CastSPInstruction extends UnarySPInstruction
 		sec.setRDDHandleForVariable(output.getName(), out);
 		updateUnaryOutputMatrixCharacteristics(sec, input1.getName(), output.getName());
 		sec.addLineageRDD(output.getName(), input1.getName());
+		
+		//update schema information for output frame
+		if( opcode.equals(UnaryCP.CAST_AS_FRAME_OPCODE) ) {
+			sec.getFrameObject(output.getName()).setSchema(
+				Collections.nCopies((int)mcIn.getCols(), ValueType.DOUBLE));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81d2b641/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index e4e2606..1b974f9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.io.LongWritable;
@@ -136,6 +137,8 @@ public class WriteSPInstruction extends SPInstruction
 
 		//get filename (literal or variable expression)
 		String fname = ec.getScalarInput(input2.getName(), ValueType.STRING, input2.isLiteral()).getStringValue();
+		List<ValueType> schema = (input1.getDataType()==DataType.FRAME) ? 
+				sec.getFrameObject(input1.getName()).getSchema() : null;
 		
 		try
 		{
@@ -150,7 +153,7 @@ public class WriteSPInstruction extends SPInstruction
 			if( input1.getDataType()==DataType.MATRIX )
 				processMatrixWriteInstruction(sec, fname, oi);
 			else
-				processFrameWriteInstruction(sec, fname, oi);
+				processFrameWriteInstruction(sec, fname, oi, schema);
 		}
 		catch(IOException ex)
 		{
@@ -279,7 +282,7 @@ public class WriteSPInstruction extends SPInstruction
 	 * @throws IOException 
 	 */
 	@SuppressWarnings("unchecked")
-	protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, OutputInfo oi) 
+	protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, OutputInfo oi, List<ValueType> schema) 
 		throws DMLRuntimeException, IOException
 	{
 		//get input rdd
@@ -310,7 +313,7 @@ public class WriteSPInstruction extends SPInstruction
 		}
 		
 		// write meta data file
-		MapReduceTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), null, DataType.FRAME, mc, oi, formatProperties);	
+		MapReduceTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), schema, DataType.FRAME, mc, oi, formatProperties);	
 	}
 	
 	/**


[2/5] incubator-systemml git commit: [SYSTEMML-925] Performance binary-to-csv frame conversion (w/o sort)

Posted by mb...@apache.org.
[SYSTEMML-925] Performance binary-to-csv frame conversion (w/o sort) 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/69a78581
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/69a78581
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/69a78581

Branch: refs/heads/master
Commit: 69a78581e8cc4bcc6dec5ecc88d9dad6aec96297
Parents: 0e8f1e1
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Sep 17 01:57:32 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:19 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     | 57 ++++++++++++++++++--
 1 file changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/69a78581/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index faf8ba1..b541242 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -156,7 +156,7 @@ public class FrameRDDConverterUtils
 		JavaPairRDD<Long,FrameBlock> input = in;
 		
 		//sort if required (on blocks/rows)
-		if( strict ) {
+		if( strict && !isSorted(input) ) {
 			input = input.sortByKey(true);
 		}
 		
@@ -454,7 +454,7 @@ public class FrameRDDConverterUtils
 		JavaRDD<String> dataRdd = sc.textFile(fnameIn);
 		return dataRdd.map(new RowGenerator(schema, delim));
 	}
-	
+
 	/* 
 	 * Row Generator class based on individual line in CSV file.
 	 */
@@ -480,8 +480,50 @@ public class FrameRDDConverterUtils
 		      return RowFactory.create(objects);
 		}
 	}
-	
 
+	/**
+	 * Check if the rdd is already sorted in order to avoid unnecessary
+	 * sampling, shuffle, and sort per partition.
+	 * 
+	 * @param in
+	 * @return
+	 */
+	private static boolean isSorted(JavaPairRDD<Long, FrameBlock> in) {		
+		//check sorted partitions (returns max key if true; -1 otherwise)
+		List<Long> keys = in.keys().mapPartitions(
+				new SortingAnalysisFunction()).collect();
+		long max = 0;
+		for( Long val : keys ) {
+			if( val < max )
+				return false;
+			max = val;
+		}
+		return true;
+	}
+
+	/**
+	 * 
+	 */
+	private static class SortingAnalysisFunction implements FlatMapFunction<Iterator<Long>,Long> 
+	{
+		private static final long serialVersionUID = -5789003262381127469L;
+
+		@Override
+		public Iterable<Long> call(Iterator<Long> arg0) throws Exception 
+		{
+			long max = 0;
+			while( max >= 0 && arg0.hasNext() ) {
+				long val = arg0.next();
+				max = (val < max) ? -1 : val;
+			}			
+			
+			ArrayList<Long> ret = new ArrayList<Long>();	
+			ret.add(max);
+			return ret;
+		}
+	}
+	
+	
 	/////////////////////////////////
 	// CSV-SPECIFIC FUNCTIONS
 	
@@ -1087,7 +1129,14 @@ public class FrameRDDConverterUtils
 	//////////////////////////////////////
 	// Common functions
 	
-	// Flushes current state of filled column blocks to output list.
+	/**
+	 * Flushes current state of filled column blocks to output list.
+	 * 
+	 * @param ix
+	 * @param fb
+	 * @param ret
+	 * @throws DMLRuntimeException
+	 */
 	private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
 		throws DMLRuntimeException
 	{