You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/17 07:26:35 UTC

[4/5] incubator-systemml git commit: [SYSTEMML-925] Performance csv-to-binary frame conversion (schema, alloc)

[SYSTEMML-925] Performance csv-to-binary frame conversion (schema,alloc)

This patch makes the following performance improvements to csv-to-binary
frame conversion:

(1) Avoid an unnecessary scan of the input when the dimensions are known
but the number of non-zeros (nnz) is unknown, as nnz is irrelevant for
frames.

(2) Handle schema information, which has a huge performance impact
because frame blocks can be represented in native data types instead of
bloated string objects.

(3) Avoid unnecessary memory allocation to reduce garbage collection
overhead via (a) pre-allocation of frame block column arrays, (b) token
array reuse on string splitting, and (c) a shared schema reference for
better memory efficiency of cached outputs.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8631a149
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8631a149
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8631a149

Branch: refs/heads/master
Commit: 8631a149dedf2723322a8c65a07d4d31d089bd24
Parents: 81d2b64
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Sep 17 07:12:20 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:25 2016 -0700

----------------------------------------------------------------------
 .../api/mlcontext/MLContextConversionUtil.java  |   3 +-
 .../spark/CSVReblockSPInstruction.java          |  12 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 121 ++++++++++---------
 .../sysml/runtime/io/IOUtilFunctions.java       |  46 +++++++
 .../sysml/runtime/matrix/data/FrameBlock.java   |  38 ++++--
 .../matrix/mapred/FrameReblockBuffer.java       |   2 +-
 .../functions/frame/FrameConverterTest.java     |   2 +-
 7 files changed, 151 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index e74dc53..1adc089 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -601,7 +601,8 @@ public class MLContextConversionUtil {
 				new MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo));
 		JavaPairRDD<Long, FrameBlock> rdd;
 		try {
-			rdd = FrameRDDConverterUtils.csvToBinaryBlock(jsc, javaPairRDDText, mc, false, ",", false, -1);
+			rdd = FrameRDDConverterUtils.csvToBinaryBlock(jsc, javaPairRDDText, mc, 
+					frameObject.getSchema(), false, ",", false, -1);
 		} catch (DMLRuntimeException e) {
 			e.printStackTrace();
 			return null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
index 44a076d..e11f505 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -19,13 +19,17 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.util.List;
+
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysml.hops.recompile.Recompiler;
 import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -123,8 +127,8 @@ public class CSVReblockSPInstruction extends UnarySPInstruction
 		if( input1.getDataType() == DataType.MATRIX )
 			out = processMatrixCSVReblockInstruction(sec, mcOut);
 		else if( input1.getDataType() == DataType.FRAME )
-			out = processFrameCSVReblockInstruction(sec, mcOut);
-			
+			out = processFrameCSVReblockInstruction(sec, mcOut, ((FrameObject)obj).getSchema());
+		
 		// put output RDD handle into symbol table
 		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
@@ -159,7 +163,7 @@ public class CSVReblockSPInstruction extends UnarySPInstruction
 	 * @throws DMLRuntimeException
 	 */
 	@SuppressWarnings("unchecked")
-	protected JavaPairRDD<Long,FrameBlock> processFrameCSVReblockInstruction(SparkExecutionContext sec, MatrixCharacteristics mcOut) 
+	protected JavaPairRDD<Long,FrameBlock> processFrameCSVReblockInstruction(SparkExecutionContext sec, MatrixCharacteristics mcOut, List<ValueType> schema) 
 		throws DMLRuntimeException
 	{
 		//get input rdd (needs to be longwritable/text for consistency with meta data, in case of
@@ -169,6 +173,6 @@ public class CSVReblockSPInstruction extends UnarySPInstruction
 		
 		//reblock csv to binary block
 		return FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), 
-				in, mcOut, _hasHeader, _delim, _fill, _fillValue);
+				in, mcOut, schema, _hasHeader, _delim, _fill, _fillValue);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index b541242..3ac1daf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -82,22 +82,23 @@ public class FrameRDDConverterUtils
 	 * 
 	 * @param sc
 	 * @param input
-	 * @param mcOut
+	 * @param mc
+	 * @param schema
 	 * @param hasHeader
 	 * @param delim
 	 * @param fill
-	 * @param missingValue
+	 * @param fillValue
 	 * @return
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc,
-			JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mcOut, 
+			JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mc, List<ValueType> schema,
 			boolean hasHeader, String delim, boolean fill, double fillValue) 
 		throws DMLRuntimeException 
 	{
 		//determine unknown dimensions and sparsity if required
-		if( !mcOut.dimsKnown(true) ) {
-			JavaRDD<String> tmp = input.values()
+		if( !mc.dimsKnown() ) { //nnz irrelevant here
+ 			JavaRDD<String> tmp = input.values()
 					.map(new TextToStringFunction());
 			String tmpStr = tmp.first();
 			boolean metaHeader = tmpStr.startsWith(TfUtils.TXMTD_MVPREFIX) 
@@ -105,24 +106,32 @@ public class FrameRDDConverterUtils
 			tmpStr = (metaHeader) ? tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr;
 			long rlen = tmp.count() - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0);
 			long clen = IOUtilFunctions.splitCSV(tmpStr, delim).length;
-			mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+			mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
 		}
 		
 		//prepare csv w/ row indexes (sorted by filenames)
 		JavaPairRDD<Text,Long> prepinput = input.values()
 				.zipWithIndex(); //zip row index
 		
+		//prepare default schema if needed
+		if( schema == null || schema.size()==1 ) {
+			schema = Collections.nCopies((int)mc.getCols(), 
+				(schema!=null) ? schema.get(0) : ValueType.STRING);
+		}
+			
 		//convert csv rdd to binary block rdd (w/ partial blocks)
-		JavaPairRDD<Long, FrameBlock> out = prepinput
-				.mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill));
+		JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair(
+				new CSVToBinaryBlockFunction(mc, schema, hasHeader, delim));
 		
 		return out;
 	}
 	
 	/**
-	 * @param sc 
+	 * 
+	 * @param sc
 	 * @param input
 	 * @param mcOut
+	 * @param schema
 	 * @param hasHeader
 	 * @param delim
 	 * @param fill
@@ -131,7 +140,7 @@ public class FrameRDDConverterUtils
 	 * @throws DMLRuntimeException
 	 */
 	public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc,
-			JavaRDD<String> input, MatrixCharacteristics mcOut, 
+			JavaRDD<String> input, MatrixCharacteristics mcOut, List<ValueType> schema,
 			boolean hasHeader, String delim, boolean fill, double fillValue) 
 		throws DMLRuntimeException 
 	{
@@ -140,7 +149,7 @@ public class FrameRDDConverterUtils
 				input.mapToPair(new StringToSerTextFunction());
 		
 		//convert to binary block
-		return csvToBinaryBlock(sc, prepinput, mcOut, hasHeader, delim, fill, fillValue);
+		return csvToBinaryBlock(sc, prepinput, mcOut, schema, hasHeader, delim, fill, fillValue);
 	}
 	
 	/**
@@ -620,17 +629,17 @@ public class FrameRDDConverterUtils
 		private long _clen = -1;
 		private boolean _hasHeader = false;
 		private String _delim = null;
-		private boolean _fill = false;
 		private int _maxRowsPerBlock = -1; 
+		private List<ValueType> _schema = null;
 		private List<String> _colnames = null;
 		private List<String> _mvMeta = null; //missing value meta data
 		private List<String> _ndMeta = null; //num distinct meta data
 		
-		public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean hasHeader, String delim, boolean fill) {
+		public CSVToBinaryBlockFunction(MatrixCharacteristics mc, List<ValueType> schema, boolean hasHeader, String delim) {
 			_clen = mc.getCols();
+			_schema = schema;
 			_hasHeader = hasHeader;
 			_delim = delim;
-			_fill = fill;
 			_maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
 		}
 
@@ -640,9 +649,9 @@ public class FrameRDDConverterUtils
 		{
 			ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
 
-			Long[] ix = new Long[1];
-			FrameBlock[] mb = new FrameBlock[1];
-			int iRowsInBlock = 0;
+			long ix = -1;
+			FrameBlock fb = null;
+			String[] tmprow = new String[(int)_clen]; 
 			
 			while( arg0.hasNext() )
 			{
@@ -665,43 +674,61 @@ public class FrameRDDConverterUtils
 				//adjust row index for header and meta data
 				rowix += (_hasHeader ? 0 : 1) - ((_mvMeta == null) ? 0 : 2);
 				
-				if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) {
-					if( iRowsInBlock == _maxRowsPerBlock )
-						flushBlocksToList(ix, mb, ret);
-					createBlocks(rowix, ix, mb);
-					iRowsInBlock = 0;
+				if( fb == null || fb.getNumRows() == _maxRowsPerBlock) {
+					if( fb != null )
+						flushBlocksToList(ix, fb, ret);
+					ix = rowix;
+					fb = createFrameBlock();
 				}
 				
-				//process row data
-				String[] parts = IOUtilFunctions.splitCSV(row, _delim);
-				boolean emptyFound = false;
-				mb[0].appendRow(parts);
-				iRowsInBlock++;
-		
-				//sanity check empty cells filled w/ values
-				IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(row, _fill, emptyFound);
+				//split and process row data 
+				fb.appendRow(IOUtilFunctions.splitCSV(row, _delim, tmprow));
 			}
 		
 			//flush last blocks
-			flushBlocksToList(ix, mb, ret);
+			flushBlocksToList(ix, fb, ret);
 		
 			return ret;
 		}
 		
 		// Creates new state of empty column blocks for current global row index.
-		private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb)
+		private FrameBlock createFrameBlock()
 		{
-			//compute row block index and number of column blocks
-			ix[0] = rowix;
-			mb[0] = new FrameBlock((int)_clen, ValueType.STRING);
+			//frame block with given schema
+			FrameBlock fb = new FrameBlock(_schema);
+			
+			//preallocate physical columns (to avoid re-allocations)
+			fb.ensureAllocatedColumns(_maxRowsPerBlock);
+			fb.reset(0, false); //reset data but keep schema
+			fb.setNumRows(0);   //reset num rows to allow for append
+			
+			//handle meta data
 			if( _colnames != null )
-				mb[0].setColumnNames(_colnames);
+				fb.setColumnNames(_colnames);
 			if( _mvMeta != null )
 				for( int j=0; j<_clen; j++ )
-					mb[0].getColumnMetadata(j).setMvValue(_mvMeta.get(j));
+					fb.getColumnMetadata(j).setMvValue(_mvMeta.get(j));
 			if( _ndMeta != null )
 				for( int j=0; j<_clen; j++ )
-					mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
+					fb.getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
+		
+			return fb;
+		}
+		
+		/**
+		 * 
+		 * @param ix
+		 * @param fb
+		 * @param ret
+		 * @throws DMLRuntimeException
+		 */
+		private void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+			throws DMLRuntimeException
+		{			
+			if( fb != null && fb.getNumRows()>0 ) {
+				fb.setSchema(_schema); //use shared schema
+				ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
+			}
 		}
 	}
 	
@@ -1126,22 +1153,4 @@ public class FrameRDDConverterUtils
 		}
 	}
 	
-	//////////////////////////////////////
-	// Common functions
-	
-	/**
-	 * Flushes current state of filled column blocks to output list.
-	 * 
-	 * @param ix
-	 * @param fb
-	 * @param ret
-	 * @throws DMLRuntimeException
-	 */
-	private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
-		throws DMLRuntimeException
-	{			
-		for( int i=0; i<ix.length; i++ )
-			if( fb[i] != null && fb[0].getNumRows()>0 )
-				ret.add(new Tuple2<Long,FrameBlock>(ix[i], fb[i]));
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 0ec3534..b615a59 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -192,6 +192,52 @@ public class IOUtilFunctions
 	}
 	
 	/**
+	 * 
+	 * @param str
+	 * @param delim
+	 * @param tokens
+	 * @return
+	 */
+	public static String[] splitCSV(String str, String delim, String[] tokens)
+	{
+		// check for empty input
+		if( str == null || str.isEmpty() )
+			return new String[]{""};
+		
+		// scan string and create individual tokens
+		int from = 0, to = 0; 
+		int len = str.length();
+		int pos = 0;
+		while( from < len  ) { // for all tokens
+			if( str.charAt(from) == CSV_QUOTE_CHAR ) {
+				to = str.indexOf(CSV_QUOTE_CHAR, from+1);
+				// handle escaped inner quotes, e.g. "aa""a"
+				while( to+1 < len && str.charAt(to+1)==CSV_QUOTE_CHAR )
+					to = str.indexOf(CSV_QUOTE_CHAR, to+2); // to + ""
+				to += 1; // last "
+			}
+			else if(str.regionMatches(from, delim, 0, delim.length())) {
+				to = from; // empty string
+			}
+			else { // default: unquoted non-empty
+				to = str.indexOf(delim, from+1);
+			}
+			
+			// slice out token and advance position
+			to = (to >= 0) ? to : len;
+			tokens[pos++] = str.substring(from, to);
+			from = to + delim.length();
+		}
+		
+		// handle empty string at end
+		if( from == len )
+			tokens[pos] = "";
+			
+		// return tokens
+		return tokens;
+	}
+	
+	/**
 	 * Counts the number of tokens defined by the given delimiter, respecting 
 	 * the rules for quotes and escapes defined in RFC4180.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 8e68014..cbd231e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -134,6 +134,14 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	}
 	
 	/**
+	 * 
+	 * @param numRows
+	 */
+	public void setNumRows(int numRows) {
+		_numRows = numRows;
+	}
+	
+	/**
 	 * Get the number of columns of the frame block, that is
 	 * the number of columns defined in the schema.
 	 * 
@@ -361,13 +369,20 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		_coldata.get(c).set(r, UtilFunctions.objectToObject(_schema.get(c), val));
 	}
 
-	public void reset(int nrow)  {
-		getSchema().clear();
-		getColumnNames().clear();
-		if( _colmeta != null ) {
-			for( int i=0; i<_colmeta.size(); i++ )
-				if( !isColumnMetadataDefault(i) )
-					_colmeta.set(i, new ColumnMetadata(0));
+	/**
+	 * 
+	 * @param nrow
+	 * @param clearMeta
+	 */
+	public void reset(int nrow, boolean clearMeta) {
+		if( clearMeta ) {
+			getSchema().clear();
+			getColumnNames().clear();
+			if( _colmeta != null ) {
+				for( int i=0; i<_colmeta.size(); i++ )
+					if( !isColumnMetadataDefault(i) )
+						_colmeta.set(i, new ColumnMetadata(0));
+			}
 		}
 		if(_coldata != null) {
 			for( int i=0; i < _coldata.size(); i++ )
@@ -375,8 +390,11 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		}
 	}
 
+	/**
+	 * 
+	 */
 	public void reset() {
-		reset(0);
+		reset(0, true);
 	}
 	
 
@@ -716,7 +734,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		if( ret == null )
 			ret = new FrameBlock();
 		else
-			ret.reset(ru-rl+1);
+			ret.reset(ru-rl+1, true);
 		
 		//copy output schema and colnames
 		for( int j=cl; j<=cu; j++ ) {
@@ -973,7 +991,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			result=new FrameBlock(getSchema());
 		else 
 		{
-			result.reset(0);
+			result.reset(0, true);
 			result.setSchema(getSchema());
 		}
 		result.ensureAllocatedColumns(brlen);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
index 9fd9e36..e844be1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
@@ -181,7 +181,7 @@ public class FrameReblockBuffer
 				cbi = bi;
 				cbj = bj;					
 				tmpIx = bi;
-				tmpBlock.reset(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen)));
+				tmpBlock.reset(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen)), true);
 			}
 			
 			int ci = UtilFunctions.computeCellInBlock(_buff[i].getRow(), _brlen);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8631a149/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index f0c17eb..e8c3c51 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -464,7 +464,7 @@ public class FrameConverterTest extends AutomatedTestBase
 				OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
 				JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass);
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.csvToBinaryBlock(sc, rddIn, mc, false, separator, false, 0)
+						.csvToBinaryBlock(sc, rddIn, mc, null, false, separator, false, 0)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
 				break;