Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/22 17:34:12 UTC

[36/51] [partial] incubator-systemml git commit: [SYSTEMML-482] [SYSTEMML-480] Adding a Git attributes file to enforce Unix-styled line endings, and normalizing all of the line endings.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparableDesc.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparableDesc.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparableDesc.java
index 19c35f1..3c97c9b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparableDesc.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparableDesc.java
@@ -1,49 +1,49 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import org.apache.hadoop.io.DoubleWritable;
-
-public class IndexSortComparableDesc extends IndexSortComparable
-{
-	
-	@Override
-	public int compareTo(Object o) 
-	{
-		//descending order (note: we cannot just invert the ascending order)
-		if( o instanceof DoubleWritable ) {
-			int tmp = _dval.compareTo((DoubleWritable) o);
-			return (( tmp!=0 ) ? -1*tmp : tmp); //prevent -0
-		}
-		//compare double value and index (e.g., for stable sort)
-		else if( o instanceof IndexSortComparable) {
-			IndexSortComparable that = (IndexSortComparable)o;
-			int tmp = _dval.compareTo(that._dval);
-			tmp = (( tmp!=0 ) ? -1*tmp : tmp); //prevent -0
-			if( tmp==0 ) //secondary sort
-				tmp = _lval.compareTo(that._lval);
-			return tmp;
-		}	
-		else {
-			throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName());
-		}
-		
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+public class IndexSortComparableDesc extends IndexSortComparable
+{
+	
+	@Override
+	public int compareTo(Object o) 
+	{
+		//descending order (note: we cannot just invert the ascending order)
+		if( o instanceof DoubleWritable ) {
+			int tmp = _dval.compareTo((DoubleWritable) o);
+			return (( tmp!=0 ) ? -1*tmp : tmp); //prevent -0
+		}
+		//compare double value and index (e.g., for stable sort)
+		else if( o instanceof IndexSortComparable) {
+			IndexSortComparable that = (IndexSortComparable)o;
+			int tmp = _dval.compareTo(that._dval);
+			tmp = (( tmp!=0 ) ? -1*tmp : tmp); //prevent -0
+			if( tmp==0 ) //secondary sort
+				tmp = _lval.compareTo(that._lval);
+			return tmp;
+		}	
+		else {
+			throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName());
+		}
+		
+	}
+}

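A minimal sketch of how the descending comparator above behaves; the demo class is hypothetical and not part of this commit, and it assumes placement in the same org.apache.sysml.runtime.matrix.sort package so that the inherited set(double, long) accessor (used by IndexSortMapper below) is accessible:

    package org.apache.sysml.runtime.matrix.sort;

    // Hypothetical demo, not part of the commit: larger values sort first,
    // while ties fall back to the ascending index compare, keeping the
    // overall sort stable.
    public class IndexSortDescDemo {
        public static void main(String[] args) {
            IndexSortComparableDesc a = new IndexSortComparableDesc();
            IndexSortComparableDesc b = new IndexSortComparableDesc();
            a.set(2.0, 5);  // value 2.0 at global index 5
            b.set(1.0, 3);  // value 1.0 at global index 3
            System.out.println(a.compareTo(b));  // negative: 2.0 sorts before 1.0
            b.set(2.0, 3);  // tie on the value ...
            System.out.println(a.compareTo(b));  // positive: index 3 sorts before 5
        }
    }
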
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortMapper.java
index 686f79a..ee0af1e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortMapper.java
@@ -1,75 +1,75 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.runtime.matrix.SortMR;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-public class IndexSortMapper extends MapReduceBase 
-   implements Mapper<MatrixIndexes, MatrixBlock, IndexSortComparable, LongWritable>
-{
-		
-	  private int _brlen = -1;
-	  
-	  //reuse writables
-	  private LongWritable   _tmpLong = new LongWritable();
-	  private IndexSortComparable _tmpSortKey = null;
-		
-	  @Override
-	  public void map(MatrixIndexes key, MatrixBlock value, OutputCollector<IndexSortComparable, LongWritable> out, Reporter reporter) 
-        throws IOException 
-	  {
-		  if( value.getNumColumns()>1 )
-			  throw new IOException("IndexSort only supports column vectors, but found matrix block with clen="+value.getNumColumns());
-		  
-		  long row_offset = (key.getRowIndex()-1)*_brlen+1;
-		  for( int i=0; i<value.getNumRows(); i++ )
-		  {
-			  double dval = value.quickGetValue(i, 0);
-			  long lix = row_offset+i;
-			  _tmpSortKey.set( dval, lix );
-			  _tmpLong.set(lix);
-			  out.collect(_tmpSortKey, _tmpLong);  
-		  }
-	  }
-	
-	  @Override
-	  public void configure(JobConf job)
-	  {
-		 super.configure(job);
-		 _brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
-		 boolean desc = job.getBoolean(SortMR.SORT_DECREASING, false);
-		 if( !desc )
-			 _tmpSortKey = new IndexSortComparable();
-		 else
-			 _tmpSortKey = new IndexSortComparableDesc();
-	  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.runtime.matrix.SortMR;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+public class IndexSortMapper extends MapReduceBase 
+   implements Mapper<MatrixIndexes, MatrixBlock, IndexSortComparable, LongWritable>
+{
+		
+	  private int _brlen = -1;
+	  
+	  //reuse writables
+	  private LongWritable   _tmpLong = new LongWritable();
+	  private IndexSortComparable _tmpSortKey = null;
+		
+	  @Override
+	  public void map(MatrixIndexes key, MatrixBlock value, OutputCollector<IndexSortComparable, LongWritable> out, Reporter reporter) 
+        throws IOException 
+	  {
+		  if( value.getNumColumns()>1 )
+			  throw new IOException("IndexSort only supports column vectors, but found matrix block with clen="+value.getNumColumns());
+		  
+		  long row_offset = (key.getRowIndex()-1)*_brlen+1;
+		  for( int i=0; i<value.getNumRows(); i++ )
+		  {
+			  double dval = value.quickGetValue(i, 0);
+			  long lix = row_offset+i;
+			  _tmpSortKey.set( dval, lix );
+			  _tmpLong.set(lix);
+			  out.collect(_tmpSortKey, _tmpLong);  
+		  }
+	  }
+	
+	  @Override
+	  public void configure(JobConf job)
+	  {
+		 super.configure(job);
+		 _brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
+		 boolean desc = job.getBoolean(SortMR.SORT_DECREASING, false);
+		 if( !desc )
+			 _tmpSortKey = new IndexSortComparable();
+		 else
+			 _tmpSortKey = new IndexSortComparableDesc();
+	  }
+}

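The mapper's index arithmetic can be checked in isolation: local row i of block rowIndex starts from the 1-based global row (rowIndex-1)*brlen+1. A standalone sketch with hypothetical names, not part of this commit:

    // Hypothetical sketch of IndexSortMapper's global-row computation.
    class GlobalRowDemo {
        static long globalRow(long rowIndex, int brlen, int i) {
            long rowOffset = (rowIndex - 1) * brlen + 1;  // first row of the block
            return rowOffset + i;
        }
        public static void main(String[] args) {
            // block 3 with brlen=1000: local row 0 is global row 2001
            System.out.println(globalRow(3, 1000, 0));
        }
    }
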
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortReducer.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortReducer.java
index 30c0a61..51a8a97 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortReducer.java
@@ -1,103 +1,103 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.runtime.matrix.SortMR;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-
-public class IndexSortReducer extends MapReduceBase 
-    implements Reducer<IndexSortComparable, LongWritable, MatrixIndexes, MatrixBlock>
-{
-	
-	
-	  private String _taskID = null;
-	  private int _brlen = -1;
-	  private MatrixIndexes _indexes = null;
-	  private MatrixBlock _data = null;
-	  private int _pos = 0;
-	  
-	  private OutputCollector<MatrixIndexes, MatrixBlock> _out = null;
-	  
-	  @Override
-	  public void reduce(IndexSortComparable key, Iterator<LongWritable> values, OutputCollector<MatrixIndexes, MatrixBlock> out, Reporter report) 
-		 throws IOException 
-	  {
-		  //cache output collector
-		  _out = out;
-		  
-		  //output binary block
-		  int count = 0;
-		  while( values.hasNext() )
-		  {
-			  //flush full matrix block
-			  if( _pos >= _brlen ) {
-				  _indexes.setIndexes(_indexes.getRowIndex()+1, _indexes.getColumnIndex());
-				  out.collect(_indexes, _data);
-				  _pos = 0;
-				  _data.reset(_brlen,1,false);
-			  }
-				  
-			  _data.quickSetValue(_pos, 0, values.next().get());
-			  _pos++;
-			  count++;  
-		  }
-		  
-		  report.incrCounter(SortMR.NUM_VALUES_PREFIX, _taskID, count);	
-	  }  
-		
-	  @Override
-	  public void configure(JobConf job) 
-	  {
-		  _taskID = MapReduceTool.getUniqueKeyPerTask(job, false);
-		  _brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
-		  _pos = 0;
-		  _data = new MatrixBlock(_brlen, 1, false);
-		  //note the output indexes are a sequence for rows and the taskID for columns
-		  //this is useful because the counts are collected over taskIDs as well, which
-		  //later on makes the task of reshifting self-contained
-		  _indexes = new MatrixIndexes(0, Long.parseLong(_taskID));
-	  }
-	  
-	  @Override
-	  public void close() 
-		  throws IOException
-	  {  
-		  //flush final matrix block
-		  if( _pos > 0 ){
-			  _indexes.setIndexes(_indexes.getRowIndex()+1, _indexes.getColumnIndex());
-			  _data.setNumRows(_pos);
-			  _out.collect(_indexes, _data);
-		  }
-	  }
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.runtime.matrix.SortMR;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.MapReduceTool;
+
+public class IndexSortReducer extends MapReduceBase 
+    implements Reducer<IndexSortComparable, LongWritable, MatrixIndexes, MatrixBlock>
+{
+	
+	
+	  private String _taskID = null;
+	  private int _brlen = -1;
+	  private MatrixIndexes _indexes = null;
+	  private MatrixBlock _data = null;
+	  private int _pos = 0;
+	  
+	  private OutputCollector<MatrixIndexes, MatrixBlock> _out = null;
+	  
+	  @Override
+	  public void reduce(IndexSortComparable key, Iterator<LongWritable> values, OutputCollector<MatrixIndexes, MatrixBlock> out, Reporter report) 
+		 throws IOException 
+	  {
+		  //cache output collector
+		  _out = out;
+		  
+		  //output binary block
+		  int count = 0;
+		  while( values.hasNext() )
+		  {
+			  //flush full matrix block
+			  if( _pos >= _brlen ) {
+				  _indexes.setIndexes(_indexes.getRowIndex()+1, _indexes.getColumnIndex());
+				  out.collect(_indexes, _data);
+				  _pos = 0;
+				  _data.reset(_brlen,1,false);
+			  }
+				  
+			  _data.quickSetValue(_pos, 0, values.next().get());
+			  _pos++;
+			  count++;  
+		  }
+		  
+		  report.incrCounter(SortMR.NUM_VALUES_PREFIX, _taskID, count);	
+	  }  
+		
+	  @Override
+	  public void configure(JobConf job) 
+	  {
+		  _taskID = MapReduceTool.getUniqueKeyPerTask(job, false);
+		  _brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
+		  _pos = 0;
+		  _data = new MatrixBlock(_brlen, 1, false);
+		  //note the output indexes are a sequence for rows and the taskID for columns
+		  //this is useful because the counts are collected over taskIDs as well, which
+		  //later on makes the task of reshifting self-contained
+		  _indexes = new MatrixIndexes(0, Long.parseLong(_taskID));
+	  }
+	  
+	  @Override
+	  public void close() 
+		  throws IOException
+	  {  
+		  //flush final matrix block
+		  if( _pos > 0 ){
+			  _indexes.setIndexes(_indexes.getRowIndex()+1, _indexes.getColumnIndex());
+			  _data.setNumRows(_pos);
+			  _out.collect(_indexes, _data);
+		  }
+	  }
+}
+

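The reducer above buffers values into a column block and flushes whenever the block is full, with close() emitting the final partial block. A compact standalone sketch of that fill-and-flush pattern; the Emitter interface is a hypothetical stand-in for the OutputCollector and not part of this commit:

    // Hypothetical sketch of the fill-and-flush buffering in IndexSortReducer.
    interface Emitter { void emit(long blockRow, double[] data, int validRows); }

    class BlockBuffer {
        private final int brlen;
        private final double[] data;
        private final Emitter out;
        private int pos = 0;
        private long blockRow = 0;

        BlockBuffer(int brlen, Emitter out) {
            this.brlen = brlen;
            this.data = new double[brlen];
            this.out = out;
        }
        void add(double v) {
            if (pos >= brlen)
                flush();        // emit a full block before adding more
            data[pos++] = v;
        }
        void close() {
            if (pos > 0)
                flush();        // emit the final, possibly partial block
        }
        private void flush() {
            out.emit(++blockRow, data, pos);
            pos = 0;
        }
    }
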
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupMapper.java
index 6405d1a..306fbc2 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupMapper.java
@@ -1,141 +1,141 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.IOException;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.runtime.matrix.SortMR;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-public class IndexSortStitchupMapper extends MapReduceBase 
- 	  implements Mapper<MatrixIndexes, MatrixBlock, MatrixIndexes, MatrixBlock>
-{
-	
-  	private long[] _offsets = null;
-  	private long _rlen = -1;
-  	private long _brlen = -1;
-  	
-  	private MatrixBlock _tmpBlk = null;
-  	private MatrixIndexes _tmpIx = null;
-  	
-	@Override
-	public void map(MatrixIndexes key, MatrixBlock value, OutputCollector<MatrixIndexes, MatrixBlock> out, Reporter reporter) 
-		throws IOException 
-	{
-		//compute starting cell offset
-		int id = (int)key.getColumnIndex();
-		long offset = _offsets[id];
-		offset += (key.getRowIndex()-1)*_brlen;
-		
-		//SPECIAL CASE: block aligned
-		int blksize = computeOutputBlocksize(_rlen, _brlen, offset);
-		if( offset%_brlen==0 && value.getNumRows()==blksize ) 
-		{ 
-			_tmpIx.setIndexes(offset/_brlen+1, 1);
-			out.collect(_tmpIx, value);
-		}
-		//GENERAL CASE: not block aligned
-		else 
-		{
-			int loffset = (int) (offset%_brlen);
-			//multiple output blocks
-			if( value.getNumRows()+loffset>_brlen ) 
-			{
-				long tmpnnz = 0;
-				//output first part
-				_tmpBlk.reset( (int)_brlen, 1 );
-				for( int i=0; i<_brlen-loffset; i++ )
-					_tmpBlk.quickSetValue(loffset+i, 0, value.quickGetValue(i, 0));
-				tmpnnz += _tmpBlk.getNonZeros();
-				_tmpIx.setIndexes(offset/_brlen+1, 1);
-				out.collect(_tmpIx, _tmpBlk);		
-			
-				//output second block
-				blksize = computeOutputBlocksize(_rlen, _brlen, offset+(_brlen-loffset));
-				_tmpBlk.reset( blksize, 1 );
-				for( int i=(int)_brlen-loffset; i<value.getNumRows(); i++ )
-					_tmpBlk.quickSetValue(i-((int)_brlen-loffset), 0, value.quickGetValue(i, 0));
-				tmpnnz += _tmpBlk.getNonZeros();
-				_tmpIx.setIndexes(offset/_brlen+2, 1);
-				out.collect(_tmpIx, _tmpBlk);	
-				
-				//sanity check for correctly redistributed non-zeros
-				if( tmpnnz != value.getNonZeros() )
-					throw new IOException("Number of split non-zeros does not match non-zeros of original block ("+tmpnnz+" vs "+value.getNonZeros()+")");
-			}
-			//single output block
-			else 
-			{	
-				_tmpBlk.reset( blksize, 1 );
-				for( int i=0; i<value.getNumRows(); i++ )
-					_tmpBlk.quickSetValue(loffset+i, 0, value.quickGetValue(i, 0));
-				_tmpIx.setIndexes(offset/_brlen+1, 1);
-				out.collect(_tmpIx, _tmpBlk);		
-			}
-		}
-	}
-	
-	@Override
-	public void configure(JobConf job)
-	{
-		super.configure(job);
-		_offsets = parseOffsets(job.get(SortMR.SORT_INDEXES_OFFSETS));
-		_rlen = MRJobConfiguration.getNumRows(job, (byte) 0);
-		_brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
-		
-		_tmpIx = new MatrixIndexes();
-		_tmpBlk = new MatrixBlock((int)_brlen, 1, false);
-	}
-	
-	
-	/**
-	 * 
-	 * @param str
-	 * @return
-	 */
-	private static long[] parseOffsets(String str)
-	{
-		String counts = str.substring(1, str.length() - 1);
-		StringTokenizer st = new StringTokenizer(counts, ",");
-		int len = st.countTokens();
-		long[] ret = new long[len];
-		for( int i=0; i<len; i++ )
-			ret[i] = Long.parseLong(st.nextToken().trim());
-		
-		return ret;
-	}
-	
-	private static int computeOutputBlocksize( long rlen, long brlen, long offset )
-	{
-		long rix = offset/brlen+1;
-		int blksize = (int) Math.min(brlen, rlen-(rix-1)*brlen);
-
-		return blksize;
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.runtime.matrix.SortMR;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+public class IndexSortStitchupMapper extends MapReduceBase 
+ 	  implements Mapper<MatrixIndexes, MatrixBlock, MatrixIndexes, MatrixBlock>
+{
+	
+  	private long[] _offsets = null;
+  	private long _rlen = -1;
+  	private long _brlen = -1;
+  	
+  	private MatrixBlock _tmpBlk = null;
+  	private MatrixIndexes _tmpIx = null;
+  	
+	@Override
+	public void map(MatrixIndexes key, MatrixBlock value, OutputCollector<MatrixIndexes, MatrixBlock> out, Reporter reporter) 
+		throws IOException 
+	{
+		//compute starting cell offset
+		int id = (int)key.getColumnIndex();
+		long offset = _offsets[id];
+		offset += (key.getRowIndex()-1)*_brlen;
+		
+		//SPECIAL CASE: block aligned
+		int blksize = computeOutputBlocksize(_rlen, _brlen, offset);
+		if( offset%_brlen==0 && value.getNumRows()==blksize ) 
+		{ 
+			_tmpIx.setIndexes(offset/_brlen+1, 1);
+			out.collect(_tmpIx, value);
+		}
+		//GENERAL CASE: not block aligned
+		else 
+		{
+			int loffset = (int) (offset%_brlen);
+			//multiple output blocks
+			if( value.getNumRows()+loffset>_brlen ) 
+			{
+				long tmpnnz = 0;
+				//output first part
+				_tmpBlk.reset( (int)_brlen, 1 );
+				for( int i=0; i<_brlen-loffset; i++ )
+					_tmpBlk.quickSetValue(loffset+i, 0, value.quickGetValue(i, 0));
+				tmpnnz += _tmpBlk.getNonZeros();
+				_tmpIx.setIndexes(offset/_brlen+1, 1);
+				out.collect(_tmpIx, _tmpBlk);		
+			
+				//output second block
+				blksize = computeOutputBlocksize(_rlen, _brlen, offset+(_brlen-loffset));
+				_tmpBlk.reset( blksize, 1 );
+				for( int i=(int)_brlen-loffset; i<value.getNumRows(); i++ )
+					_tmpBlk.quickSetValue(i-((int)_brlen-loffset), 0, value.quickGetValue(i, 0));
+				tmpnnz += _tmpBlk.getNonZeros();
+				_tmpIx.setIndexes(offset/_brlen+2, 1);
+				out.collect(_tmpIx, _tmpBlk);	
+				
+				//sanity check for correctly redistributed non-zeros
+				if( tmpnnz != value.getNonZeros() )
+					throw new IOException("Number of split non-zeros does not match non-zeros of original block ("+tmpnnz+" vs "+value.getNonZeros()+")");
+			}
+			//single output block
+			else 
+			{	
+				_tmpBlk.reset( blksize, 1 );
+				for( int i=0; i<value.getNumRows(); i++ )
+					_tmpBlk.quickSetValue(loffset+i, 0, value.quickGetValue(i, 0));
+				_tmpIx.setIndexes(offset/_brlen+1, 1);
+				out.collect(_tmpIx, _tmpBlk);		
+			}
+		}
+	}
+	
+	@Override
+	public void configure(JobConf job)
+	{
+		super.configure(job);
+		_offsets = parseOffsets(job.get(SortMR.SORT_INDEXES_OFFSETS));
+		_rlen = MRJobConfiguration.getNumRows(job, (byte) 0);
+		_brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
+		
+		_tmpIx = new MatrixIndexes();
+		_tmpBlk = new MatrixBlock((int)_brlen, 1, false);
+	}
+	
+	
+	/**
+	 * 
+	 * @param str
+	 * @return
+	 */
+	private static long[] parseOffsets(String str)
+	{
+		String counts = str.substring(1, str.length() - 1);
+		StringTokenizer st = new StringTokenizer(counts, ",");
+		int len = st.countTokens();
+		long[] ret = new long[len];
+		for( int i=0; i<len; i++ )
+			ret[i] = Long.parseLong(st.nextToken().trim());
+		
+		return ret;
+	}
+	
+	private static int computeOutputBlocksize( long rlen, long brlen, long offset )
+	{
+		long rix = offset/brlen+1;
+		int blksize = (int) Math.min(brlen, rlen-(rix-1)*brlen);
+
+		return blksize;
+	}
+}

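The block-alignment cases in IndexSortStitchupMapper reduce to a small amount of arithmetic. A hypothetical sketch, not part of this commit:

    // Hypothetical sketch: a run of n rows starting at 0-based global offset
    // 'off' either fits in one brlen-sized output block or spills into the
    // next (input blocks carry at most brlen rows, so at most a two-way split).
    class SplitDemo {
        static String describeSplit(long off, int n, long brlen) {
            long blockIx = off / brlen + 1;   // 1-based output block index
            long loff = off % brlen;          // local offset inside that block
            if (loff + n <= brlen)
                return "single block " + blockIx;
            long firstPart = brlen - loff;    // rows that still fit
            return "blocks " + blockIx + " and " + (blockIx + 1)
                 + " (" + firstPart + " + " + (n - firstPart) + " rows)";
        }
        public static void main(String[] args) {
            System.out.println(describeSplit(0, 1000, 1000));   // single block 1
            System.out.println(describeSplit(500, 1000, 1000)); // blocks 1 and 2 (500 + 500 rows)
        }
    }
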
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupReducer.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupReducer.java
index f0f3c1e..fb9ec62 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortStitchupReducer.java
@@ -1,74 +1,74 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-public class IndexSortStitchupReducer extends MapReduceBase 
-		implements Reducer<MatrixIndexes, MatrixBlock, MatrixIndexes, MatrixBlock>
-{
-	
-	private MatrixBlock _tmpBlk = null;
-	
-	@Override
-	public void reduce(MatrixIndexes key, Iterator<MatrixBlock> values, OutputCollector<MatrixIndexes, MatrixBlock> out, Reporter report) 
-		 throws IOException 
-	{
-		try
-		{
-			//handle first block (to handle dimensions)
-			MatrixBlock tmp = values.next();
-			_tmpBlk.reset(tmp.getNumRows(), tmp.getNumColumns());
-			_tmpBlk.merge(tmp, false);		
-			
-			//handle remaining blocks
-			while( values.hasNext() )
-			{
-				tmp = values.next();
-				_tmpBlk.merge(tmp, false);
-			}
-		}
-		catch(DMLRuntimeException ex)
-		{
-			throw new IOException(ex);
-		}
-		
-		out.collect(key, _tmpBlk);
-	}  
-	
-	@Override
-	public void configure(JobConf job)
-	{
-		int brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
-		_tmpBlk = new MatrixBlock(brlen, 1, false);
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+public class IndexSortStitchupReducer extends MapReduceBase 
+		implements Reducer<MatrixIndexes, MatrixBlock, MatrixIndexes, MatrixBlock>
+{
+	
+	private MatrixBlock _tmpBlk = null;
+	
+	@Override
+	public void reduce(MatrixIndexes key, Iterator<MatrixBlock> values, OutputCollector<MatrixIndexes, MatrixBlock> out, Reporter report) 
+		 throws IOException 
+	{
+		try
+		{
+			//handle first block (to handle dimensions)
+			MatrixBlock tmp = values.next();
+			_tmpBlk.reset(tmp.getNumRows(), tmp.getNumColumns());
+			_tmpBlk.merge(tmp, false);		
+			
+			//handle remaining blocks
+			while( values.hasNext() )
+			{
+				tmp = values.next();
+				_tmpBlk.merge(tmp, false);
+			}
+		}
+		catch(DMLRuntimeException ex)
+		{
+			throw new IOException(ex);
+		}
+		
+		out.collect(key, _tmpBlk);
+	}  
+	
+	@Override
+	public void configure(JobConf job)
+	{
+		int brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
+		_tmpBlk = new MatrixBlock(brlen, 1, false);
+	}
+}

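Because the stitch-up mapper writes disjoint row ranges into blocks that share an output index, the reduce-side merge above is a cell-wise union. A simplified array-based sketch, hypothetical and not part of this commit:

    // Hypothetical sketch of merging disjoint partial column blocks.
    class MergeDemo {
        static double[] mergeParts(java.util.List<double[]> parts, int rows) {
            double[] merged = new double[rows];
            for (double[] p : parts)
                for (int i = 0; i < rows; i++)
                    if (p[i] != 0)
                        merged[i] = p[i];  // ranges are disjoint, so no conflicts
            return merged;
        }
    }
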
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortMapper.java
index 126539c..e4477e2 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortMapper.java
@@ -1,98 +1,98 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.mr.CombineUnaryInstruction;
-import org.apache.sysml.runtime.matrix.SortMR;
-import org.apache.sysml.runtime.matrix.data.Converter;
-import org.apache.sysml.runtime.matrix.data.MatrixCell;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-@SuppressWarnings("rawtypes")
-public class ValueSortMapper<KIN extends WritableComparable, VIN extends Writable, KOUT extends WritableComparable, VOUT extends Writable> extends MapReduceBase 
-      implements Mapper<KIN, VIN, KOUT, VOUT>
-{
-	
-	private int brlen;
-	private int bclen;
-	private CombineUnaryInstruction combineInstruction=null;
-	private Converter<KIN, VIN, KOUT, VOUT> inputConverter;
-	private IntWritable one=new IntWritable(1);
-	private DoubleWritable combinedKey=new DoubleWritable();
-	
-	@SuppressWarnings("unchecked")
-	public void map(KIN key, VIN value, OutputCollector<KOUT, VOUT> out,
-			Reporter reporter) throws IOException {
-		inputConverter.convert(key, value);
-		while(inputConverter.hasNext())
-		{
-			Pair pair=inputConverter.next();
-			if(combineInstruction==null)
-			{
-				//System.out.println("output: "+pair.getKey()+": "+pair.getValue());
-				out.collect((KOUT) pair.getKey(), (VOUT)pair.getValue());
-			}else
-			{
-				processCombineUnaryInstruction(pair, out);
-			}
-		}
-	} 
-	
-	@SuppressWarnings("unchecked")
-	private void processCombineUnaryInstruction(Pair pair, OutputCollector<KOUT, VOUT> out) 
-		throws IOException
-	{
-		combinedKey.set(((MatrixCell)pair.getValue()).getValue());
-		out.collect((KOUT)combinedKey, (VOUT)one);
-	}
-	
-	@Override
-	@SuppressWarnings("unchecked")
-	public void configure(JobConf job)
-	{
-		try 
-		{
-			brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
-			bclen = MRJobConfiguration.getNumColumnsPerBlock(job, (byte) 0);
-			String str=job.get(SortMR.COMBINE_INSTRUCTION, null);
-			if(str!=null && !str.isEmpty() && !"null".equals(str))
-					combineInstruction=(CombineUnaryInstruction) CombineUnaryInstruction.parseInstruction(str);
-			inputConverter = MRJobConfiguration.getInputConverter(job, (byte) 0);
-			inputConverter.setBlockSize(brlen, bclen);
-		} 
-		catch (DMLRuntimeException e) {
-			throw new RuntimeException(e);
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.instructions.mr.CombineUnaryInstruction;
+import org.apache.sysml.runtime.matrix.SortMR;
+import org.apache.sysml.runtime.matrix.data.Converter;
+import org.apache.sysml.runtime.matrix.data.MatrixCell;
+import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+@SuppressWarnings("rawtypes")
+public class ValueSortMapper<KIN extends WritableComparable, VIN extends Writable, KOUT extends WritableComparable, VOUT extends Writable> extends MapReduceBase 
+      implements Mapper<KIN, VIN, KOUT, VOUT>
+{
+	
+	private int brlen;
+	private int bclen;
+	private CombineUnaryInstruction combineInstruction=null;
+	private Converter<KIN, VIN, KOUT, VOUT> inputConverter;
+	private IntWritable one=new IntWritable(1);
+	private DoubleWritable combinedKey=new DoubleWritable();
+	
+	@SuppressWarnings("unchecked")
+	public void map(KIN key, VIN value, OutputCollector<KOUT, VOUT> out,
+			Reporter reporter) throws IOException {
+		inputConverter.convert(key, value);
+		while(inputConverter.hasNext())
+		{
+			Pair pair=inputConverter.next();
+			if(combineInstruction==null)
+			{
+				//System.out.println("output: "+pair.getKey()+": "+pair.getValue());
+				out.collect((KOUT) pair.getKey(), (VOUT)pair.getValue());
+			}else
+			{
+				processCombineUnaryInstruction(pair, out);
+			}
+		}
+	} 
+	
+	@SuppressWarnings("unchecked")
+	private void processCombineUnaryInstruction(Pair pair, OutputCollector<KOUT, VOUT> out) 
+		throws IOException
+	{
+		combinedKey.set(((MatrixCell)pair.getValue()).getValue());
+		out.collect((KOUT)combinedKey, (VOUT)one);
+	}
+	
+	@Override
+	@SuppressWarnings("unchecked")
+	public void configure(JobConf job)
+	{
+		try 
+		{
+			brlen = MRJobConfiguration.getNumRowsPerBlock(job, (byte) 0);
+			bclen = MRJobConfiguration.getNumColumnsPerBlock(job, (byte) 0);
+			String str=job.get(SortMR.COMBINE_INSTRUCTION, null);
+			if(str!=null && !str.isEmpty() && !"null".equals(str))
+					combineInstruction=(CombineUnaryInstruction) CombineUnaryInstruction.parseInstruction(str);
+			inputConverter = MRJobConfiguration.getInputConverter(job, (byte) 0);
+			inputConverter.setBlockSize(brlen, bclen);
+		} 
+		catch (DMLRuntimeException e) {
+			throw new RuntimeException(e);
+		}
+	}
+}

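With a combine instruction configured, the mapper above turns each matrix cell value into the sort key with an implicit weight of one. A minimal illustration of the pair that reaches ValueSortReducer; the demo class is hypothetical and not part of this commit:

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;

    // Hypothetical sketch: a cell value v is emitted as (key=v, weight=1),
    // which ValueSortReducer later sums per key.
    class CombinePathDemo {
        static void emitWeighted(double v,
                java.util.function.BiConsumer<DoubleWritable, IntWritable> out) {
            out.accept(new DoubleWritable(v), new IntWritable(1));
        }
        public static void main(String[] args) {
            emitWeighted(4.2, (k, w) -> System.out.println(k + " -> " + w));
        }
    }
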
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortReducer.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortReducer.java
index 23bdbc8..13751c5 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/ValueSortReducer.java
@@ -1,68 +1,68 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.runtime.matrix.SortMR;
-import org.apache.sysml.runtime.util.MapReduceTool;
-
-@SuppressWarnings("rawtypes")
-public class ValueSortReducer<K extends WritableComparable, V extends Writable> extends MapReduceBase 
-      implements Reducer<K, V, K, V>
-{	
-	
-	private String taskID=null;
-	private boolean valueIsWeight=false;
-	private long count=0;
-	
-	public void configure(JobConf job)
-	{
-		taskID=MapReduceTool.getUniqueKeyPerTask(job, false);
-		valueIsWeight=job.getBoolean(SortMR.VALUE_IS_WEIGHT, false);
-	}
-
-	@Override
-	public void reduce(K key, Iterator<V> values, OutputCollector<K, V> out,
-			Reporter report) throws IOException {
-		int sum=0;
-		while(values.hasNext())
-		{
-			V value=values.next();
-			out.collect(key, value);
-			if(valueIsWeight)
-				sum+=((IntWritable)value).get();
-			else
-				sum++;
-		}
-		count+=sum;
-		report.incrCounter(SortMR.NUM_VALUES_PREFIX, taskID, sum);
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.runtime.matrix.SortMR;
+import org.apache.sysml.runtime.util.MapReduceTool;
+
+@SuppressWarnings("rawtypes")
+public class ValueSortReducer<K extends WritableComparable, V extends Writable> extends MapReduceBase 
+      implements Reducer<K, V, K, V>
+{	
+	
+	private String taskID=null;
+	private boolean valueIsWeight=false;
+	private long count=0;
+	
+	public void configure(JobConf job)
+	{
+		taskID=MapReduceTool.getUniqueKeyPerTask(job, false);
+		valueIsWeight=job.getBoolean(SortMR.VALUE_IS_WEIGHT, false);
+	}
+
+	@Override
+	public void reduce(K key, Iterator<V> values, OutputCollector<K, V> out,
+			Reporter report) throws IOException {
+		int sum=0;
+		while(values.hasNext())
+		{
+			V value=values.next();
+			out.collect(key, value);
+			if(valueIsWeight)
+				sum+=((IntWritable)value).get();
+			else
+				sum++;
+		}
+		count+=sum;
+		report.incrCounter(SortMR.NUM_VALUES_PREFIX, taskID, sum);
+	}
+}

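The weight handling above reduces to a single accumulation rule. A standalone sketch, hypothetical and simplified to IntWritable values, not part of this commit:

    // Hypothetical sketch: with VALUE_IS_WEIGHT set, each value contributes
    // its integer weight to the per-task count; otherwise it contributes 1.
    class WeightedCountDemo {
        static int countValues(java.util.Iterator<org.apache.hadoop.io.IntWritable> values,
                boolean valueIsWeight) {
            int sum = 0;
            while (values.hasNext()) {
                int w = values.next().get();
                sum += valueIsWeight ? w : 1;
            }
            return sum;
        }
    }
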
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
index a94b371..c51652f 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
@@ -1,156 +1,156 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.util.HashSet;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.runtime.instructions.InstructionParser;
-import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.BlockRow;
-import org.apache.sysml.runtime.matrix.JobReturn;
-import org.apache.sysml.runtime.matrix.WriteCSVMR;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockReducer;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
-import org.apache.sysml.runtime.util.MapReduceTool;
-
-/**
- * MapReduce job that performs the actual data transformations, such as recoding
- * and binning. In contrast to ApplyTxCSVMR, this job generates the output in
- * BinaryBlock format. This job takes a data set as well as the transformation
- * metadata (which is, for example, computed by GenTxMtdMR) as inputs.
- * 
- */
-
-@SuppressWarnings("deprecation")
-public class ApplyTfBBMR {
-	
-	public static JobReturn runJob(String inputPath, String rblkInst, String otherInst, String specPath, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numRows, long numColsBefore, long numColsAfter, int replication, String headerLine) throws Exception {
-		
-		CSVReblockInstruction rblk = (CSVReblockInstruction) InstructionParser.parseSingleInstruction(rblkInst);
-		
-		long[] rlens = new long[]{numRows};
-		long[] clens = new long[]{numColsAfter};
-		int[] brlens = new int[]{rblk.brlen};
-		int[] bclens = new int[]{rblk.bclen};
-		byte[] realIndexes = new byte[]{rblk.input};
-		byte[] resultIndexes = new byte[]{rblk.output};
-
-		JobConf job = new JobConf(ApplyTfBBMR.class);
-		job.setJobName("ApplyTfBB");
-
-		/* Setup MapReduce Job */
-		job.setJarByClass(ApplyTfBBMR.class);
-		
-		// set relevant classes
-		job.setMapperClass(ApplyTfBBMapper.class);
-	
-		MRJobConfiguration.setUpMultipleInputs(job, realIndexes, new String[]{inputPath}, new InputInfo[]{InputInfo.CSVInputInfo}, brlens, bclens, false, ConvertTarget.CELL);
-
-		MRJobConfiguration.setMatricesDimensions(job, realIndexes, rlens, clens);
-		MRJobConfiguration.setBlocksSizes(job, realIndexes, brlens, bclens);
-
-		MRJobConfiguration.setCSVReblockInstructions(job, rblkInst);
-		
-		//set up the instructions that will happen in the reducer, after the aggregation instructions
-		MRJobConfiguration.setInstructionsInReducer(job, otherInst);
-
-		job.setInt("dfs.replication", replication);
-		
-		//set up preferred custom serialization framework for binary block format
-		if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-			MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-
-		//set up what matrices are needed to pass from the mapper to reducer
-		HashSet<Byte> mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, realIndexes,  null, 
-				rblkInst, null, otherInst, resultIndexes);
-
-		MatrixChar_N_ReducerGroups ret=MRJobConfiguration.computeMatrixCharacteristics(job, realIndexes, 
-				null, rblkInst, null, null, null, resultIndexes, mapoutputIndexes, false);
-
-		//set up the number of reducers
-		int numRed = WriteCSVMR.determineNumReducers(rlens, clens, ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS), ret.numReducerGroups);
-		job.setNumReduceTasks( numRed );
-
-		//set up the multiple output files, and their format information
-		MRJobConfiguration.setUpMultipleOutputs(job, new byte[]{rblk.output}, new byte[]{0}, new String[]{outputPath}, new OutputInfo[]{OutputInfo.BinaryBlockOutputInfo}, true, false);
-		
-		// configure mapper and the mapper output key value pairs
-		job.setMapperClass(ApplyTfBBMapper.class);
-		job.setMapOutputKeyClass(TaggedFirstSecondIndexes.class);
-		job.setMapOutputValueClass(BlockRow.class);
-		
-		//configure reducer
-		job.setReducerClass(CSVReblockReducer.class);
-	
-		//turn off adaptivemr
-		job.setBoolean("adaptivemr.map.enable", false);
-
-		//set unique working dir
-		MRJobConfiguration.setUniqueWorkingDir(job);
-		
-		// Add transformation metadata file as well as partOffsetsFile to Distributed cache
-		DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job);
-		DistributedCache.createSymlink(job);
-		
-		Path cachefile=new Path(new Path(partOffsetsFile), "part-00000");
-		DistributedCache.addCacheFile(cachefile.toUri(), job);
-		DistributedCache.createSymlink(job);
-		
-		job.set(MRJobConfiguration.TF_HAS_HEADER, 	Boolean.toString(inputDataProperties.hasHeader()));
-		job.set(MRJobConfiguration.TF_DELIM, 		inputDataProperties.getDelim());
-		if ( inputDataProperties.getNAStrings() != null)
-			// Adding "dummy" string to handle the case of na_strings = ""
-			job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
-		job.set(MRJobConfiguration.TF_SPEC_FILE, 	specPath);
-		job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath));
-		job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath);
-		job.setLong(MRJobConfiguration.TF_NUM_COLS, numColsBefore);
-		job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
-		job.set(MRJobConfiguration.TF_HEADER, headerLine);
-		job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
-		job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
-
-		RunningJob runjob=JobClient.runJob(job);
-		
-		MapReduceTool.deleteFileIfExistOnHDFS(cachefile, job);
-		
-		Group group=runjob.getCounters().getGroup(MRJobConfiguration.NUM_NONZERO_CELLS);
-		for(int i=0; i<resultIndexes.length; i++) {
-			ret.stats[i].setNonZeros(group.getCounter(Integer.toString(i)));
-		}
-		return new JobReturn(ret.stats, runjob.isSuccessful());
-	}
-	
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.util.HashSet;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.runtime.instructions.InstructionParser;
+import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.BlockRow;
+import org.apache.sysml.runtime.matrix.JobReturn;
+import org.apache.sysml.runtime.matrix.WriteCSVMR;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
+import org.apache.sysml.runtime.matrix.mapred.CSVReblockReducer;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
+import org.apache.sysml.runtime.util.MapReduceTool;
+
+/**
+ * MapReduce job that performs the actual data transformations, such as recoding
+ * and binning. In contrast to ApplyTfCSVMR, this job generates its output in
+ * BinaryBlock format. It takes as inputs a data set as well as the
+ * transformation metadata (computed, for example, by GenTfMtdMR).
+ */
+@SuppressWarnings("deprecation")
+public class ApplyTfBBMR {
+	
+	public static JobReturn runJob(String inputPath, String rblkInst, String otherInst, String specPath, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numRows, long numColsBefore, long numColsAfter, int replication, String headerLine) throws Exception {
+		
+		CSVReblockInstruction rblk = (CSVReblockInstruction) InstructionParser.parseSingleInstruction(rblkInst);
+		
+		long[] rlens = new long[]{numRows};
+		long[] clens = new long[]{numColsAfter};
+		int[] brlens = new int[]{rblk.brlen};
+		int[] bclens = new int[]{rblk.bclen};
+		byte[] realIndexes = new byte[]{rblk.input};
+		byte[] resultIndexes = new byte[]{rblk.output};
+
+		JobConf job = new JobConf(ApplyTfBBMR.class);
+		job.setJobName("ApplyTfBB");
+
+		/* Setup MapReduce Job */
+		job.setJarByClass(ApplyTfBBMR.class);
+		
+		// set relevant classes
+		job.setMapperClass(ApplyTfBBMapper.class);
+	
+		MRJobConfiguration.setUpMultipleInputs(job, realIndexes, new String[]{inputPath}, new InputInfo[]{InputInfo.CSVInputInfo}, brlens, bclens, false, ConvertTarget.CELL);
+
+		MRJobConfiguration.setMatricesDimensions(job, realIndexes, rlens, clens);
+		MRJobConfiguration.setBlocksSizes(job, realIndexes, brlens, bclens);
+
+		MRJobConfiguration.setCSVReblockInstructions(job, rblkInst);
+		
+		//set up the instructions that will run in the reducer, after the aggregation instructions
+		MRJobConfiguration.setInstructionsInReducer(job, otherInst);
+
+		job.setInt("dfs.replication", replication);
+		
+		//set up preferred custom serialization framework for binary block format
+		if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
+			MRJobConfiguration.addBinaryBlockSerializationFramework( job );
+
+		//set up what matrices are needed to pass from the mapper to reducer
+		HashSet<Byte> mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, realIndexes,  null, 
+				rblkInst, null, otherInst, resultIndexes);
+
+		MatrixChar_N_ReducerGroups ret=MRJobConfiguration.computeMatrixCharacteristics(job, realIndexes, 
+				null, rblkInst, null, null, null, resultIndexes, mapoutputIndexes, false);
+
+		//set up the number of reducers
+		int numRed = WriteCSVMR.determineNumReducers(rlens, clens, ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS), ret.numReducerGroups);
+		job.setNumReduceTasks( numRed );
+
+		//set up the multiple output files, and their format information
+		MRJobConfiguration.setUpMultipleOutputs(job, new byte[]{rblk.output}, new byte[]{0}, new String[]{outputPath}, new OutputInfo[]{OutputInfo.BinaryBlockOutputInfo}, true, false);
+		
+		// configure the mapper output key/value classes (the mapper class was already set above)
+		job.setMapOutputKeyClass(TaggedFirstSecondIndexes.class);
+		job.setMapOutputValueClass(BlockRow.class);
+		
+		//configure reducer
+		job.setReducerClass(CSVReblockReducer.class);
+	
+		//turn off adaptivemr
+		job.setBoolean("adaptivemr.map.enable", false);
+
+		//set unique working dir
+		MRJobConfiguration.setUniqueWorkingDir(job);
+		
+		// Add transformation metadata file as well as partOffsetsFile to Distributed cache
+		DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job);
+		DistributedCache.createSymlink(job);
+		
+		Path cachefile=new Path(new Path(partOffsetsFile), "part-00000");
+		DistributedCache.addCacheFile(cachefile.toUri(), job);
+		DistributedCache.createSymlink(job);
+		
+		job.set(MRJobConfiguration.TF_HAS_HEADER, 	Boolean.toString(inputDataProperties.hasHeader()));
+		job.set(MRJobConfiguration.TF_DELIM, 		inputDataProperties.getDelim());
+		if ( inputDataProperties.getNAStrings() != null)
+			// Adding "dummy" string to handle the case of na_strings = ""
+			job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
+		job.set(MRJobConfiguration.TF_SPEC_FILE, 	specPath);
+		job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath));
+		job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath);
+		job.setLong(MRJobConfiguration.TF_NUM_COLS, numColsBefore);
+		job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
+		job.set(MRJobConfiguration.TF_HEADER, headerLine);
+		job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
+		job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
+
+		RunningJob runjob=JobClient.runJob(job);
+		
+		MapReduceTool.deleteFileIfExistOnHDFS(cachefile, job);
+		
+		Group group=runjob.getCounters().getGroup(MRJobConfiguration.NUM_NONZERO_CELLS);
+		for(int i=0; i<resultIndexes.length; i++) {
+			ret.stats[i].setNonZeros(group.getCounter(Integer.toString(i)));
+		}
+		return new JobReturn(ret.stats, runjob.isSuccessful());
+	}
+	
+}
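
For orientation, here is a minimal driver sketch showing how this job's runJob method might be invoked. Every HDFS path literal below is a hypothetical placeholder, the instruction strings are assumed to come from the SystemML compiler, and the imports are those already shown in the diff above; this is not code taken from the repository:

    // Hypothetical driver helper; all path literals are placeholders only.
    public static JobReturn applyTransformToBinaryBlock(String rblkInst, String otherInst,
            CSVFileFormatProperties csvProps, long numRows, long numColsBefore,
            long numColsAfter, String headerLine) throws Exception
    {
        return ApplyTfBBMR.runJob(
                "hdfs:/tmp/data.csv",   // inputPath: raw CSV data set
                rblkInst,               // compiler-generated CSVReblockInstruction string
                otherInst,              // instructions to run in the reducer (may be null)
                "hdfs:/tmp/tf.spec",    // specPath: JSON transformation specification
                "hdfs:/tmp/tf-mtd",     // mapsPath: transform metadata, e.g. from GenTfMtdMR
                "hdfs:/tmp/tf-tmp",     // tmpPath: scratch location
                "hdfs:/tmp/data.bb",    // outputPath: BinaryBlock output
                "hdfs:/tmp/offsets",    // partOffsetsFile: directory holding the row-offset file
                csvProps,               // CSV properties: header flag, delimiter, NA strings
                numRows, numColsBefore, numColsAfter,
                1,                      // replication factor
                headerLine);            // original CSV header line
    }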

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
index f2639f3..b91ab60 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
@@ -1,151 +1,151 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper.IndexedBlockRow;
-import org.apache.sysml.runtime.matrix.mapred.MapperBase;
-
-@SuppressWarnings("deprecation")
-public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, Text, TaggedFirstSecondIndexes, CSVReblockMR.BlockRow>{
-	
-	boolean _partFileWithHeader = false;
-	TfUtils tfmapper = null;
-	Reporter _reporter = null;
-	
-	// variables relevant to CSV Reblock
-	private IndexedBlockRow idxRow = null;
-	private long rowOffset=0;
-	private HashMap<Long, Long> offsetMap=new HashMap<Long, Long>();
-	private boolean _first = true;
-	private long num=0;
-	
-	@Override
-	public void configure(JobConf job) {
-		super.configure(job);
-		try {
-			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-			tfmapper = new TfUtils(job);
-			tfmapper.loadTfMetadata(job, true);
-			
-			// Load relevant information for CSV Reblock
-			ByteWritable key=new ByteWritable();
-			OffsetCount value=new OffsetCount();
-			Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-			
-			FileSystem fs = FileSystem.get(job);
-			Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
-			String thisfile=thisPath.toString();
-
-			SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
-			while (reader.next(key, value)) {
-				// "key" needn't be checked since the offset file has information about a single CSV input (the raw data file)
-				if(thisfile.equals(value.filename))
-					offsetMap.put(value.fileOffset, value.count);
-			}
-			reader.close();
-
-			idxRow = new CSVReblockMapper.IndexedBlockRow();
-			int maxBclen=0;
-		
-			for(ArrayList<CSVReblockInstruction> insv: csv_reblock_instructions)
-				for(CSVReblockInstruction in: insv)
-				{	
-					if(maxBclen<in.bclen)
-						maxBclen=in.bclen;
-				}
-			
-			//always dense since common CSV use case
-			idxRow.getRow().data.reset(1, maxBclen, false);		
-
-		} catch(IOException e)  { throw new RuntimeException(e); }
-		catch(JSONException e)  { throw new RuntimeException(e); }
-
-	}
-	
-	@Override
-	public void map(LongWritable rawKey, Text rawValue, OutputCollector<TaggedFirstSecondIndexes,CSVReblockMR.BlockRow> out, Reporter reporter) throws IOException  {
-		
-		if(_first) {
-			rowOffset=offsetMap.get(rawKey.get());
-			_reporter = reporter;
-			_first=false;
-		}
-		
-		// output the header line
-		if ( rawKey.get() == 0 && _partFileWithHeader ) 
-		{
-			tfmapper.processHeaderLine();
-			if ( tfmapper.hasHeader() )
-				return;
-		}
-		
-		// parse the input line and apply transformation
-		String[] words = tfmapper.getWords(rawValue);
-		
-		if(!tfmapper.omit(words))
-		{
-			words = tfmapper.apply(words);
-			try {
-				tfmapper.check(words);
-				
-				// Perform CSV Reblock
-				CSVReblockInstruction ins = csv_reblock_instructions.get(0).get(0);
-				idxRow = CSVReblockMapper.processRow(idxRow, words, rowOffset, num, ins.output, ins.brlen, ins.bclen, ins.fill, ins.fillValue, out);
-			}
-			catch(DMLRuntimeException e) {
-				throw new RuntimeException(e.getMessage() + ":" + rawValue.toString());
-			}
-			num++;
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-	}
-
-	@Override
-	protected void specialOperationsForActualMap(int index,
-			OutputCollector<Writable, Writable> out, Reporter reporter)
-			throws IOException {
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
+import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper;
+import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper.IndexedBlockRow;
+import org.apache.sysml.runtime.matrix.mapred.MapperBase;
+
+@SuppressWarnings("deprecation")
+public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, Text, TaggedFirstSecondIndexes, CSVReblockMR.BlockRow>{
+	
+	boolean _partFileWithHeader = false;
+	TfUtils tfmapper = null;
+	Reporter _reporter = null;
+	
+	// variables relevant to CSV Reblock
+	private IndexedBlockRow idxRow = null;
+	private long rowOffset=0;
+	private HashMap<Long, Long> offsetMap=new HashMap<Long, Long>();
+	private boolean _first = true;
+	private long num=0;
+	
+	@Override
+	public void configure(JobConf job) {
+		super.configure(job);
+		try {
+			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
+			tfmapper = new TfUtils(job);
+			tfmapper.loadTfMetadata(job, true);
+			
+			// Load relevant information for CSV Reblock
+			ByteWritable key=new ByteWritable();
+			OffsetCount value=new OffsetCount();
+			Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
+			
+			FileSystem fs = FileSystem.get(job);
+			Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+			String thisfile=thisPath.toString();
+
+			SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
+			while (reader.next(key, value)) {
+				// "key" needn't be checked since the offset file has information about a single CSV input (the raw data file)
+				if(thisfile.equals(value.filename))
+					offsetMap.put(value.fileOffset, value.count);
+			}
+			reader.close();
+
+			idxRow = new CSVReblockMapper.IndexedBlockRow();
+			int maxBclen=0;
+		
+			for(ArrayList<CSVReblockInstruction> insv: csv_reblock_instructions)
+				for(CSVReblockInstruction in: insv)
+				{	
+					if(maxBclen<in.bclen)
+						maxBclen=in.bclen;
+				}
+			
+			//always dense since common CSV use case
+			idxRow.getRow().data.reset(1, maxBclen, false);		
+
+		} catch(IOException e)  { throw new RuntimeException(e); }
+		catch(JSONException e)  { throw new RuntimeException(e); }
+
+	}
+	
+	@Override
+	public void map(LongWritable rawKey, Text rawValue, OutputCollector<TaggedFirstSecondIndexes,CSVReblockMR.BlockRow> out, Reporter reporter) throws IOException  {
+		
+		if(_first) {
+			rowOffset=offsetMap.get(rawKey.get());
+			_reporter = reporter;
+			_first=false;
+		}
+		
+		// output the header line
+		if ( rawKey.get() == 0 && _partFileWithHeader ) 
+		{
+			tfmapper.processHeaderLine();
+			if ( tfmapper.hasHeader() )
+				return;
+		}
+		
+		// parse the input line and apply transformation
+		String[] words = tfmapper.getWords(rawValue);
+		
+		if(!tfmapper.omit(words))
+		{
+			words = tfmapper.apply(words);
+			try {
+				tfmapper.check(words);
+				
+				// Perform CSV Reblock
+				CSVReblockInstruction ins = csv_reblock_instructions.get(0).get(0);
+				idxRow = CSVReblockMapper.processRow(idxRow, words, rowOffset, num, ins.output, ins.brlen, ins.bclen, ins.fill, ins.fillValue, out);
+			}
+			catch(DMLRuntimeException e) {
+				throw new RuntimeException(e.getMessage() + ":" + rawValue.toString());
+			}
+			num++;
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+	}
+
+	@Override
+	protected void specialOperationsForActualMap(int index,
+			OutputCollector<Writable, Writable> out, Reporter reporter)
+			throws IOException {
+	}
+
+}
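
A note on the offset bookkeeping in configure() above: the row-offset file maps each input split's starting byte offset to the number of CSV rows that precede it, which lets a mapper turn its purely local row counter (num) into globally consistent row and block indexes. The following sketch illustrates that arithmetic under the assumption of SystemML's 1-based block indexing with brlen-row blocks; it mirrors the intent of CSVReblockMapper.processRow, not its exact code:

    // Illustrative only; assumes 1-based global row and block indexes.
    static long globalRowIndex(long rowOffset, long localRowNum) {
        return rowOffset + localRowNum + 1;     // rows before this split + local counter
    }
    static long blockRowIndex(long globalRow, int brlen) {
        return (globalRow - 1) / brlen + 1;     // which brlen-row block the row falls into
    }
    static int inBlockRow(long globalRow, int brlen) {
        return (int) ((globalRow - 1) % brlen); // 0-based row offset within that block
    }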

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
index ce2cf48..7c323f4 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
@@ -1,128 +1,128 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.JobReturn;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-@SuppressWarnings("deprecation")
-public class ApplyTfCSVMR {
-	
-	public static JobReturn runJob(String inputPath, String specPath, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
-		JobConf job = new JobConf(ApplyTfCSVMR.class);
-		job.setJobName("ApplyTfCSV");
-
-		/* Setup MapReduce Job */
-		job.setJarByClass(ApplyTfCSVMR.class);
-		
-		// set relevant classes
-		job.setMapperClass(ApplyTfCSVMapper.class);
-		job.setNumReduceTasks(0);
-	
-		// Add transformation metadata file as well as partOffsetsFile to Distributed cache
-		DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job);
-		DistributedCache.createSymlink(job);
-		
-		Path cachefile=new Path(partOffsetsFile);
-		DistributedCache.addCacheFile(cachefile.toUri(), job);
-		DistributedCache.createSymlink(job);
-		
-		// set input and output properties
-		job.setInputFormat(TextInputFormat.class);
-		job.setOutputFormat(TextOutputFormat.class);
-		
-		job.setMapOutputKeyClass(NullWritable.class);
-		job.setMapOutputValueClass(Text.class);
-		
-		job.setOutputKeyClass(NullWritable.class);
-		job.setOutputValueClass(Text.class);
-		
-		job.setInt("dfs.replication", replication);
-		
-		FileInputFormat.addInputPath(job, new Path(inputPath));
-		// delete outputPath if it already exists
-		Path outPath = new Path(outputPath);
-		FileSystem fs = FileSystem.get(job);
-		fs.delete(outPath, true);
-		FileOutputFormat.setOutputPath(job, outPath);
-
-		job.set(MRJobConfiguration.TF_HAS_HEADER, 	Boolean.toString(inputDataProperties.hasHeader()));
-		job.set(MRJobConfiguration.TF_DELIM, 		inputDataProperties.getDelim());
-		if ( inputDataProperties.getNAStrings() != null)
-			// Adding "dummy" string to handle the case of na_strings = ""
-			job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
-		job.set(MRJobConfiguration.TF_SPEC_FILE, 	specPath);
-		job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath));
-		job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath);
-		job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
-		job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
-		job.set(MRJobConfiguration.TF_HEADER, headerLine);
-		job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
-		job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
-		
-		//turn off adaptivemr
-		job.setBoolean("adaptivemr.map.enable", false);
-
-		// Run the job
-		RunningJob runjob = JobClient.runJob(job);
-		
-		// Since transform CSV produces part files with the prefix transform-part-*,
-		// delete all the "default" part-* files
-		deletePartFiles(fs, outPath);
-		
-		MatrixCharacteristics mc = new MatrixCharacteristics();
-		return new JobReturn(new MatrixCharacteristics[]{mc}, runjob.isSuccessful());
-	}
-	
-	private static void deletePartFiles(FileSystem fs, Path path) throws FileNotFoundException, IOException
-	{
-		PathFilter filter=new PathFilter(){
-			public boolean accept(Path file) {
-				return file.getName().startsWith("part-");
-			}
-		};
-		FileStatus[] list = fs.listStatus(path, filter);
-		for(FileStatus stat : list) {
-			fs.delete(stat.getPath(), false);
-		}
-	}
-	
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
+import org.apache.sysml.runtime.matrix.JobReturn;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+
+@SuppressWarnings("deprecation")
+public class ApplyTfCSVMR {
+	
+	public static JobReturn runJob(String inputPath, String specPath, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
+		JobConf job = new JobConf(ApplyTfCSVMR.class);
+		job.setJobName("ApplyTfCSV");
+
+		/* Setup MapReduce Job */
+		job.setJarByClass(ApplyTfCSVMR.class);
+		
+		// set relevant classes
+		job.setMapperClass(ApplyTfCSVMapper.class);
+		job.setNumReduceTasks(0);
+	
+		// Add transformation metadata file as well as partOffsetsFile to Distributed cache
+		DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job);
+		DistributedCache.createSymlink(job);
+		
+		Path cachefile=new Path(partOffsetsFile);
+		DistributedCache.addCacheFile(cachefile.toUri(), job);
+		DistributedCache.createSymlink(job);
+		
+		// set input and output properties
+		job.setInputFormat(TextInputFormat.class);
+		job.setOutputFormat(TextOutputFormat.class);
+		
+		job.setMapOutputKeyClass(NullWritable.class);
+		job.setMapOutputValueClass(Text.class);
+		
+		job.setOutputKeyClass(NullWritable.class);
+		job.setOutputValueClass(Text.class);
+		
+		job.setInt("dfs.replication", replication);
+		
+		FileInputFormat.addInputPath(job, new Path(inputPath));
+		// delete outputPath if it already exists
+		Path outPath = new Path(outputPath);
+		FileSystem fs = FileSystem.get(job);
+		fs.delete(outPath, true);
+		FileOutputFormat.setOutputPath(job, outPath);
+
+		job.set(MRJobConfiguration.TF_HAS_HEADER, 	Boolean.toString(inputDataProperties.hasHeader()));
+		job.set(MRJobConfiguration.TF_DELIM, 		inputDataProperties.getDelim());
+		if ( inputDataProperties.getNAStrings() != null)
+			// Adding "dummy" string to handle the case of na_strings = ""
+			job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
+		job.set(MRJobConfiguration.TF_SPEC_FILE, 	specPath);
+		job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath));
+		job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath);
+		job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
+		job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
+		job.set(MRJobConfiguration.TF_HEADER, headerLine);
+		job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
+		job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
+		
+		//turn off adaptivemr
+		job.setBoolean("adaptivemr.map.enable", false);
+
+		// Run the job
+		RunningJob runjob = JobClient.runJob(job);
+		
+		// Since transform CSV produces part files with the prefix transform-part-*,
+		// delete all the "default" part-* files
+		deletePartFiles(fs, outPath);
+		
+		MatrixCharacteristics mc = new MatrixCharacteristics();
+		return new JobReturn(new MatrixCharacteristics[]{mc}, runjob.isSuccessful());
+	}
+	
+	private static void deletePartFiles(FileSystem fs, Path path) throws FileNotFoundException, IOException
+	{
+		PathFilter filter=new PathFilter(){
+			public boolean accept(Path file) {
+				return file.getName().startsWith("part-");
+			}
+		};
+		FileStatus[] list = fs.listStatus(path, filter);
+		for(FileStatus stat : list) {
+			fs.delete(stat.getPath(), false);
+		}
+	}
+	
+}