You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/22 17:34:11 UTC

[35/51] [partial] incubator-systemml git commit: [SYSTEMML-482] [SYSTEMML-480] Adding a Git attributes file to enfore Unix-styled line endings, and normalizing all of the line endings.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
index 0c0d399..7fb1ccc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
@@ -1,112 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-
-public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
-	
-	boolean _firstRecordInSplit = true;
-	boolean _partFileWithHeader = false;
-	
-	TfUtils tfmapper = null;
-	Reporter _reporter = null;
-	BufferedWriter br = null;
-	JobConf _rJob = null;
-	
-	@Override
-	public void configure(JobConf job) {
-		try {
-			_rJob = job;
-			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-			tfmapper = new TfUtils(job);
-			
-			tfmapper.loadTfMetadata(job, true);
-			
-		} catch (IOException e) { throw new RuntimeException(e); }
-		catch(JSONException e)  { throw new RuntimeException(e); }
-
-	}
-	
-	@Override
-	public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException  {
-		
-		if(_firstRecordInSplit)
-		{
-			_firstRecordInSplit = false;
-			_reporter = reporter;
-			
-			// generate custom output paths so that order of rows in the 
-			// output (across part files) matches w/ that from input data set
-			String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
-			Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
-			
-			// setup the writer for mapper's output
-			// the default part-..... files will be deleted later once the job finishes 
-			br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true)));
-		}
-		
-		// output the header line
-		if ( rawKey.get() == 0 && _partFileWithHeader ) 
-		{
-			_reporter = reporter;
-			tfmapper.processHeaderLine();
-			if ( tfmapper.hasHeader() )
-				return;
-		}
-		
-		// parse the input line and apply transformation
-		String[] words = tfmapper.getWords(rawValue);
-		
-		if(!tfmapper.omit(words))
-		{
-			try {
-				words = tfmapper.apply(words);
-				String outStr = tfmapper.checkAndPrepOutputString(words);
-				//out.collect(NullWritable.get(), new Text(outStr));
-				br.write(outStr + "\n");
-			} 
-			catch(DMLRuntimeException e) {
-				throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
-			}
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		if ( br != null ) 
-			br.close();
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+
+public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
+	
+	boolean _firstRecordInSplit = true;
+	boolean _partFileWithHeader = false;
+	
+	TfUtils tfmapper = null;
+	Reporter _reporter = null;
+	BufferedWriter br = null;
+	JobConf _rJob = null;
+	
+	@Override
+	public void configure(JobConf job) {
+		try {
+			_rJob = job;
+			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
+			tfmapper = new TfUtils(job);
+			
+			tfmapper.loadTfMetadata(job, true);
+			
+		} catch (IOException e) { throw new RuntimeException(e); }
+		catch(JSONException e)  { throw new RuntimeException(e); }
+
+	}
+	
+	@Override
+	public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException  {
+		
+		if(_firstRecordInSplit)
+		{
+			_firstRecordInSplit = false;
+			_reporter = reporter;
+			
+			// generate custom output paths so that order of rows in the 
+			// output (across part files) matches w/ that from input data set
+			String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
+			Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
+			
+			// setup the writer for mapper's output
+			// the default part-..... files will be deleted later once the job finishes 
+			br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true)));
+		}
+		
+		// output the header line
+		if ( rawKey.get() == 0 && _partFileWithHeader ) 
+		{
+			_reporter = reporter;
+			tfmapper.processHeaderLine();
+			if ( tfmapper.hasHeader() )
+				return;
+		}
+		
+		// parse the input line and apply transformation
+		String[] words = tfmapper.getWords(rawValue);
+		
+		if(!tfmapper.omit(words))
+		{
+			try {
+				words = tfmapper.apply(words);
+				String outStr = tfmapper.checkAndPrepOutputString(words);
+				//out.collect(NullWritable.get(), new Text(outStr));
+				br.write(outStr + "\n");
+			} 
+			catch(DMLRuntimeException e) {
+				throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
+			}
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		if ( br != null ) 
+			br.close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
index 693d687..061f2e3 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
@@ -1,160 +1,160 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-
-public class ApplyTfCSVSPARK {
-	
-	/**
-	 * Apply transformation metadata and generate the result in CSV format, as a
-	 * JavaRDD of Strings.
-	 */
-
-	public static JavaPairRDD<Long, String> runSparkJob(
-			SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
-			String tfMtdPath, String specFile, 
-			String tmpPath, CSVFileFormatProperties prop, 
-			int numCols, String headerLine
-		) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
-
-		// Load transformation metadata and broadcast it
-		JobConf job = new JobConf();
-		FileSystem fs = FileSystem.get(job);
-		
-		String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
-		JSONObject spec = TfUtils.readSpec(fs, specFile);
-		TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath);
-		
-		_tfmapper.loadTfMetadata();
-
-		Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
-		
-		/*
-		 * Construct transformation metadata (map-side) -- the logic is similar
-		 * to GTFMTDMapper
-		 * 
-		 * Note: The result of mapPartitionsWithIndex is cached so that the
-		 * transformed data is not redundantly computed multiple times
-		 */
-		JavaPairRDD<Long, String> applyRDD = inputRDD
-				.mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf),  true)
-				.mapToPair(
-						new PairFunction<String,Long,String>(){
-							private static final long serialVersionUID = 3868143093999082931L;
-							@Override
-							public Tuple2<Long, String> call(String t) throws Exception {
-								return new Tuple2<Long, String>(new Long(1), t);
-							}
-						}
-				).cache();
-
-		/*
-		 * An action to force execution of apply()
-		 * 
-		 * We need to trigger the execution of this RDD so as to ensure the
-		 * creation of a few metadata files (headers, dummycoded information,
-		 * etc.), which are referenced in the caller function.
-		 */
-		applyRDD.count();
-		
-		return applyRDD;
-	}
-
-	public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
-
-		private static final long serialVersionUID = 1496686437276906911L;
-
-		TfUtils _tfmapper = null;
-		
-		ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
-			_tfmapper = tf.getValue();
-		}
-		
-		ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
-			_tfmapper = tf.getValue();
-		}
-		
-		@Override
-		public Iterator<String> call(Integer partitionID,
-				Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-			
-			boolean first = true;
-			Tuple2<LongWritable, Text> rec = null;
-			ArrayList<String> outLines = new ArrayList<String>();
-			
-			while(csvLines.hasNext()) {
-				rec = csvLines.next();
-				
-				if (first && partitionID == 0) {
-					first = false;
-					
-					_tfmapper.processHeaderLine();
-					
-					if (_tfmapper.hasHeader() ) {
-						//outLines.add(dcdHeader); // if the header needs to be preserved in the output file
-						continue; 
-					}
-				}
-				
-				// parse the input line and apply transformation
-			
-				String[] words = _tfmapper.getWords(rec._2());
-				
-				if(!_tfmapper.omit(words))
-				{
-					try {
-						words = _tfmapper.apply(words);
-						String outStr = _tfmapper.checkAndPrepOutputString(words);
-						outLines.add(outStr);
-					} 
-					catch(DMLRuntimeException e) {
-						throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
-					}
-				}
-			}
-			
-			return outLines.iterator();
-		}
-		
-	}
-
-	
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+
+
+public class ApplyTfCSVSPARK {
+	
+	/**
+	 * Apply transformation metadata and generate the result in CSV format, as a
+	 * JavaRDD of Strings.
+	 */
+
+	public static JavaPairRDD<Long, String> runSparkJob(
+			SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
+			String tfMtdPath, String specFile, 
+			String tmpPath, CSVFileFormatProperties prop, 
+			int numCols, String headerLine
+		) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
+
+		// Load transformation metadata and broadcast it
+		JobConf job = new JobConf();
+		FileSystem fs = FileSystem.get(job);
+		
+		String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
+		JSONObject spec = TfUtils.readSpec(fs, specFile);
+		TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath);
+		
+		_tfmapper.loadTfMetadata();
+
+		Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
+		
+		/*
+		 * Construct transformation metadata (map-side) -- the logic is similar
+		 * to GTFMTDMapper
+		 * 
+		 * Note: The result of mapPartitionsWithIndex is cached so that the
+		 * transformed data is not redundantly computed multiple times
+		 */
+		JavaPairRDD<Long, String> applyRDD = inputRDD
+				.mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf),  true)
+				.mapToPair(
+						new PairFunction<String,Long,String>(){
+							private static final long serialVersionUID = 3868143093999082931L;
+							@Override
+							public Tuple2<Long, String> call(String t) throws Exception {
+								return new Tuple2<Long, String>(new Long(1), t);
+							}
+						}
+				).cache();
+
+		/*
+		 * An action to force execution of apply()
+		 * 
+		 * We need to trigger the execution of this RDD so as to ensure the
+		 * creation of a few metadata files (headers, dummycoded information,
+		 * etc.), which are referenced in the caller function.
+		 */
+		applyRDD.count();
+		
+		return applyRDD;
+	}
+
+	public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
+
+		private static final long serialVersionUID = 1496686437276906911L;
+
+		TfUtils _tfmapper = null;
+		
+		ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
+			_tfmapper = tf.getValue();
+		}
+		
+		ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
+			_tfmapper = tf.getValue();
+		}
+		
+		@Override
+		public Iterator<String> call(Integer partitionID,
+				Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
+			
+			boolean first = true;
+			Tuple2<LongWritable, Text> rec = null;
+			ArrayList<String> outLines = new ArrayList<String>();
+			
+			while(csvLines.hasNext()) {
+				rec = csvLines.next();
+				
+				if (first && partitionID == 0) {
+					first = false;
+					
+					_tfmapper.processHeaderLine();
+					
+					if (_tfmapper.hasHeader() ) {
+						//outLines.add(dcdHeader); // if the header needs to be preserved in the output file
+						continue; 
+					}
+				}
+				
+				// parse the input line and apply transformation
+			
+				String[] words = _tfmapper.getWords(rec._2());
+				
+				if(!_tfmapper.omit(words))
+				{
+					try {
+						words = _tfmapper.apply(words);
+						String outStr = _tfmapper.checkAndPrepOutputString(words);
+						outLines.add(outStr);
+					} 
+					catch(DMLRuntimeException e) {
+						throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
+					}
+				}
+			}
+			
+			return outLines.iterator();
+		}
+		
+	}
+
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index f08c9ff..b61c781 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -1,355 +1,355 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class BinAgent extends TransformationAgent {
-	
-	private static final long serialVersionUID = 1917445005206076078L;
-
-	public static final String MIN_PREFIX = "min";
-	public static final String MAX_PREFIX = "max";
-	public static final String NBINS_PREFIX = "nbins";
-
-	private int[] _binList = null;
-	//private byte[] _binMethodList = null;	// Not used, since only equi-width is supported for now. 
-	private int[] _numBins = null;
-
-	private double[] _min=null, _max=null;	// min and max among non-missing values
-
-	private double[] _binWidths = null;		// width of a bin for each attribute
-	
-	BinAgent() { }
-	
-	BinAgent(JSONObject parsedSpec) throws JSONException {
-		
-		if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) )
-			return;
-		
-		JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString());
-		
-		JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-		//JSONArray mthds = (JSONArray) obj.get(JSON_MTHD);
-		JSONArray nbins = (JSONArray) obj.get(JSON_NBINS);
-			
-		assert(attrs.size() == nbins.size());
-			
-		_binList = new int[attrs.size()];
-		_numBins = new int[attrs.size()];
-		for(int i=0; i < _binList.length; i++) {
-			_binList[i] = UtilFunctions.toInt(attrs.get(i));
-			_numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
-		}
-		
-		// initialize internal transformation metadata
-		_min = new double[_binList.length];
-		Arrays.fill(_min, Double.MAX_VALUE);
-		_max = new double[_binList.length];
-		Arrays.fill(_max, -Double.MAX_VALUE);
-		
-		_binWidths = new double[_binList.length];
-	}
-	
-	public void prepare(String[] words, TfUtils agents) {
-		if ( _binList == null )
-			return;
-		
-		for(int i=0; i <_binList.length; i++) {
-			int colID = _binList[i];
-			
-			String w = null;
-			double d = 0;
-				
-			// equi-width
-			w = UtilFunctions.unquote(words[colID-1].trim());
-			if(!agents.isNA(w)) {
-				d = UtilFunctions.parseToDouble(w);
-				if(d < _min[i])
-					_min[i] = d;
-				if(d > _max[i])
-					_max[i] = d;
-			}
-		}
-	}
-	
-	private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
-		String s =  MIN_PREFIX + Double.toString(_min[idx]);
-		return  new DistinctValue(s, -1L);
-	}
-	
-	private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
-		String s =  MAX_PREFIX + Double.toString(_max[idx]);
-		return  new DistinctValue(s, -1L);
-	}
-	
-	private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
-		String s =  NBINS_PREFIX + Double.toString(_numBins[idx]);
-		return  new DistinctValue(s, -1L);
-	}
-	
-	/**
-	 * Method to output transformation metadata from the mappers. 
-	 * This information is collected and merged by the reducers.
-	 * 
-	 * @param out
-	 * @throws IOException
-	 */
-	@Override
-	public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-		if ( _binList == null )
-			return;
-		
-		try { 
-			for(int i=0; i < _binList.length; i++) {
-				int colID = _binList[i];
-				IntWritable iw = new IntWritable(-colID);
-				
-				out.collect(iw,  prepMinOutput(i));
-				out.collect(iw,  prepMaxOutput(i));
-				out.collect(iw,  prepNBinsOutput(i));
-			}
-		} catch(Exception e) {
-			throw new IOException(e);
-		}
-	}
-	
-	public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
-		if ( _binList == null )
-			return list;
-		
-		try { 
-			for(int i=0; i < _binList.length; i++) {
-				int colID = _binList[i];
-				Integer iw = -colID;
-				
-				list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) );
-				list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
-				list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
-			}
-		} catch(Exception e) {
-			throw new IOException(e);
-		}
-		return list;
-	}
-
-	private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException 
-	{
-		Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX);
-		BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-		br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n");
-		br.close();
-	}
-
-	/** 
-	 * Method to merge map output transformation metadata.
-	 * 
-	 * @param values
-	 * @return
-	 * @throws IOException 
-	 */
-	@Override
-	public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-		double min = Double.MAX_VALUE;
-		double max = -Double.MAX_VALUE;
-		int nbins = 0;
-		
-		DistinctValue val = new DistinctValue();
-		String w = null;
-		double d;
-		while(values.hasNext()) {
-			val.reset();
-			val = values.next();
-			w = val.getWord();
-			
-			if(w.startsWith(MIN_PREFIX)) {
-				d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
-				if ( d < min )
-					min = d;
-			}
-			else if(w.startsWith(MAX_PREFIX)) {
-				d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
-				if ( d > max )
-					max = d;
-			}
-			else if (w.startsWith(NBINS_PREFIX)) {
-				nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
-			}
-			else
-				throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
-		}
-		
-		// write merged metadata
-		double binwidth = (max-min)/nbins;
-		writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
-	}
-	
-	
-	public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
-		if(_binList == null)
-			return;
-		
-		MVImputeAgent mvagent = agents.getMVImputeAgent();
-		for(int i=0; i < _binList.length; i++) {
-			int colID = _binList[i];
-			
-			// If the column is imputed with a constant, then adjust min and max based the value of the constant.
-			if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) 
-			{
-				double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
-				if ( cst < _min[i])
-					_min[i] = cst;
-				if ( cst > _max[i])
-					_max[i] = cst;
-			}
-			
-			double binwidth = (_max[i] - _min[i])/_numBins[i];
-			writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
-		}
-	}
-	
-	// ------------------------------------------------------------------------------------------------
-
-	public int[] getBinList() { return _binList; }
-	public int[] getNumBins() { return _numBins; }
-	public double[] getMin()  { return _min; }
-	public double[] getBinWidths() { return _binWidths; }
-	
-	/**
-	 * Method to load transform metadata for all attributes
-	 * 
-	 * @param job
-	 * @throws IOException
-	 */
-	@Override
-	public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-		if ( _binList == null )
-			return;
-		
-		if(fs.isDirectory(txMtdDir)) {
-			for(int i=0; i<_binList.length;i++) {
-				int colID = _binList[i];
-				
-				Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX);
-				TfUtils.checkValidInputFile(fs, path, true); 
-					
-				BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
-				// format: colID,min,max,nbins
-				String[] fields = br.readLine().split(TXMTD_SEP);
-				double min = UtilFunctions.parseToDouble(fields[1]);
-				//double max = UtilFunctions.parseToDouble(fields[2]);
-				double binwidth = UtilFunctions.parseToDouble(fields[3]);
-				int nbins = UtilFunctions.parseToInt(fields[4]);
-				
-				_numBins[i] = nbins;
-				_min[i] = min;
-				_binWidths[i] = binwidth; // (max-min)/nbins;
-				
-				br.close();
-			}
-		}
-		else {
-			fs.close();
-			throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
-		}
-	}
-	
-	/**
-	 * Method to apply transformations.
-	 * 
-	 * @param words
-	 * @return
-	 */
-	@Override
-	public String[] apply(String[] words, TfUtils agents) {
-		if ( _binList == null )
-			return words;
-	
-		for(int i=0; i < _binList.length; i++) {
-			int colID = _binList[i];
-			
-			try {
-			double val = UtilFunctions.parseToDouble(words[colID-1]);
-			int binid = 1;
-			double tmp = _min[i] + _binWidths[i];
-			while(val > tmp && binid < _numBins[i]) {
-				tmp += _binWidths[i];
-				binid++;
-			}
-			words[colID-1] = Integer.toString(binid);
-			} catch(NumberFormatException e)
-			{
-				throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
-			}
-		}
-		
-		return words;
-	}
-	
-	/**
-	 * Check if the given column ID is subjected to this transformation.
-	 * 
-	 */
-	public int isBinned(int colID)
-	{
-		if(_binList == null)
-			return -1;
-		
-		int idx = Arrays.binarySearch(_binList, colID);
-		return ( idx >= 0 ? idx : -1);
-	}
-
-
-	@Override
-	public void print() {
-		System.out.print("Binning List (Equi-width): \n    ");
-		for(int i : _binList) {
-			System.out.print(i + " ");
-		}
-		System.out.print("\n    ");
-		for(int b : _numBins) {
-			System.out.print(b + " ");
-		}
-		System.out.println();
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class BinAgent extends TransformationAgent {
+	
+	private static final long serialVersionUID = 1917445005206076078L;
+
+	public static final String MIN_PREFIX = "min";
+	public static final String MAX_PREFIX = "max";
+	public static final String NBINS_PREFIX = "nbins";
+
+	private int[] _binList = null;
+	//private byte[] _binMethodList = null;	// Not used, since only equi-width is supported for now. 
+	private int[] _numBins = null;
+
+	private double[] _min=null, _max=null;	// min and max among non-missing values
+
+	private double[] _binWidths = null;		// width of a bin for each attribute
+	
+	BinAgent() { }
+	
+	BinAgent(JSONObject parsedSpec) throws JSONException {
+		
+		if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) )
+			return;
+		
+		JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString());
+		
+		JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+		//JSONArray mthds = (JSONArray) obj.get(JSON_MTHD);
+		JSONArray nbins = (JSONArray) obj.get(JSON_NBINS);
+			
+		assert(attrs.size() == nbins.size());
+			
+		_binList = new int[attrs.size()];
+		_numBins = new int[attrs.size()];
+		for(int i=0; i < _binList.length; i++) {
+			_binList[i] = UtilFunctions.toInt(attrs.get(i));
+			_numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
+		}
+		
+		// initialize internal transformation metadata
+		_min = new double[_binList.length];
+		Arrays.fill(_min, Double.MAX_VALUE);
+		_max = new double[_binList.length];
+		Arrays.fill(_max, -Double.MAX_VALUE);
+		
+		_binWidths = new double[_binList.length];
+	}
+	
+	public void prepare(String[] words, TfUtils agents) {
+		if ( _binList == null )
+			return;
+		
+		for(int i=0; i <_binList.length; i++) {
+			int colID = _binList[i];
+			
+			String w = null;
+			double d = 0;
+				
+			// equi-width
+			w = UtilFunctions.unquote(words[colID-1].trim());
+			if(!agents.isNA(w)) {
+				d = UtilFunctions.parseToDouble(w);
+				if(d < _min[i])
+					_min[i] = d;
+				if(d > _max[i])
+					_max[i] = d;
+			}
+		}
+	}
+	
+	private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
+		String s =  MIN_PREFIX + Double.toString(_min[idx]);
+		return  new DistinctValue(s, -1L);
+	}
+	
+	private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
+		String s =  MAX_PREFIX + Double.toString(_max[idx]);
+		return  new DistinctValue(s, -1L);
+	}
+	
+	private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
+		String s =  NBINS_PREFIX + Double.toString(_numBins[idx]);
+		return  new DistinctValue(s, -1L);
+	}
+	
+	/**
+	 * Method to output transformation metadata from the mappers. 
+	 * This information is collected and merged by the reducers.
+	 * 
+	 * @param out
+	 * @throws IOException
+	 */
+	@Override
+	public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
+		if ( _binList == null )
+			return;
+		
+		try { 
+			for(int i=0; i < _binList.length; i++) {
+				int colID = _binList[i];
+				IntWritable iw = new IntWritable(-colID);
+				
+				out.collect(iw,  prepMinOutput(i));
+				out.collect(iw,  prepMaxOutput(i));
+				out.collect(iw,  prepNBinsOutput(i));
+			}
+		} catch(Exception e) {
+			throw new IOException(e);
+		}
+	}
+	
+	public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
+		if ( _binList == null )
+			return list;
+		
+		try { 
+			for(int i=0; i < _binList.length; i++) {
+				int colID = _binList[i];
+				Integer iw = -colID;
+				
+				list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) );
+				list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
+				list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
+			}
+		} catch(Exception e) {
+			throw new IOException(e);
+		}
+		return list;
+	}
+
+	private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException 
+	{
+		Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX);
+		BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+		br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n");
+		br.close();
+	}
+
+	/** 
+	 * Method to merge map output transformation metadata.
+	 * 
+	 * @param values
+	 * @return
+	 * @throws IOException 
+	 */
+	@Override
+	public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
+		double min = Double.MAX_VALUE;
+		double max = -Double.MAX_VALUE;
+		int nbins = 0;
+		
+		DistinctValue val = new DistinctValue();
+		String w = null;
+		double d;
+		while(values.hasNext()) {
+			val.reset();
+			val = values.next();
+			w = val.getWord();
+			
+			if(w.startsWith(MIN_PREFIX)) {
+				d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
+				if ( d < min )
+					min = d;
+			}
+			else if(w.startsWith(MAX_PREFIX)) {
+				d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
+				if ( d > max )
+					max = d;
+			}
+			else if (w.startsWith(NBINS_PREFIX)) {
+				nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
+			}
+			else
+				throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
+		}
+		
+		// write merged metadata
+		double binwidth = (max-min)/nbins;
+		writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
+	}
+	
+	
+	public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
+		if(_binList == null)
+			return;
+		
+		MVImputeAgent mvagent = agents.getMVImputeAgent();
+		for(int i=0; i < _binList.length; i++) {
+			int colID = _binList[i];
+			
+			// If the column is imputed with a constant, then adjust min and max based the value of the constant.
+			if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) 
+			{
+				double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
+				if ( cst < _min[i])
+					_min[i] = cst;
+				if ( cst > _max[i])
+					_max[i] = cst;
+			}
+			
+			double binwidth = (_max[i] - _min[i])/_numBins[i];
+			writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
+		}
+	}
+	
+	// ------------------------------------------------------------------------------------------------
+
+	public int[] getBinList() { return _binList; }
+	public int[] getNumBins() { return _numBins; }
+	public double[] getMin()  { return _min; }
+	public double[] getBinWidths() { return _binWidths; }
+	
+	/**
+	 * Method to load transform metadata for all attributes
+	 * 
+	 * @param job
+	 * @throws IOException
+	 */
+	@Override
+	public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
+		if ( _binList == null )
+			return;
+		
+		if(fs.isDirectory(txMtdDir)) {
+			for(int i=0; i<_binList.length;i++) {
+				int colID = _binList[i];
+				
+				Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX);
+				TfUtils.checkValidInputFile(fs, path, true); 
+					
+				BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
+				// format: colID,min,max,nbins
+				String[] fields = br.readLine().split(TXMTD_SEP);
+				double min = UtilFunctions.parseToDouble(fields[1]);
+				//double max = UtilFunctions.parseToDouble(fields[2]);
+				double binwidth = UtilFunctions.parseToDouble(fields[3]);
+				int nbins = UtilFunctions.parseToInt(fields[4]);
+				
+				_numBins[i] = nbins;
+				_min[i] = min;
+				_binWidths[i] = binwidth; // (max-min)/nbins;
+				
+				br.close();
+			}
+		}
+		else {
+			fs.close();
+			throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
+		}
+	}
+	
+	/**
+	 * Method to apply transformations.
+	 * 
+	 * @param words
+	 * @return
+	 */
+	@Override
+	public String[] apply(String[] words, TfUtils agents) {
+		if ( _binList == null )
+			return words;
+	
+		for(int i=0; i < _binList.length; i++) {
+			int colID = _binList[i];
+			
+			try {
+			double val = UtilFunctions.parseToDouble(words[colID-1]);
+			int binid = 1;
+			double tmp = _min[i] + _binWidths[i];
+			while(val > tmp && binid < _numBins[i]) {
+				tmp += _binWidths[i];
+				binid++;
+			}
+			words[colID-1] = Integer.toString(binid);
+			} catch(NumberFormatException e)
+			{
+				throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
+			}
+		}
+		
+		return words;
+	}
+	
+	/**
+	 * Check if the given column ID is subjected to this transformation.
+	 * 
+	 */
+	public int isBinned(int colID)
+	{
+		if(_binList == null)
+			return -1;
+		
+		int idx = Arrays.binarySearch(_binList, colID);
+		return ( idx >= 0 ? idx : -1);
+	}
+
+
+	@Override
+	public void print() {
+		System.out.print("Binning List (Equi-width): \n    ");
+		for(int i : _binList) {
+			System.out.print(i + " ");
+		}
+		System.out.print("\n    ");
+		for(int b : _numBins) {
+			System.out.print(b + " ");
+		}
+		System.out.println();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
index d44a904..2e52657 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
@@ -1,108 +1,108 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DistinctValue implements Writable, Serializable {
-	
-	private static final long serialVersionUID = -8236705946336974836L;
-
-	private static final byte [] EMPTY_BYTES = new byte[0];
-	  
-	// word (distinct value)
-	private byte[] _bytes;
-	private int _length;
-	// count
-	private long _count;
-	
-	public DistinctValue() {
-		_bytes = EMPTY_BYTES;
-		_length = 0;
-		_count = -1;
-	}
-	
-	public DistinctValue(String w, long count) throws CharacterCodingException {
-	    ByteBuffer bb = Text.encode(w, true);
-	    _bytes = bb.array();
-	    _length = bb.limit();
-		_count = count;
-	}
-	
-	public DistinctValue(OffsetCount oc) throws CharacterCodingException 
-	{
-		this(oc.filename + "," + oc.fileOffset, oc.count);
-	}
-	
-	public void reset() {
-		_bytes = EMPTY_BYTES;
-		_length = 0;
-		_count = -1;
-	}
-	
-	public String getWord() {  return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); }
-	public long getCount() { return _count; }
-	
-	@Override
-	public void write(DataOutput out) throws IOException {
-	    // write word
-		WritableUtils.writeVInt(out, _length);
-	    out.write(_bytes, 0, _length);
-		// write count
-	    out.writeLong(_count);
-	}
-	
-	@Override
-	public void readFields(DataInput in) throws IOException {
-	    // read word 
-		int newLength = WritableUtils.readVInt(in);
-	    _bytes = new byte[newLength];
-	    in.readFully(_bytes, 0, newLength);
-	    _length = newLength;
-	    if (_length != _bytes.length)
-	    	System.out.println("ERROR in DistinctValue.readFields()");
-	    // read count
-	    _count = in.readLong();
-	}
-	
-	public OffsetCount getOffsetCount() {
-		OffsetCount oc = new OffsetCount();
-		String[] parts = getWord().split(",");
-		oc.filename = parts[0];
-		oc.fileOffset = UtilFunctions.parseToLong(parts[1]);
-		oc.count = getCount();
-		
-		return oc;
-	}
-	
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class DistinctValue implements Writable, Serializable {
+	
+	private static final long serialVersionUID = -8236705946336974836L;
+
+	private static final byte [] EMPTY_BYTES = new byte[0];
+	  
+	// word (distinct value)
+	private byte[] _bytes;
+	private int _length;
+	// count
+	private long _count;
+	
+	public DistinctValue() {
+		_bytes = EMPTY_BYTES;
+		_length = 0;
+		_count = -1;
+	}
+	
+	public DistinctValue(String w, long count) throws CharacterCodingException {
+	    ByteBuffer bb = Text.encode(w, true);
+	    _bytes = bb.array();
+	    _length = bb.limit();
+		_count = count;
+	}
+	
+	public DistinctValue(OffsetCount oc) throws CharacterCodingException 
+	{
+		this(oc.filename + "," + oc.fileOffset, oc.count);
+	}
+	
+	public void reset() {
+		_bytes = EMPTY_BYTES;
+		_length = 0;
+		_count = -1;
+	}
+	
+	public String getWord() {  return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); }
+	public long getCount() { return _count; }
+	
+	@Override
+	public void write(DataOutput out) throws IOException {
+	    // write word
+		WritableUtils.writeVInt(out, _length);
+	    out.write(_bytes, 0, _length);
+		// write count
+	    out.writeLong(_count);
+	}
+	
+	@Override
+	public void readFields(DataInput in) throws IOException {
+	    // read word 
+		int newLength = WritableUtils.readVInt(in);
+	    _bytes = new byte[newLength];
+	    in.readFully(_bytes, 0, newLength);
+	    _length = newLength;
+	    if (_length != _bytes.length)
+	    	System.out.println("ERROR in DistinctValue.readFields()");
+	    // read count
+	    _count = in.readLong();
+	}
+	
+	public OffsetCount getOffsetCount() {
+		OffsetCount oc = new OffsetCount();
+		String[] parts = getWord().split(",");
+		oc.filename = parts[0];
+		oc.fileOffset = UtilFunctions.parseToLong(parts[1]);
+		oc.count = getCount();
+		
+		return oc;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
index a1c76ba..079ad58 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
@@ -1,426 +1,426 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import com.google.common.base.Functions;
-import com.google.common.collect.Ordering;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DummycodeAgent extends TransformationAgent {	
-	
-	private static final long serialVersionUID = 5832130477659116489L;
-
-	private int[] _dcdList = null;
-	private long numCols = 0;
-	
-	private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
-	private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
-	private int[] _binList = null;
-	private int[] _numBins = null;
-	
-	private int[] _domainSizes = null;			// length = #of dummycoded columns
-	private int[] _dcdColumnMap = null;			// to help in translating between original and dummycoded column IDs
-	private long _dummycodedLength = 0;			// #of columns after dummycoded
-	
-	DummycodeAgent(int[] list) {
-		_dcdList = list;
-	}
-	
-	DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException {
-		numCols = ncol;
-		
-		if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
-			return;
-		
-		JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.DUMMYCODE.toString());
-		JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-			
-		_dcdList = new int[attrs.size()];
-		for(int i=0; i < _dcdList.length; i++) 
-			_dcdList[i] = UtilFunctions.toInt(attrs.get(i));
-	}
-	
-	public int[] dcdList() {
-		return _dcdList;
-	}
-	
-	/**
-	 * Method to output transformation metadata from the mappers. 
-	 * This information is collected and merged by the reducers.
-	 * 
-	 * @param out
-	 * @throws IOException
-	 * 
-	 */
-	@Override
-	public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-		// There is no metadata required for dummycode.
-		// Required information is output from RecodeAgent.
-		return;
-	}
-	
-	@Override
-	public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values,
-			String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-		// Nothing to do here
-	}
-
-	public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> maps) {
-		_finalMaps = maps;
-	}
-	
-	public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> maps) {
-		_finalMapsCP = maps;
-	}
-	
-	public void setNumBins(int[] binList, int[] numbins) {
-		_binList = binList;
-		_numBins = numbins;
-	}
-	
-	/**
-	 * Method to generate dummyCodedMaps.csv, with the range of column IDs for each variable in the original data.
-	 * 
-	 * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end]
-	 * 		1/0 indicates if ColID is dummycoded or not
-	 * 		[st,end] is the range of dummycoded column numbers for the given ColID
-	 * 
-	 * It also generates coltypes.csv, with the type (scale, nominal, etc.) of columns in the output.
-	 * Recoded columns are of type nominal, binner columns are of type ordinal, dummycoded columns are of type 
-	 * dummycoded, and the remaining are of type scale.
-	 * 
-	 * @param fs
-	 * @param txMtdDir
-	 * @param numCols
-	 * @param ra
-	 * @param ba
-	 * @return Number of columns in the transformed data
-	 * @throws IOException
-	 */
-	public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException {
-		
-		// initialize all column types in the transformed data to SCALE
-		ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength];
-		for(int i=0; i < _dummycodedLength; i++)
-			ctypes[i] = ColumnTypes.SCALE;
-		
-		_dcdColumnMap = new int[numCols];
-
-		Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME);
-		BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-		
-		int sum=1;
-		int idx = 0;
-		for(int colID=1; colID <= numCols; colID++) 
-		{
-			if ( _dcdList != null && idx < _dcdList.length && _dcdList[idx] == colID )
-			{
-				br.write(colID + "," + "1" + "," + sum + "," + (sum+_domainSizes[idx]-1) + "\n");
-				_dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1;
-
-				for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++)
-					ctypes[i-1] = ColumnTypes.DUMMYCODED;
-				
-				sum += _domainSizes[idx];
-				idx++;
-			}
-			else 
-			{
-				br.write(colID + "," + "0" + "," + sum + "," + sum + "\n");
-				_dcdColumnMap[colID-1] = sum-1;
-				
-				if ( agents.getBinAgent().isBinned(colID) != -1 )
-					ctypes[sum-1] = ColumnTypes.ORDINAL;	// binned variable results in an ordinal column
-				
-				if ( agents.getRecodeAgent().isRecoded(colID) != -1 )
-					ctypes[sum-1] = ColumnTypes.NOMINAL;
-				
-				sum += 1;
-			}
-		}
-		br.close();
-
-		// Write coltypes.csv
-		pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME);
-		br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-		
-		br.write(columnTypeToID(ctypes[0]) + "");
-		for(int i = 1; i < _dummycodedLength; i++) 
-			br.write( "," + columnTypeToID(ctypes[i]));
-		br.close();
-		
-		return sum-1;
-	}
-	
-	/**
-	 * Given a dummycoded column id, find the corresponding original column ID.
-	 *  
-	 * @param colID
-	 * @return
-	 */
-	public int mapDcdColumnID(int colID) 
-	{
-		for(int i=0; i < _dcdColumnMap.length; i++)
-		{
-			int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1);
-			int end = _dcdColumnMap[i]+1;
-			//System.out.println((i+1) + ": " + "[" + st + "," + end + "]");
-			
-			if ( colID >= st && colID <= end)
-				return i+1;
-		}
-		return -1;
-	}
-	
-	public String constructDummycodedHeader(String header, Pattern delim) {
-		
-		if(_dcdList == null && _binList == null )
-			// none of the columns are dummycoded, simply return the given header
-			return header;
-		
-		String[] names = delim.split(header, -1);
-		List<String> newNames = null;
-		
-		StringBuilder sb = new StringBuilder();
-		
-		// Dummycoding can be performed on either on a recoded column or on a binned column
-		
-		// process recoded columns
-		if(_finalMapsCP != null && _dcdList != null) 
-		{
-			for(int i=0; i <_dcdList.length; i++) 
-			{
-				int colID = _dcdList[i];
-				HashMap<String,Long> map = _finalMapsCP.get(colID);
-				String colName = UtilFunctions.unquote(names[colID-1]);
-				
-				if ( map != null  ) 
-				{
-					// order map entries by their recodeID
-					Ordering<String> valueComparator = Ordering.natural().onResultOf(Functions.forMap(map));
-					newNames = valueComparator.sortedCopy(map.keySet());
-					
-					// construct concatenated string of map entries
-					sb.setLength(0);
-					for(int idx=0; idx < newNames.size(); idx++) 
-					{
-						if(idx==0) 
-							sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
-						else
-							sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
-					}
-					names[colID-1] = sb.toString();			// replace original column name with dcd name
-				}
-			}
-		}
-		else if(_finalMaps != null && _dcdList != null) {
-			for(int i=0; i <_dcdList.length; i++) {
-				int colID = _dcdList[i];
-				HashMap<String,String> map = _finalMaps.get(colID);
-				String colName = UtilFunctions.unquote(names[colID-1]);
-				
-				if ( map != null ) 
-				{
-					// order map entries by their recodeID (represented as Strings .. "1", "2", etc.)
-					Ordering<String> orderByID = new Ordering<String>() 
-					{
-			    		public int compare(String s1, String s2) {
-			        		return (Integer.parseInt(s1) - Integer.parseInt(s2));
-			    		}
-					};
-					
-					newNames = orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet());
-					// construct concatenated string of map entries
-					sb.setLength(0);
-					for(int idx=0; idx < newNames.size(); idx++) 
-					{
-						if(idx==0) 
-							sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
-						else
-							sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
-					}
-					names[colID-1] = sb.toString();			// replace original column name with dcd name
-				}
-			}
-		}
-		
-		// process binned columns
-		if (_binList != null) 
-			for(int i=0; i < _binList.length; i++) 
-			{
-				int colID = _binList[i];
-				
-				// need to consider only binned and dummycoded columns
-				if(isDummyCoded(colID) == -1)
-					continue;
-				
-				int numBins = _numBins[i];
-				String colName = UtilFunctions.unquote(names[colID-1]);
-				
-				sb.setLength(0);
-				for(int idx=0; idx < numBins; idx++) 
-					if(idx==0) 
-						sb.append( colName + DCD_NAME_SEP + "Bin" + (idx+1) );
-					else
-						sb.append( delim + colName + DCD_NAME_SEP + "Bin" + (idx+1) );
-				names[colID-1] = sb.toString();			// replace original column name with dcd name
-			}
-		
-		// Construct the full header
-		sb.setLength(0);
-		for(int colID=0; colID < names.length; colID++) 
-		{
-			if (colID == 0)
-				sb.append(names[colID]);
-			else
-				sb.append(delim + names[colID]);
-		}
-		//System.out.println("DummycodedHeader: " + sb.toString());
-		
-		return sb.toString();
-	}
-	
-	@Override
-	public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-		if ( _dcdList == null )
-		{
-			_dummycodedLength = numCols;
-			return;
-		}
-		
-		// sort to-be dummycoded column IDs in ascending order. This is the order in which the new dummycoded record is constructed in apply() function.
-		Arrays.sort(_dcdList);	
-		_domainSizes = new int[_dcdList.length];
-
-		_dummycodedLength = numCols;
-		
-		//HashMap<String, String> map = null;
-		for(int i=0; i<_dcdList.length; i++) {
-			int colID = _dcdList[i];
-			
-			// Find the domain size for colID using _finalMaps or _finalMapsCP
-			int domainSize = 0;
-			if(_finalMaps != null) {
-				if(_finalMaps.get(colID) != null)
-					domainSize = _finalMaps.get(colID).size();
-			}
-			else {
-				if(_finalMapsCP.get(colID) != null)
-					domainSize = _finalMapsCP.get(colID).size();
-			}
-			
-			if ( domainSize != 0 ) {
-				// dummycoded column
-				_domainSizes[i] = domainSize;
-			}
-			else {
-				// binned column
-				if ( _binList != null )
-				for(int j=0; j<_binList.length; j++) {
-					if (colID == _binList[j]) {
-						_domainSizes[i] = _numBins[j];
-						break;
-					}
-				}
-			}
-			_dummycodedLength += _domainSizes[i]-1;
-			//System.out.println("colID=" + colID + ", domainsize=" + _domainSizes[i] + ", dcdLength=" + _dummycodedLength);
-		}
-	}
-
-	/**
-	 * Method to apply transformations.
-	 * 
-	 * @param words
-	 * @return
-	 */
-	@Override
-	public String[] apply(String[] words, TfUtils agents) {
-		
-		if ( _dcdList == null )
-			return words;
-		
-		String[] nwords = new String[(int)_dummycodedLength];
-		
-		int rcdVal = 0;
-		
-		for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) {
-			if(idx < _dcdList.length && colID==_dcdList[idx]) {
-				// dummycoded columns
-				try {
-				rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
-				nwords[ ncolID-1+rcdVal-1 ] = "1";
-				ncolID += _domainSizes[idx];
-				idx++;
-				} catch (Exception e) {
-					System.out.println("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
-					throw new RuntimeException(e);
-				}
-			}
-			else {
-				nwords[ncolID-1] = words[colID-1];
-				ncolID++;
-			}
-		}
-		
-		return nwords;
-	}
-	
-	/**
-	 * Check if the given column ID is subjected to this transformation.
-	 * 
-	 */
-	public int isDummyCoded(int colID)
-	{
-		if(_dcdList == null)
-			return -1;
-		
-		int idx = Arrays.binarySearch(_dcdList, colID);
-		return ( idx >= 0 ? idx : -1);
-	}
-	
-	@Override
-	public void print() {
-		System.out.print("Dummycoding List: \n    ");
-		for(int i : _dcdList) {
-			System.out.print(i + " ");
-		}
-		System.out.println();
-	}
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Ordering;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class DummycodeAgent extends TransformationAgent {	
+	
+	private static final long serialVersionUID = 5832130477659116489L;
+
+	private int[] _dcdList = null;
+	private long numCols = 0;
+	
+	private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+	private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
+	private int[] _binList = null;
+	private int[] _numBins = null;
+	
+	private int[] _domainSizes = null;			// length = #of dummycoded columns
+	private int[] _dcdColumnMap = null;			// to help in translating between original and dummycoded column IDs
+	private long _dummycodedLength = 0;			// #of columns after dummycoded
+	
+	DummycodeAgent(int[] list) {
+		_dcdList = list;
+	}
+	
+	DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException {
+		numCols = ncol;
+		
+		if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
+			return;
+		
+		JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.DUMMYCODE.toString());
+		JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+			
+		_dcdList = new int[attrs.size()];
+		for(int i=0; i < _dcdList.length; i++) 
+			_dcdList[i] = UtilFunctions.toInt(attrs.get(i));
+	}
+	
+	public int[] dcdList() {
+		return _dcdList;
+	}
+	
+	/**
+	 * Method to output transformation metadata from the mappers. 
+	 * This information is collected and merged by the reducers.
+	 * 
+	 * @param out
+	 * @throws IOException
+	 * 
+	 */
+	@Override
+	public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
+		// There is no metadata required for dummycode.
+		// Required information is output from RecodeAgent.
+		return;
+	}
+	
+	@Override
+	public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values,
+			String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
+		// Nothing to do here
+	}
+
+	public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> maps) {
+		_finalMaps = maps;
+	}
+	
+	public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> maps) {
+		_finalMapsCP = maps;
+	}
+	
+	public void setNumBins(int[] binList, int[] numbins) {
+		_binList = binList;
+		_numBins = numbins;
+	}
+	
+	/**
+	 * Method to generate dummyCodedMaps.csv, with the range of column IDs for each variable in the original data.
+	 * 
+	 * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end]
+	 * 		1/0 indicates if ColID is dummycoded or not
+	 * 		[st,end] is the range of dummycoded column numbers for the given ColID
+	 * 
+	 * It also generates coltypes.csv, with the type (scale, nominal, etc.) of columns in the output.
+	 * Recoded columns are of type nominal, binner columns are of type ordinal, dummycoded columns are of type 
+	 * dummycoded, and the remaining are of type scale.
+	 * 
+	 * @param fs
+	 * @param txMtdDir
+	 * @param numCols
+	 * @param ra
+	 * @param ba
+	 * @return Number of columns in the transformed data
+	 * @throws IOException
+	 */
+	public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException {
+		
+		// initialize all column types in the transformed data to SCALE
+		ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength];
+		for(int i=0; i < _dummycodedLength; i++)
+			ctypes[i] = ColumnTypes.SCALE;
+		
+		_dcdColumnMap = new int[numCols];
+
+		Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME);
+		BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+		
+		int sum=1;
+		int idx = 0;
+		for(int colID=1; colID <= numCols; colID++) 
+		{
+			if ( _dcdList != null && idx < _dcdList.length && _dcdList[idx] == colID )
+			{
+				br.write(colID + "," + "1" + "," + sum + "," + (sum+_domainSizes[idx]-1) + "\n");
+				_dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1;
+
+				for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++)
+					ctypes[i-1] = ColumnTypes.DUMMYCODED;
+				
+				sum += _domainSizes[idx];
+				idx++;
+			}
+			else 
+			{
+				br.write(colID + "," + "0" + "," + sum + "," + sum + "\n");
+				_dcdColumnMap[colID-1] = sum-1;
+				
+				if ( agents.getBinAgent().isBinned(colID) != -1 )
+					ctypes[sum-1] = ColumnTypes.ORDINAL;	// binned variable results in an ordinal column
+				
+				if ( agents.getRecodeAgent().isRecoded(colID) != -1 )
+					ctypes[sum-1] = ColumnTypes.NOMINAL;
+				
+				sum += 1;
+			}
+		}
+		br.close();
+
+		// Write coltypes.csv
+		pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME);
+		br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+		
+		br.write(columnTypeToID(ctypes[0]) + "");
+		for(int i = 1; i < _dummycodedLength; i++) 
+			br.write( "," + columnTypeToID(ctypes[i]));
+		br.close();
+		
+		return sum-1;
+	}
+	
+	/**
+	 * Given a dummycoded column id, find the corresponding original column ID.
+	 *  
+	 * @param colID
+	 * @return
+	 */
+	public int mapDcdColumnID(int colID) 
+	{
+		for(int i=0; i < _dcdColumnMap.length; i++)
+		{
+			int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1);
+			int end = _dcdColumnMap[i]+1;
+			//System.out.println((i+1) + ": " + "[" + st + "," + end + "]");
+			
+			if ( colID >= st && colID <= end)
+				return i+1;
+		}
+		return -1;
+	}
+	
+	public String constructDummycodedHeader(String header, Pattern delim) {
+		
+		if(_dcdList == null && _binList == null )
+			// none of the columns are dummycoded, simply return the given header
+			return header;
+		
+		String[] names = delim.split(header, -1);
+		List<String> newNames = null;
+		
+		StringBuilder sb = new StringBuilder();
+		
+		// Dummycoding can be performed on either on a recoded column or on a binned column
+		
+		// process recoded columns
+		if(_finalMapsCP != null && _dcdList != null) 
+		{
+			for(int i=0; i <_dcdList.length; i++) 
+			{
+				int colID = _dcdList[i];
+				HashMap<String,Long> map = _finalMapsCP.get(colID);
+				String colName = UtilFunctions.unquote(names[colID-1]);
+				
+				if ( map != null  ) 
+				{
+					// order map entries by their recodeID
+					Ordering<String> valueComparator = Ordering.natural().onResultOf(Functions.forMap(map));
+					newNames = valueComparator.sortedCopy(map.keySet());
+					
+					// construct concatenated string of map entries
+					sb.setLength(0);
+					for(int idx=0; idx < newNames.size(); idx++) 
+					{
+						if(idx==0) 
+							sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
+						else
+							sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
+					}
+					names[colID-1] = sb.toString();			// replace original column name with dcd name
+				}
+			}
+		}
+		else if(_finalMaps != null && _dcdList != null) {
+			for(int i=0; i <_dcdList.length; i++) {
+				int colID = _dcdList[i];
+				HashMap<String,String> map = _finalMaps.get(colID);
+				String colName = UtilFunctions.unquote(names[colID-1]);
+				
+				if ( map != null ) 
+				{
+					// order map entries by their recodeID (represented as Strings .. "1", "2", etc.)
+					Ordering<String> orderByID = new Ordering<String>() 
+					{
+			    		public int compare(String s1, String s2) {
+			        		return (Integer.parseInt(s1) - Integer.parseInt(s2));
+			    		}
+					};
+					
+					newNames = orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet());
+					// construct concatenated string of map entries
+					sb.setLength(0);
+					for(int idx=0; idx < newNames.size(); idx++) 
+					{
+						if(idx==0) 
+							sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
+						else
+							sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
+					}
+					names[colID-1] = sb.toString();			// replace original column name with dcd name
+				}
+			}
+		}
+		
+		// process binned columns
+		if (_binList != null) 
+			for(int i=0; i < _binList.length; i++) 
+			{
+				int colID = _binList[i];
+				
+				// need to consider only binned and dummycoded columns
+				if(isDummyCoded(colID) == -1)
+					continue;
+				
+				int numBins = _numBins[i];
+				String colName = UtilFunctions.unquote(names[colID-1]);
+				
+				sb.setLength(0);
+				for(int idx=0; idx < numBins; idx++) 
+					if(idx==0) 
+						sb.append( colName + DCD_NAME_SEP + "Bin" + (idx+1) );
+					else
+						sb.append( delim + colName + DCD_NAME_SEP + "Bin" + (idx+1) );
+				names[colID-1] = sb.toString();			// replace original column name with dcd name
+			}
+		
+		// Construct the full header
+		sb.setLength(0);
+		for(int colID=0; colID < names.length; colID++) 
+		{
+			if (colID == 0)
+				sb.append(names[colID]);
+			else
+				sb.append(delim + names[colID]);
+		}
+		//System.out.println("DummycodedHeader: " + sb.toString());
+		
+		return sb.toString();
+	}
+	
+	@Override
+	public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
+		if ( _dcdList == null )
+		{
+			_dummycodedLength = numCols;
+			return;
+		}
+		
+		// sort to-be dummycoded column IDs in ascending order. This is the order in which the new dummycoded record is constructed in apply() function.
+		Arrays.sort(_dcdList);	
+		_domainSizes = new int[_dcdList.length];
+
+		_dummycodedLength = numCols;
+		
+		//HashMap<String, String> map = null;
+		for(int i=0; i<_dcdList.length; i++) {
+			int colID = _dcdList[i];
+			
+			// Find the domain size for colID using _finalMaps or _finalMapsCP
+			int domainSize = 0;
+			if(_finalMaps != null) {
+				if(_finalMaps.get(colID) != null)
+					domainSize = _finalMaps.get(colID).size();
+			}
+			else {
+				if(_finalMapsCP.get(colID) != null)
+					domainSize = _finalMapsCP.get(colID).size();
+			}
+			
+			if ( domainSize != 0 ) {
+				// dummycoded column
+				_domainSizes[i] = domainSize;
+			}
+			else {
+				// binned column
+				if ( _binList != null )
+				for(int j=0; j<_binList.length; j++) {
+					if (colID == _binList[j]) {
+						_domainSizes[i] = _numBins[j];
+						break;
+					}
+				}
+			}
+			_dummycodedLength += _domainSizes[i]-1;
+			//System.out.println("colID=" + colID + ", domainsize=" + _domainSizes[i] + ", dcdLength=" + _dummycodedLength);
+		}
+	}
+
+	/**
+	 * Method to apply transformations.
+	 * 
+	 * @param words
+	 * @return
+	 */
+	@Override
+	public String[] apply(String[] words, TfUtils agents) {
+		
+		if ( _dcdList == null )
+			return words;
+		
+		String[] nwords = new String[(int)_dummycodedLength];
+		
+		int rcdVal = 0;
+		
+		for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) {
+			if(idx < _dcdList.length && colID==_dcdList[idx]) {
+				// dummycoded columns
+				try {
+				rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
+				nwords[ ncolID-1+rcdVal-1 ] = "1";
+				ncolID += _domainSizes[idx];
+				idx++;
+				} catch (Exception e) {
+					System.out.println("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
+					throw new RuntimeException(e);
+				}
+			}
+			else {
+				nwords[ncolID-1] = words[colID-1];
+				ncolID++;
+			}
+		}
+		
+		return nwords;
+	}
+	
+	/**
+	 * Check if the given column ID is subjected to this transformation.
+	 * 
+	 */
+	public int isDummyCoded(int colID)
+	{
+		if(_dcdList == null)
+			return -1;
+		
+		int idx = Arrays.binarySearch(_dcdList, colID);
+		return ( idx >= 0 ? idx : -1);
+	}
+	
+	@Override
+	public void print() {
+		System.out.print("Dummycoding List: \n    ");
+		for(int i : _dcdList) {
+			System.out.print(i + " ");
+		}
+		System.out.println();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
index e254403..4e3ece5 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
@@ -1,107 +1,107 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-
-
-public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{
-	
-	private OutputCollector<IntWritable, DistinctValue> _collector = null; 
-	private int _mapTaskID = -1;
-	
-	TfUtils _agents = null;
-
-	private boolean _partFileWithHeader = false;
-	private boolean _firstRecordInSplit = true;
-	private String _partFileName = null;
-	private long _offsetInPartFile = -1;
-	
-	// ----------------------------------------------------------------------------------------------
-	
-	/**
-	 * Configure the information used in the mapper, and setup transformation agents.
-	 */
-	@Override
-	public void configure(JobConf job) {
-		String[] parts = job.get("mapred.task.id").split("_");
-		if ( parts[0].equalsIgnoreCase("task")) {
-			_mapTaskID = Integer.parseInt(parts[parts.length-1]);
-		}
-		else if ( parts[0].equalsIgnoreCase("attempt")) {
-			_mapTaskID = Integer.parseInt(parts[parts.length-2]);
-		}
-		else {
-			throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id"));
-		}
-
-		try {
-			_partFileName = TfUtils.getPartFileName(job);
-			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-			_agents = new TfUtils(job);
-		} catch(IOException e) { throw new RuntimeException(e); }
-		  catch(JSONException e)  { throw new RuntimeException(e); }
-
-	}
-	
-	
-	public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException  {
-		
-		if(_firstRecordInSplit)
-		{
-			_firstRecordInSplit = false;
-			_collector = out;
-			_offsetInPartFile = rawKey.get();
-		}
-		
-		// ignore header
-		if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader)
-			return;
-		
-		_agents.prepareTfMtd(rawValue.toString());
-	}
-
-	@Override
-	public void close() throws IOException {
-		_agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
-		_agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
-		_agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
-		
-		// Output part-file offsets to create OFFSETS_FILE, which is to be used in csv reblocking.
-		// OffsetCount is denoted as a DistinctValue by concatenating parfile name and offset within partfile.
-		_collector.collect(new IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid())));
-		
-		// reset global variables, required when the jvm is reused.
-		_firstRecordInSplit = true;
-		_offsetInPartFile = -1;
-		_partFileWithHeader = false;
-	}
-	
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+
+
+public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{
+	
+	private OutputCollector<IntWritable, DistinctValue> _collector = null; 
+	private int _mapTaskID = -1;
+	
+	TfUtils _agents = null;
+
+	private boolean _partFileWithHeader = false;
+	private boolean _firstRecordInSplit = true;
+	private String _partFileName = null;
+	private long _offsetInPartFile = -1;
+	
+	// ----------------------------------------------------------------------------------------------
+	
+	/**
+	 * Configure the information used in the mapper, and setup transformation agents.
+	 */
+	@Override
+	public void configure(JobConf job) {
+		String[] parts = job.get("mapred.task.id").split("_");
+		if ( parts[0].equalsIgnoreCase("task")) {
+			_mapTaskID = Integer.parseInt(parts[parts.length-1]);
+		}
+		else if ( parts[0].equalsIgnoreCase("attempt")) {
+			_mapTaskID = Integer.parseInt(parts[parts.length-2]);
+		}
+		else {
+			throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id"));
+		}
+
+		try {
+			_partFileName = TfUtils.getPartFileName(job);
+			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
+			_agents = new TfUtils(job);
+		} catch(IOException e) { throw new RuntimeException(e); }
+		  catch(JSONException e)  { throw new RuntimeException(e); }
+
+	}
+	
+	
+	public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException  {
+		
+		if(_firstRecordInSplit)
+		{
+			_firstRecordInSplit = false;
+			_collector = out;
+			_offsetInPartFile = rawKey.get();
+		}
+		
+		// ignore header
+		if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader)
+			return;
+		
+		_agents.prepareTfMtd(rawValue.toString());
+	}
+
+	@Override
+	public void close() throws IOException {
+		_agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
+		_agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
+		_agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
+		
+		// Output part-file offsets to create OFFSETS_FILE, which is to be used in csv reblocking.
+		// OffsetCount is denoted as a DistinctValue by concatenating parfile name and offset within partfile.
+		_collector.collect(new IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid())));
+		
+		// reset global variables, required when the jvm is reused.
+		_firstRecordInSplit = true;
+		_offsetInPartFile = -1;
+		_partFileWithHeader = false;
+	}
+	
+}