Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/26 02:12:56 UTC

[32/55] [partial] incubator-systemml git commit: [SYSTEMML-482] [SYSTEMML-480] Adding a Git attributes file to enforce Unix-style line endings, and normalizing all of the line endings.
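
The .gitattributes file itself is not included in this [partial] message. For orientation, a minimal sketch of the kind of rule such a file contains (standard gitattributes syntax; the actual SystemML file may differ):

    # Treat all files as text and normalize them to LF (Unix) line endings
    * text eol=lf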

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 27058de..a4cfaa6 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -1,549 +1,549 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.sysml.utils.JSONHelper;
-
-
-@SuppressWarnings("deprecation")
-public class TfUtils implements Serializable{
-	
-	private static final long serialVersionUID = 526252850872633125L;
-
-	private OmitAgent _oa = null;
-	private MVImputeAgent _mia = null;
-	private RecodeAgent _ra = null;	
-	private BinAgent _ba = null;
-	private DummycodeAgent _da = null;
-	
-	private long _numRecordsInPartFile;		// Total number of records in the data file
-	private long _numValidRecords;			// (_numRecordsInPartFile - #of omitted records)
-	private long _numTransformedRows; 		// Number of rows after applying transformations
-	private long _numTransformedColumns; 	// Number of columns after applying transformations
-
-	private String _headerLine = null;
-	private boolean _hasHeader;
-	private Pattern _delim = null;
-	private String _delimString = null;
-	private String[] _NAstrings = null;
-	private String[] _outputColumnNames = null;
-	private long _numInputCols = -1;
-	
-	private String _tfMtdDir = null;
-	private String _specFile = null;
-	private String _offsetFile = null;
-	private String _tmpDir = null;
-	private String _outputPath = null;
-	
-	protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
-			throws IOException {
-		// check non-existing file
-		if (!fs.exists(path))
-			if ( err )
-				throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
-			else
-				return false;
-
-		// check for empty file
-		if (MapReduceTool.isFileEmpty(fs, path.toString()))
-			if ( err )
-				throw new EOFException("Empty input file " + path.toString() + ".");
-			else
-				return false;
-		
-		return true;
-	}
-	
-	public static String getPartFileName(JobConf job) throws IOException {
-		FileSystem fs = FileSystem.get(job);
-		Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
-		return thisPath.toString();
-	}
-	
-	public static boolean isPartFileWithHeader(JobConf job) throws IOException {
-		FileSystem fs = FileSystem.get(job);
-		
-		String thisfile=getPartFileName(job);
-		Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
-		
-		return thisfile.equals(smallestFilePath.toString());
-	}
-	
-	public static JSONObject readSpec(FileSystem fs, String specFile) throws IOException {
-		BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(specFile))));
-		JSONObject obj = JSONHelper.parse(br);
-		br.close();
-		return obj;
-	}
-	
-	/**
-	 * Prepare NA strings so that they can be sent to workers via JobConf.
-	 * A "dummy" string is added at the end to handle the case of empty strings.
-	 * @param na list of NA strings separated by DELIM_NA_STRING_SEP
-	 * @return NA string list with a trailing "dummy" entry appended
-	 */
-	public static String prepNAStrings(String na) {
-		return na  + DataExpression.DELIM_NA_STRING_SEP + "dummy";
-	}
-	
-	public static String[] parseNAStrings(String na) 
-	{
-		if ( na == null )
-			return null;
-		
-		String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
-		return tmp; //Arrays.copyOf(tmp, tmp.length-1);
-	}
-	
-	public static String[] parseNAStrings(JobConf job) 
-	{
-		return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
-	}
-	
-	private void createAgents(JSONObject spec) throws IOException, JSONException {
-		_oa = new OmitAgent(spec);
-		_mia = new MVImputeAgent(spec);
-		_ra = new RecodeAgent(spec);
-		_ba = new BinAgent(spec);
-		_da = new DummycodeAgent(spec, _numInputCols);
-	}
-	
-	public void setupAgents(OmitAgent oa, MVImputeAgent mia, RecodeAgent ra, BinAgent ba, DummycodeAgent da)  {
-		_oa = oa;
-		_mia = mia;
-		_ra = ra;
-		_ba = ba;
-		_da = da;
-	}
-	
-	private void parseColumnNames() {
-		_outputColumnNames = _delim.split(_headerLine, -1);
-		for(int i=0; i < _outputColumnNames.length; i++)
-			_outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
-	}
-	
-	private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
-	{
-		_numRecordsInPartFile = 0;
-		_numValidRecords = 0;
-		_numTransformedRows = 0;
-		_numTransformedColumns = 0;
-		
-		_headerLine = headerLine;
-		_hasHeader = hasHeader;
-		_delimString = delim;
-		_delim = Pattern.compile(Pattern.quote(delim));
-		_NAstrings = naStrings;
-		_numInputCols = numCols;
-		_offsetFile = offsetFile;
-		_tmpDir = tmpPath;
-		_outputPath = outputPath;
-		
-		parseColumnNames();		
-		createAgents(spec);
-	}
-	
-	public TfUtils(JobConf job, boolean minimal) 
-		throws IOException, JSONException 
-	{
-		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-			ConfigurationManager.setCachedJobConf(job);
-		}
-		
-		_NAstrings = TfUtils.parseNAStrings(job);
-		_specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-		
-		FileSystem fs = FileSystem.get(job);
-		JSONObject spec = TfUtils.readSpec(fs, _specFile);
-		
-		_oa = new OmitAgent(spec);
-	}
-	
-	// called from GenTFMtdMapper, ApplyTf (Hadoop)
-	public TfUtils(JobConf job) 
-		throws IOException, JSONException 
-	{
-		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-			ConfigurationManager.setCachedJobConf(job);
-		}
-		
-		boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
-		//Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
-		String[] naStrings = TfUtils.parseNAStrings(job);
-		
-		long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) );		// #of columns in input data
-			
-		String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-		String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
-		String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
-		String outputPath = FileOutputFormat.getOutputPath(job).toString();
-		FileSystem fs = FileSystem.get(job);
-		JSONObject spec = TfUtils.readSpec(fs, specFile);
-		
-		init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
-	}
-	
-	// called from GenTfMtdReducer 
-	public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
-	{
-		this(job);
-		_tfMtdDir = tfMtdDir;
-	}
-	
-	// called from GenTFMtdReducer and ApplyTf (Spark)
-	public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
-		init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
-		_tfMtdDir = tfMtdDir;
-	}
-	
-	public void incrValid() { _numValidRecords++; }
-	public long getValid()  { return _numValidRecords; }
-	public long getTotal()  { return _numRecordsInPartFile; }
-	public long getNumTransformedRows() 	{ return _numTransformedRows; }
-	public long getNumTransformedColumns() 	{ return _numTransformedColumns; }
-	
-	public String getHeader() 		{ return _headerLine; }
-	public boolean hasHeader() 		{ return _hasHeader; }
-	public String getDelimString() 	{ return _delimString; }
-	public Pattern getDelim() 		{ return _delim; }
-	public String[] getNAStrings() 	{ return _NAstrings; }
-	public long getNumCols() 		{ return _numInputCols; }
-	
-	public String getSpecFile() 	{ return _specFile; }
-	public String getTfMtdDir() 	{ return _tfMtdDir; }
-	public String getOffsetFile() 	{ return _offsetFile; }
-	public String getTmpDir() 		{ return _tmpDir; }
-	public String getOutputPath()	{ return _outputPath; }
-	
-	public String getName(int colID) { return _outputColumnNames[colID-1]; }
-	
-	public void setValid(long n) { _numValidRecords = n;}
-	public void incrTotal() { _numRecordsInPartFile++; }
-	public void setTotal(long n) { _numRecordsInPartFile = n;}
-	
-	public OmitAgent 	  getOmitAgent() 	{ 	return _oa; }
-	public MVImputeAgent  getMVImputeAgent(){ 	return _mia;}
-	public RecodeAgent 	  getRecodeAgent() 	{ 	return _ra; }
-	public BinAgent 	  getBinAgent() 	{ 	return _ba; }
-	public DummycodeAgent getDummycodeAgent() { return _da; }
-	
-	/**
-	 * Function that checks if the given string is one of NA strings.
-	 * 
-	 * @param w string to test
-	 * @return true if w equals one of the configured NA strings
-	 */
-	public boolean isNA(String w) {
-		if(_NAstrings == null)
-			return false;
-		
-		for(String na : _NAstrings) {
-			if(w.equals(na))
-				return true;
-		}
-		return false;
-	}
-	
-	public String[] getWords(Text line)
-	{
-		return getWords(line.toString());
-	}
-	
-
-	public String[] getWords(String line) 
-	{
-		return getDelim().split(line.trim(), -1);
-	}
-	
-	/**
-	 * Process a given row to construct transformation metadata.
-	 * 
-	 * @param line input row as a delimited string
-	 * @return the row split into individual fields
-	 * @throws IOException if metadata preparation fails
-	 */
-	public String[] prepareTfMtd(String line) throws IOException {
-		String[] words = getWords(line);
-		if(!getOmitAgent().omit(words, this))
-		{
-			getMVImputeAgent().prepare(words, this);
-			getRecodeAgent().prepare(words, this);
-			getBinAgent().prepare(words, this);
-			incrValid();
-		}
-		incrTotal();
-		
-		return words;
-	}
-	
-	public void loadTfMetadata() throws IOException 
-	{
-		JobConf job = ConfigurationManager.getCachedJobConf();
-		loadTfMetadata(job, false);
-	}
-	
-	public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
-	{
-		Path tfMtdDir = null; 
-		FileSystem fs = null;
-		
-		if(fromLocalFS) {
-			// metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
-			tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
-			fs = FileSystem.getLocal(job);
-		}
-		else {
-			fs = FileSystem.get(job);
-			tfMtdDir = new Path(getTfMtdDir());
-		}
-		
-		// load transformation metadata 
-		getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-		getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-		getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
-		
-		// associate recode maps and bin definitions with dummycoding agent,
-		// as recoded and binned columns are typically dummycoded
-		getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
-		getDummycodeAgent().setNumBins(getBinAgent().getBinList(), getBinAgent().getNumBins());
-		getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
-	}
-	
-	/*public void loadTfMetadata () throws IOException
-	{
-		Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0];
-		FileSystem localFS = FileSystem.getLocal(_rJob);
-		
-		loadTfMetadata(_rJob, localFS, tfMtdDir);
-		
-		FileSystem fs;
-		fs = FileSystem.get(_rJob);
-		Path thisPath=new Path(_rJob.get("map.input.file")).makeQualified(fs);
-		String thisfile=thisPath.toString();
-			
-		Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
-		if(thisfile.toString().equals(smallestFilePath.toString()))
-			_partFileWithHeader=true;
-		else
-			_partFileWithHeader = false;
-	}*/
-
-
-	public String processHeaderLine() throws IOException 
-	{
-		FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
-		String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
-		getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
-		
-		// write header information (before and after transformation) to temporary path
-		// these files are copied into txMtdPath, once the ApplyTf job is complete.
-		DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
-
-		return dcdHeader;
-		//_numTransformedColumns = getDelim().split(dcdHeader, -1).length; 
-		//return _numTransformedColumns;
-	}
-
-	public boolean omit(String[] words) {
-		if(getOmitAgent() == null)
-			return false;
-		return getOmitAgent().omit(words, this);
-	}
-	
-	
-	public String[] apply(String[] words) {
-		return apply(words, false);
-	}
-	
-	/**
-	 * Function to apply transformation metadata on a given row.
-	 * 
-	 * @param words row split into individual fields
-	 * @param optimizeMaps true to use the CP-specific recode path that avoids boxing/unboxing
-	 * @return the transformed row
-	 */
-	public String[] apply ( String[] words, boolean optimizeMaps ) 
-	{
-		words = getMVImputeAgent().apply(words, this);
-		
-		if(optimizeMaps)
-			// specific case of transform() invoked from CP (to save boxing and unboxing)
-			words = getRecodeAgent().cp_apply(words, this);
-		else
-			words = getRecodeAgent().apply(words, this);
-
-		words = getBinAgent().apply(words, this);
-		words = getDummycodeAgent().apply(words, this);
-		
-		_numTransformedRows++;
-		
-		return words;
-	}
-	
-	public void check(String []words) throws DMLRuntimeException 
-	{
-		boolean checkEmptyString = ( getNAStrings() != null );
-		if ( checkEmptyString ) 
-		{
-			final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-			for(int i=0; i<words.length; i++) 
-				if ( words[i] != null && words[i].equals(""))
-					throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-		}
-	}
-	
-	public String checkAndPrepOutputString(String []words) throws DMLRuntimeException 
-	{
-		return checkAndPrepOutputString(words, new StringBuilder());
-	}
-	
-	public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException 
-	{
-		/*
-		 * Check if empty strings ("") have to be handled.
-		 * 
-		 * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
-		 * When na.strings are provided, then "" is considered a missing value indicator, and the 
-		 * user is expected to provide an appropriate imputation method. Therefore, when na.strings 
-		 * are provided, "" encountered in any column (after all transformations are applied) 
-		 * denotes an erroneous condition.  
-		 */
-		boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
-		
-		//StringBuilder sb = new StringBuilder();
-		sb.setLength(0);
-		int i =0;
-		
-		if ( checkEmptyString ) 
-		{
-			final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-			if ( words[0] != null ) 
-				if ( words[0].equals("") )
-					throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
-				else 
-					sb.append(words[0]);
-			else
-				sb.append("0");
-			
-			for(i=1; i<words.length; i++) 
-			{
-				sb.append(_delimString);
-				
-				if ( words[i] != null ) 
-					if ( words[i].equals("") )
-						throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-					else 
-						sb.append(words[i]);
-				else
-					sb.append("0");
-			}
-		}
-		else 
-		{
-			sb.append(words[0] != null ? words[0] : "0");
-			for(i=1; i<words.length; i++) 
-			{
-				sb.append(_delimString);
-				sb.append(words[i] != null ? words[i] : "0");
-			}
-		}
-		
-		return sb.toString();
-	}
-
-	private Reader initOffsetsReader(JobConf job) throws IOException 
-	{
-		Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-		FileSystem fs = FileSystem.get(job);
-		Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
-		if ( files.length != 1 )
-			throw new IOException("Expecting a single file under counters file: " + path.toString());
-		
-		Reader reader = new SequenceFile.Reader(fs, files[0], job);
-		
-		return reader;
-	}
-	
-	/**
-	 * Function to generate custom file names (transform-part-.....) for
-	 * mappers' output for ApplyTfCSV job. The idea is to find the index 
-	 * of (thisfile, fileoffset) in the list of all offsets from the 
-	 * counters/offsets file, which was generated from either GenTfMtdMR
-	 * or AssignRowIDMR job.
-	 * 
-	 */
-	public String getPartFileID(JobConf job, long offset) throws IOException
-	{
-		Reader reader = initOffsetsReader(job);
-		
-		ByteWritable key=new ByteWritable();
-		OffsetCount value=new OffsetCount();
-		String thisFile = TfUtils.getPartFileName(job);
-		
-		int id = 0;
-		while (reader.next(key, value)) {
-			if ( thisFile.equals(value.filename) && value.fileOffset == offset ) 
-				break;
-			id++;
-		}
-		reader.close();
-		
-		String sid = Integer.toString(id);
-		char[] carr = new char[5-sid.length()];
-		Arrays.fill(carr, '0');
-		String ret = (new String(carr)).concat(sid);
-		
-		return ret;
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.DataExpression;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.JSONHelper;
+
+
+@SuppressWarnings("deprecation")
+public class TfUtils implements Serializable{
+	
+	private static final long serialVersionUID = 526252850872633125L;
+
+	private OmitAgent _oa = null;
+	private MVImputeAgent _mia = null;
+	private RecodeAgent _ra = null;	
+	private BinAgent _ba = null;
+	private DummycodeAgent _da = null;
+	
+	private long _numRecordsInPartFile;		// Total number of records in the data file
+	private long _numValidRecords;			// (_numRecordsInPartFile - #of omitted records)
+	private long _numTransformedRows; 		// Number of rows after applying transformations
+	private long _numTransformedColumns; 	// Number of columns after applying transformations
+
+	private String _headerLine = null;
+	private boolean _hasHeader;
+	private Pattern _delim = null;
+	private String _delimString = null;
+	private String[] _NAstrings = null;
+	private String[] _outputColumnNames = null;
+	private long _numInputCols = -1;
+	
+	private String _tfMtdDir = null;
+	private String _specFile = null;
+	private String _offsetFile = null;
+	private String _tmpDir = null;
+	private String _outputPath = null;
+	
+	protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
+			throws IOException {
+		// check non-existing file
+		if (!fs.exists(path))
+			if ( err )
+				throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
+			else
+				return false;
+
+		// check for empty file
+		if (MapReduceTool.isFileEmpty(fs, path.toString()))
+			if ( err )
+				throw new EOFException("Empty input file " + path.toString() + ".");
+			else
+				return false;
+		
+		return true;
+	}
+	
+	public static String getPartFileName(JobConf job) throws IOException {
+		FileSystem fs = FileSystem.get(job);
+		Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+		return thisPath.toString();
+	}
+	
+	public static boolean isPartFileWithHeader(JobConf job) throws IOException {
+		FileSystem fs = FileSystem.get(job);
+		
+		String thisfile=getPartFileName(job);
+		Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
+		
+		return thisfile.equals(smallestFilePath.toString());
+	}
+	
+	public static JSONObject readSpec(FileSystem fs, String specFile) throws IOException {
+		BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(specFile))));
+		JSONObject obj = JSONHelper.parse(br);
+		br.close();
+		return obj;
+	}
+	
+	/**
+	 * Prepare NA strings so that they can be sent to workers via JobConf.
+	 * A "dummy" string is added at the end to handle the case of empty strings.
+	 * @param na list of NA strings separated by DELIM_NA_STRING_SEP
+	 * @return NA string list with a trailing "dummy" entry appended
+	 */
+	public static String prepNAStrings(String na) {
+		return na  + DataExpression.DELIM_NA_STRING_SEP + "dummy";
+	}
+	
+	public static String[] parseNAStrings(String na) 
+	{
+		if ( na == null )
+			return null;
+		
+		String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
+		return tmp; //Arrays.copyOf(tmp, tmp.length-1);
+	}
+	
+	public static String[] parseNAStrings(JobConf job) 
+	{
+		return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
+	}
+	
+	private void createAgents(JSONObject spec) throws IOException, JSONException {
+		_oa = new OmitAgent(spec);
+		_mia = new MVImputeAgent(spec);
+		_ra = new RecodeAgent(spec);
+		_ba = new BinAgent(spec);
+		_da = new DummycodeAgent(spec, _numInputCols);
+	}
+	
+	public void setupAgents(OmitAgent oa, MVImputeAgent mia, RecodeAgent ra, BinAgent ba, DummycodeAgent da)  {
+		_oa = oa;
+		_mia = mia;
+		_ra = ra;
+		_ba = ba;
+		_da = da;
+	}
+	
+	private void parseColumnNames() {
+		_outputColumnNames = _delim.split(_headerLine, -1);
+		for(int i=0; i < _outputColumnNames.length; i++)
+			_outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
+	}
+	
+	private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
+	{
+		_numRecordsInPartFile = 0;
+		_numValidRecords = 0;
+		_numTransformedRows = 0;
+		_numTransformedColumns = 0;
+		
+		_headerLine = headerLine;
+		_hasHeader = hasHeader;
+		_delimString = delim;
+		_delim = Pattern.compile(Pattern.quote(delim));
+		_NAstrings = naStrings;
+		_numInputCols = numCols;
+		_offsetFile = offsetFile;
+		_tmpDir = tmpPath;
+		_outputPath = outputPath;
+		
+		parseColumnNames();		
+		createAgents(spec);
+	}
+	
+	public TfUtils(JobConf job, boolean minimal) 
+		throws IOException, JSONException 
+	{
+		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+			ConfigurationManager.setCachedJobConf(job);
+		}
+		
+		_NAstrings = TfUtils.parseNAStrings(job);
+		_specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+		
+		FileSystem fs = FileSystem.get(job);
+		JSONObject spec = TfUtils.readSpec(fs, _specFile);
+		
+		_oa = new OmitAgent(spec);
+	}
+	
+	// called from GenTFMtdMapper, ApplyTf (Hadoop)
+	public TfUtils(JobConf job) 
+		throws IOException, JSONException 
+	{
+		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+			ConfigurationManager.setCachedJobConf(job);
+		}
+		
+		boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
+		//Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
+		String[] naStrings = TfUtils.parseNAStrings(job);
+		
+		long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) );		// #of columns in input data
+			
+		String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+		String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
+		String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
+		String outputPath = FileOutputFormat.getOutputPath(job).toString();
+		FileSystem fs = FileSystem.get(job);
+		JSONObject spec = TfUtils.readSpec(fs, specFile);
+		
+		init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
+	}
+	
+	// called from GenTfMtdReducer 
+	public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
+	{
+		this(job);
+		_tfMtdDir = tfMtdDir;
+	}
+	
+	// called from GenTFMtdReducer and ApplyTf (Spark)
+	public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
+		init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
+		_tfMtdDir = tfMtdDir;
+	}
+	
+	public void incrValid() { _numValidRecords++; }
+	public long getValid()  { return _numValidRecords; }
+	public long getTotal()  { return _numRecordsInPartFile; }
+	public long getNumTransformedRows() 	{ return _numTransformedRows; }
+	public long getNumTransformedColumns() 	{ return _numTransformedColumns; }
+	
+	public String getHeader() 		{ return _headerLine; }
+	public boolean hasHeader() 		{ return _hasHeader; }
+	public String getDelimString() 	{ return _delimString; }
+	public Pattern getDelim() 		{ return _delim; }
+	public String[] getNAStrings() 	{ return _NAstrings; }
+	public long getNumCols() 		{ return _numInputCols; }
+	
+	public String getSpecFile() 	{ return _specFile; }
+	public String getTfMtdDir() 	{ return _tfMtdDir; }
+	public String getOffsetFile() 	{ return _offsetFile; }
+	public String getTmpDir() 		{ return _tmpDir; }
+	public String getOutputPath()	{ return _outputPath; }
+	
+	public String getName(int colID) { return _outputColumnNames[colID-1]; }
+	
+	public void setValid(long n) { _numValidRecords = n;}
+	public void incrTotal() { _numRecordsInPartFile++; }
+	public void setTotal(long n) { _numRecordsInPartFile = n;}
+	
+	public OmitAgent 	  getOmitAgent() 	{ 	return _oa; }
+	public MVImputeAgent  getMVImputeAgent(){ 	return _mia;}
+	public RecodeAgent 	  getRecodeAgent() 	{ 	return _ra; }
+	public BinAgent 	  getBinAgent() 	{ 	return _ba; }
+	public DummycodeAgent getDummycodeAgent() { return _da; }
+	
+	/**
+	 * Function that checks if the given string is one of NA strings.
+	 * 
+	 * @param w string to test
+	 * @return true if w equals one of the configured NA strings
+	 */
+	public boolean isNA(String w) {
+		if(_NAstrings == null)
+			return false;
+		
+		for(String na : _NAstrings) {
+			if(w.equals(na))
+				return true;
+		}
+		return false;
+	}
+	
+	public String[] getWords(Text line)
+	{
+		return getWords(line.toString());
+	}
+	
+
+	public String[] getWords(String line) 
+	{
+		return getDelim().split(line.trim(), -1);
+	}
+	
+	/**
+	 * Process a given row to construct transformation metadata.
+	 * 
+	 * @param line input row as a delimited string
+	 * @return the row split into individual fields
+	 * @throws IOException if metadata preparation fails
+	 */
+	public String[] prepareTfMtd(String line) throws IOException {
+		String[] words = getWords(line);
+		if(!getOmitAgent().omit(words, this))
+		{
+			getMVImputeAgent().prepare(words, this);
+			getRecodeAgent().prepare(words, this);
+			getBinAgent().prepare(words, this);
+			incrValid();
+		}
+		incrTotal();
+		
+		return words;
+	}
+	
+	public void loadTfMetadata() throws IOException 
+	{
+		JobConf job = ConfigurationManager.getCachedJobConf();
+		loadTfMetadata(job, false);
+	}
+	
+	public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
+	{
+		Path tfMtdDir = null; 
+		FileSystem fs = null;
+		
+		if(fromLocalFS) {
+			// metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
+			tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
+			fs = FileSystem.getLocal(job);
+		}
+		else {
+			fs = FileSystem.get(job);
+			tfMtdDir = new Path(getTfMtdDir());
+		}
+		
+		// load transformation metadata 
+		getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+		getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+		getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
+		
+		// associate recode maps and bin definitions with dummycoding agent,
+		// as recoded and binned columns are typically dummycoded
+		getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
+		getDummycodeAgent().setNumBins(getBinAgent().getBinList(), getBinAgent().getNumBins());
+		getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+
+	}
+	
+	/*public void loadTfMetadata () throws IOException
+	{
+		Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0];
+		FileSystem localFS = FileSystem.getLocal(_rJob);
+		
+		loadTfMetadata(_rJob, localFS, tfMtdDir);
+		
+		FileSystem fs;
+		fs = FileSystem.get(_rJob);
+		Path thisPath=new Path(_rJob.get("map.input.file")).makeQualified(fs);
+		String thisfile=thisPath.toString();
+			
+		Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
+		if(thisfile.toString().equals(smallestFilePath.toString()))
+			_partFileWithHeader=true;
+		else
+			_partFileWithHeader = false;
+	}*/
+
+
+	public String processHeaderLine() throws IOException 
+	{
+		FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
+		String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
+		getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
+		
+		// write header information (before and after transformation) to temporary path
+		// these files are copied into txMtdPath, once the ApplyTf job is complete.
+		DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
+
+		return dcdHeader;
+		//_numTransformedColumns = getDelim().split(dcdHeader, -1).length; 
+		//return _numTransformedColumns;
+	}
+
+	public boolean omit(String[] words) {
+		if(getOmitAgent() == null)
+			return false;
+		return getOmitAgent().omit(words, this);
+	}
+	
+	
+	public String[] apply(String[] words) {
+		return apply(words, false);
+	}
+	
+	/**
+	 * Function to apply transformation metadata on a given row.
+	 * 
+	 * @param words row split into individual fields
+	 * @param optimizeMaps true to use the CP-specific recode path that avoids boxing/unboxing
+	 * @return the transformed row
+	 */
+	public String[] apply ( String[] words, boolean optimizeMaps ) 
+	{
+		words = getMVImputeAgent().apply(words, this);
+		
+		if(optimizeMaps)
+			// specific case of transform() invoked from CP (to save boxing and unboxing)
+			words = getRecodeAgent().cp_apply(words, this);
+		else
+			words = getRecodeAgent().apply(words, this);
+
+		words = getBinAgent().apply(words, this);
+		words = getDummycodeAgent().apply(words, this);
+		
+		_numTransformedRows++;
+		
+		return words;
+	}
+	
+	public void check(String []words) throws DMLRuntimeException 
+	{
+		boolean checkEmptyString = ( getNAStrings() != null );
+		if ( checkEmptyString ) 
+		{
+			final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
+			for(int i=0; i<words.length; i++) 
+				if ( words[i] != null && words[i].equals(""))
+					throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
+		}
+	}
+	
+	public String checkAndPrepOutputString(String []words) throws DMLRuntimeException 
+	{
+		return checkAndPrepOutputString(words, new StringBuilder());
+	}
+	
+	public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException 
+	{
+		/*
+		 * Check if empty strings ("") have to be handled.
+		 * 
+		 * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
+		 * When na.strings are provided, then "" is considered a missing value indicator, and the 
+		 * user is expected to provide an appropriate imputation method. Therefore, when na.strings 
+		 * are provided, "" encountered in any column (after all transformations are applied) 
+		 * denotes an erroneous condition.  
+		 */
+		boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
+		
+		//StringBuilder sb = new StringBuilder();
+		sb.setLength(0);
+		int i =0;
+		
+		if ( checkEmptyString ) 
+		{
+			final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
+			if ( words[0] != null ) 
+				if ( words[0].equals("") )
+					throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
+				else 
+					sb.append(words[0]);
+			else
+				sb.append("0");
+			
+			for(i=1; i<words.length; i++) 
+			{
+				sb.append(_delimString);
+				
+				if ( words[i] != null ) 
+					if ( words[i].equals("") )
+						throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
+					else 
+						sb.append(words[i]);
+				else
+					sb.append("0");
+			}
+		}
+		else 
+		{
+			sb.append(words[0] != null ? words[0] : "0");
+			for(i=1; i<words.length; i++) 
+			{
+				sb.append(_delimString);
+				sb.append(words[i] != null ? words[i] : "0");
+			}
+		}
+		
+		return sb.toString();
+	}
+
+	private Reader initOffsetsReader(JobConf job) throws IOException 
+	{
+		Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
+		FileSystem fs = FileSystem.get(job);
+		Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
+		if ( files.length != 1 )
+			throw new IOException("Expecting a single file under counters file: " + path.toString());
+		
+		Reader reader = new SequenceFile.Reader(fs, files[0], job);
+		
+		return reader;
+	}
+	
+	/**
+	 * Function to generate custom file names (transform-part-.....) for
+	 * mappers' output for ApplyTfCSV job. The idea is to find the index 
+	 * of (thisfile, fileoffset) in the list of all offsets from the 
+	 * counters/offsets file, which was generated from either GenTfMtdMR
+	 * or AssignRowIDMR job.
+	 * 
+	 */
+	public String getPartFileID(JobConf job, long offset) throws IOException
+	{
+		Reader reader = initOffsetsReader(job);
+		
+		ByteWritable key=new ByteWritable();
+		OffsetCount value=new OffsetCount();
+		String thisFile = TfUtils.getPartFileName(job);
+		
+		int id = 0;
+		while (reader.next(key, value)) {
+			if ( thisFile.equals(value.filename) && value.fileOffset == offset ) 
+				break;
+			id++;
+		}
+		reader.close();
+		
+		String sid = Integer.toString(id);
+		char[] carr = new char[5-sid.length()];
+		Arrays.fill(carr, '0');
+		String ret = (new String(carr)).concat(sid);
+		
+		return ret;
+	}
+}
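
The manual zero-padding at the end of getPartFileID above (filling a char[] with '0' and concatenating) can be expressed with String.format. A minimal, self-contained sketch of the equivalent logic (class and method names here are illustrative only, not part of the SystemML sources):

    public class PartFileIdPad {
        // Equivalent of the char[]-fill padding in getPartFileID:
        // render id as a five-character, zero-padded string, e.g. 42 -> "00042".
        // Note: unlike the original, this does not fail for ids wider than five
        // digits; the original would hit a NegativeArraySizeException there.
        static String pad(int id) {
            return String.format("%05d", id);
        }

        public static void main(String[] args) {
            System.out.println(pad(42));   // 00042
            System.out.println(pad(7));    // 00007
        }
    }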

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
index e818089..2c5e37f 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
@@ -1,93 +1,93 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-
-public abstract class TransformationAgent implements Serializable {
-	
-	private static final long serialVersionUID = -2995384194257356337L;
-	
-	public static enum TX_METHOD { 
-		IMPUTE ("impute"), 
-		RECODE ("recode"), 
-		BIN ("bin"), 
-		DUMMYCODE ("dummycode"), 
-		SCALE ("scale"),
-		OMIT ("omit"),
-		MVRCD ("mvrcd");
-		
-		private String _name;
-		
-		TX_METHOD(String name) { _name = name; }
-		
-		public String toString() {
-			return _name;
-		}
-	}
-	
-	protected static String JSON_ATTRS 	= "attributes"; 
-	protected static String JSON_MTHD 	= "methods"; 
-	protected static String JSON_CONSTS = "constants"; 
-	protected static String JSON_NBINS 	= "numbins"; 
-	
-	protected static final String MV_FILE_SUFFIX 		= ".impute";
-	protected static final String RCD_MAP_FILE_SUFFIX 	= ".map";
-	protected static final String NDISTINCT_FILE_SUFFIX = ".ndistinct";
-	protected static final String MODE_FILE_SUFFIX 		= ".mode";
-	protected static final String BIN_FILE_SUFFIX 		= ".bin";
-	protected static final String SCALE_FILE_SUFFIX		= ".scale";
-	protected static final String DCD_FILE_NAME 		= "dummyCodeMaps.csv";
-	protected static final String COLTYPES_FILE_NAME 	= "coltypes.csv";
-	
-	protected static final String TXMTD_SEP 	= ",";
-	protected static final String DCD_NAME_SEP 	= "_";
-	
-	protected static final String OUT_HEADER = "column.names";
-	protected static final String OUT_DCD_HEADER = "dummycoded.column.names";
-	
-	abstract public void print();
-	abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
-	abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
-	
-	abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
-	abstract public String[] apply(String[] words, TfUtils agents);
-	
-	protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID }
-	protected byte columnTypeToID(ColumnTypes type) throws IOException { 
-		switch(type) 
-		{
-		case SCALE: return 1;
-		case NOMINAL: return 2;
-		case ORDINAL: return 3;
-		case DUMMYCODED: return 1; // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically.
-		default:
-			throw new IOException("Invalid Column Type: " + type);
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+
+public abstract class TransformationAgent implements Serializable {
+	
+	private static final long serialVersionUID = -2995384194257356337L;
+	
+	public static enum TX_METHOD { 
+		IMPUTE ("impute"), 
+		RECODE ("recode"), 
+		BIN ("bin"), 
+		DUMMYCODE ("dummycode"), 
+		SCALE ("scale"),
+		OMIT ("omit"),
+		MVRCD ("mvrcd");
+		
+		private String _name;
+		
+		TX_METHOD(String name) { _name = name; }
+		
+		public String toString() {
+			return _name;
+		}
+	}
+	
+	protected static String JSON_ATTRS 	= "attributes"; 
+	protected static String JSON_MTHD 	= "methods"; 
+	protected static String JSON_CONSTS = "constants"; 
+	protected static String JSON_NBINS 	= "numbins"; 
+	
+	protected static final String MV_FILE_SUFFIX 		= ".impute";
+	protected static final String RCD_MAP_FILE_SUFFIX 	= ".map";
+	protected static final String NDISTINCT_FILE_SUFFIX = ".ndistinct";
+	protected static final String MODE_FILE_SUFFIX 		= ".mode";
+	protected static final String BIN_FILE_SUFFIX 		= ".bin";
+	protected static final String SCALE_FILE_SUFFIX		= ".scale";
+	protected static final String DCD_FILE_NAME 		= "dummyCodeMaps.csv";
+	protected static final String COLTYPES_FILE_NAME 	= "coltypes.csv";
+	
+	protected static final String TXMTD_SEP 	= ",";
+	protected static final String DCD_NAME_SEP 	= "_";
+	
+	protected static final String OUT_HEADER = "column.names";
+	protected static final String OUT_DCD_HEADER = "dummycoded.column.names";
+	
+	abstract public void print();
+	abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
+	abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
+	
+	abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
+	abstract public String[] apply(String[] words, TfUtils agents);
+	
+	protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID }
+	protected byte columnTypeToID(ColumnTypes type) throws IOException { 
+		switch(type) 
+		{
+		case SCALE: return 1;
+		case NOMINAL: return 2;
+		case ORDINAL: return 3;
+		case DUMMYCODED: return 1; // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically.
+		default:
+			throw new IOException("Invalid Column Type: " + type);
+		}
+	}
+}
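
TX_METHOD above binds each transformation method to the lowercase keyword used in the JSON transform specification, with toString returning that keyword. A standalone sketch of the same enum pattern (hypothetical demo class, not part of the SystemML sources):

    public class TxMethodDemo {
        // Each constant carries its JSON spec keyword; toString() returns it,
        // so spec entries can be compared against TxMethod.RECODE.toString().
        enum TxMethod {
            IMPUTE("impute"), RECODE("recode"), BIN("bin"), DUMMYCODE("dummycode");

            private final String name;
            TxMethod(String name) { this.name = name; }
            @Override public String toString() { return name; }
        }

        public static void main(String[] args) {
            for (TxMethod m : TxMethod.values())
                System.out.println(m.name() + " -> " + m);  // e.g. RECODE -> recode
        }
    }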

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/standalone/runStandaloneSystemML.bat
----------------------------------------------------------------------
diff --git a/src/main/standalone/runStandaloneSystemML.bat b/src/main/standalone/runStandaloneSystemML.bat
index aba2002..f837970 100644
--- a/src/main/standalone/runStandaloneSystemML.bat
+++ b/src/main/standalone/runStandaloneSystemML.bat
@@ -1,50 +1,50 @@
-::-------------------------------------------------------------
-::
-:: Licensed to the Apache Software Foundation (ASF) under one
-:: or more contributor license agreements.  See the NOTICE file
-:: distributed with this work for additional information
-:: regarding copyright ownership.  The ASF licenses this file
-:: to you under the Apache License, Version 2.0 (the
-:: "License"); you may not use this file except in compliance
-:: with the License.  You may obtain a copy of the License at
-:: 
-::   http://www.apache.org/licenses/LICENSE-2.0
-:: 
-:: Unless required by applicable law or agreed to in writing,
-:: software distributed under the License is distributed on an
-:: "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-:: KIND, either express or implied.  See the License for the
-:: specific language governing permissions and limitations
-:: under the License.
-::
-::-------------------------------------------------------------
-
-@ECHO OFF
-
-IF "%~1" == ""  GOTO Err
-IF "%~1" == "-help" GOTO Msg
-IF "%~1" == "-h" GOTO Msg
-
-setLocal EnableDelayedExpansion
-
-SET HADOOP_HOME=%CD%/lib/hadoop
-
-set CLASSPATH=./lib/*
-echo !CLASSPATH!
-
-set LOG4JPROP=log4j.properties
-
-for /f "tokens=1,* delims= " %%a in ("%*") do set ALLBUTFIRST=%%b
-
-java -Xmx4g -Xms4g -Xmn400m -cp %CLASSPATH% -Dlog4j.configuration=file:%LOG4JPROP% org.apache.sysml.api.DMLScript -f %1 -exec singlenode -config=SystemML-config.xml %ALLBUTFIRST%
-GOTO End
-
-:Err
-ECHO "Wrong Usage. Please provide DML filename to be executed."
-GOTO Msg
-
-:Msg
-ECHO "Usage: runStandaloneSystemML.bat <dml-filename> [arguments] [-help]"
-ECHO "Script internally invokes 'java -Xmx4g -Xms4g -Xmn400m -jar jSystemML.jar -f <dml-filename> -exec singlenode -config=SystemML-config.xml [Optional-Arguments]'"
-
-:End
+::-------------------------------------------------------------
+::
+:: Licensed to the Apache Software Foundation (ASF) under one
+:: or more contributor license agreements.  See the NOTICE file
+:: distributed with this work for additional information
+:: regarding copyright ownership.  The ASF licenses this file
+:: to you under the Apache License, Version 2.0 (the
+:: "License"); you may not use this file except in compliance
+:: with the License.  You may obtain a copy of the License at
+:: 
+::   http://www.apache.org/licenses/LICENSE-2.0
+:: 
+:: Unless required by applicable law or agreed to in writing,
+:: software distributed under the License is distributed on an
+:: "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+:: KIND, either express or implied.  See the License for the
+:: specific language governing permissions and limitations
+:: under the License.
+::
+::-------------------------------------------------------------
+
+@ECHO OFF
+
+IF "%~1" == ""  GOTO Err
+IF "%~1" == "-help" GOTO Msg
+IF "%~1" == "-h" GOTO Msg
+
+setLocal EnableDelayedExpansion
+
+SET HADOOP_HOME=%CD%/lib/hadoop
+
+set CLASSPATH=./lib/*
+echo !CLASSPATH!
+
+set LOG4JPROP=log4j.properties
+
+for /f "tokens=1,* delims= " %%a in ("%*") do set ALLBUTFIRST=%%b
+
+java -Xmx4g -Xms4g -Xmn400m -cp %CLASSPATH% -Dlog4j.configuration=file:%LOG4JPROP% org.apache.sysml.api.DMLScript -f %1 -exec singlenode -config=SystemML-config.xml %ALLBUTFIRST%
+GOTO End
+
+:Err
+ECHO "Wrong Usage. Please provide DML filename to be executed."
+GOTO Msg
+
+:Msg
+ECHO "Usage: runStandaloneSystemML.bat <dml-filename> [arguments] [-help]"
+ECHO "Script internally invokes 'java -Xmx4g -Xms4g -Xmn400m -jar jSystemML.jar -f <dml-filename> -exec singlenode -config=SystemML-config.xml [Optional-Arguments]'"
+
+:End
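
As the usage message above indicates, the script takes the DML file as its first argument and forwards everything after it (captured in ALLBUTFIRST) to DMLScript. A hypothetical invocation (the script name and -nvargs values are illustrative, not taken from this commit):

    runStandaloneSystemML.bat genLinearRegressionData.dml -nvargs numSamples=1000 numFeatures=50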

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/test/scripts/applications/apply-transform/apply-transform.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/apply-transform/apply-transform.dml b/src/test/scripts/applications/apply-transform/apply-transform.dml
index de7fa02..fdd85c7 100644
--- a/src/test/scripts/applications/apply-transform/apply-transform.dml
+++ b/src/test/scripts/applications/apply-transform/apply-transform.dml
@@ -1,156 +1,156 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
-cmdLine_bin_defns = ifdef($bin_defns, " ")
-cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
-cmdLine_normalization_maps = ifdef($normalization_maps, " ")
-
-original_X = read($X)
-
-if(cmdLine_missing_value_maps != " "){
-	missing_val_maps = read(cmdLine_missing_value_maps)
-
-	last_data_col = ncol(original_X)-nrow(missing_val_maps)
-	X = original_X[,1:last_data_col]
-}else
-	X = original_X
-
-# col 1: col index of missing indicator col
-#		 0 otherwise
-# col 2: global mean if imputation is needed
-# col 3: num_bins if binning is required
-# col 4: bin width if binning is required
-# col 5: min val if binning is required
-# col 6: begin col if dummy coding is required
-# col 7: end col if dummy coding is required
-# col 8: 1 if normalization is required, 0 otherwise
-# col 9: mean for normalization
-# col 10: std for z-scoring for normalization
-#		 -1 indicates mean subtraction  
-attrinfo = matrix(0, rows=ncol(X), cols=10)
-
-if(cmdLine_missing_value_maps != " "){
-	missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
-	
-	parfor(i in 1:nrow(missing_val_maps), check=0){
-		attr_index_mv = castAsScalar(missing_val_maps[i,1])
-		attrinfo[attr_index_mv,1] = i
-		attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
-	}	
-}
- 	
-if(cmdLine_bin_defns != " "){
-	bin_defns = read(cmdLine_bin_defns)
-	parfor(i in 1:nrow(bin_defns), check=0){
-		attr_index_bin = castAsScalar(bin_defns[i,1])
-		attrinfo[attr_index_bin,3] = bin_defns[i,4]
-		attrinfo[attr_index_bin,4] = bin_defns[i,2]
-		attrinfo[attr_index_bin,5] = bin_defns[i,3]
-	}
-}
-
-if(cmdLine_dummy_code_maps != " "){
-	dummy_code_maps = read(cmdLine_dummy_code_maps)
-	parfor(i in 1:nrow(dummy_code_maps), check=0){
-		attr_index_dc = castAsScalar(dummy_code_maps[i,1])
-		attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
-		attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
-	}
-}else{
-	attrinfo[,6] = seq(1, ncol(X), 1)
-	attrinfo[,7] = seq(1, ncol(X), 1)
-}
-
-if(cmdLine_normalization_maps != " "){
-	normalization_map = read(cmdLine_normalization_maps)
-	parfor(i in 1:nrow(normalization_map), check=0){
-		attr_index_normalization = castAsScalar(normalization_map[i,1])
-		attrinfo[attr_index_normalization,8] = 1
-		attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
-		attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
-	}
-}
-
-#write(attrinfo, "binning/attrinfo.mtx", format="csv")
-
-cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
-new_X = matrix(0, rows=nrow(X), cols=cols_in_transformed_X)
-log = matrix(0, rows=ncol(X), cols=2)
-parfor(i in 1:ncol(X), check=0){
-	col = X[,i]
-	
-	mv_col_id = castAsScalar(attrinfo[i,1])
-	global_mean = castAsScalar(attrinfo[i,2])
-	num_bins = castAsScalar(attrinfo[i,3])
-	bin_width = castAsScalar(attrinfo[i,4])
-	min_val = castAsScalar(attrinfo[i,5])
-	dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
-	dummy_coding_end_col = castAsScalar(attrinfo[i,7])
-	normalization_needed = castAsScalar(attrinfo[i,8])
-	normalization_mean = castAsScalar(attrinfo[i,9])
-	normalization_std = castAsScalar(attrinfo[i,10])
-	
-	if(mv_col_id > 0){ 
-		# fill-in with global mean
-		col = col + missing_indicator_mat[,mv_col_id] * global_mean
-	}
-	
-	if(num_bins > 0){
-		# only for equiwidth bins
-	
-		# note that max_val entries will get assigned num_bins+1
-		col = round((col - min_val)/bin_width - 0.5) + 1
-		less_than_lb = ppred(col, 1, "<")
-		more_than_ub = ppred(col, num_bins, ">")
-		
-		col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
-	}
-
-	if(dummy_coding_beg_col == dummy_coding_end_col){
-		if(normalization_needed == 1){
-			if(normalization_std == -1) col = col - normalization_mean
-			else col = (col - normalization_mean)/normalization_std
-		}
-		
-		new_X[,dummy_coding_beg_col] = col
-	}else{
-		min_val = min(col)
-		max_val = max(col)
-		if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1){
-			res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
-			new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
-		}else{
-			log[i,1] = 1
-			if(min_val < 1) log[i,2] = min_val
-			else log[i,2] = max_val
-		}
-	}
-}
-
-write(new_X, $transformed_X, format="text")
-
-s = "Warning Messages"
-for(i in 1:nrow(log)){
-	if(castAsScalar(log[i,1]) == 1)
-		s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
-}
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
+cmdLine_bin_defns = ifdef($bin_defns, " ")
+cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
+cmdLine_normalization_maps = ifdef($normalization_maps, " ")
+
+original_X = read($X)
+
+if(cmdLine_missing_value_maps != " "){
+	missing_val_maps = read(cmdLine_missing_value_maps)
+
+	last_data_col = ncol(original_X)-nrow(missing_val_maps)
+	X = original_X[,1:last_data_col]
+}else
+	X = original_X
+
+# col 1: col index of missing indicator col
+#		 0 otherwise
+# col 2: global mean if imputation is needed
+# col 3: num_bins if binning is required
+# col 4: bin width if binning is required
+# col 5: min val if binning is required
+# col 6: begin col if dummy coding is required
+# col 7: end col if dummy coding is required
+# col 8: 1 if normalization is required, 0 otherwise
+# col 9: mean for normalization
+# col 10: std for z-scoring during normalization;
+#		 -1 indicates plain mean subtraction
+attrinfo = matrix(0, rows=ncol(X), cols=10)
+
+if(cmdLine_missing_value_maps != " "){
+	missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
+	
+	parfor(i in 1:nrow(missing_val_maps), check=0){
+		attr_index_mv = castAsScalar(missing_val_maps[i,1])
+		attrinfo[attr_index_mv,1] = i
+		attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
+	}	
+}
+ 	
+if(cmdLine_bin_defns != " "){
+	bin_defns = read(cmdLine_bin_defns)
+	parfor(i in 1:nrow(bin_defns), check=0){
+		attr_index_bin = castAsScalar(bin_defns[i,1])
+		attrinfo[attr_index_bin,3] = bin_defns[i,4]
+		attrinfo[attr_index_bin,4] = bin_defns[i,2]
+		attrinfo[attr_index_bin,5] = bin_defns[i,3]
+	}
+}
+
+if(cmdLine_dummy_code_maps != " "){
+	dummy_code_maps = read(cmdLine_dummy_code_maps)
+	parfor(i in 1:nrow(dummy_code_maps), check=0){
+		attr_index_dc = castAsScalar(dummy_code_maps[i,1])
+		attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
+		attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
+	}
+}else{
+	attrinfo[,6] = seq(1, ncol(X), 1)
+	attrinfo[,7] = seq(1, ncol(X), 1)
+}
+
+if(cmdLine_normalization_maps != " "){
+	normalization_map = read(cmdLine_normalization_maps)
+	parfor(i in 1:nrow(normalization_map), check=0){
+		attr_index_normalization = castAsScalar(normalization_map[i,1])
+		attrinfo[attr_index_normalization,8] = 1
+		attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
+		attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
+	}
+}
+
+#write(attrinfo, "binning/attrinfo.mtx", format="csv")
+
+cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),7])   # end col of the last attribute, i.e., the full width of transformed X
+new_X = matrix(0, rows=nrow(X), cols=cols_in_transformed_X)
+log = matrix(0, rows=ncol(X), cols=2)
+parfor(i in 1:ncol(X), check=0){
+	col = X[,i]
+	
+	mv_col_id = castAsScalar(attrinfo[i,1])
+	global_mean = castAsScalar(attrinfo[i,2])
+	num_bins = castAsScalar(attrinfo[i,3])
+	bin_width = castAsScalar(attrinfo[i,4])
+	min_val = castAsScalar(attrinfo[i,5])
+	dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
+	dummy_coding_end_col = castAsScalar(attrinfo[i,7])
+	normalization_needed = castAsScalar(attrinfo[i,8])
+	normalization_mean = castAsScalar(attrinfo[i,9])
+	normalization_std = castAsScalar(attrinfo[i,10])
+	
+	if(mv_col_id > 0){ 
+		# fill in missing entries with the global mean
+		col = col + missing_indicator_mat[,mv_col_id] * global_mean
+	}
+	
+	if(num_bins > 0){
+		# only for equiwidth bins
+	
+		# note: entries equal to max_val initially land in bin num_bins+1; the lines below clamp bin ids into [1, num_bins]
+		col = round((col - min_val)/bin_width - 0.5) + 1
+		less_than_lb = ppred(col, 1, "<")
+		more_than_ub = ppred(col, num_bins, ">")
+		
+		col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
+	}
+
+	if(dummy_coding_beg_col == dummy_coding_end_col){
+		if(normalization_needed == 1){
+			if(normalization_std == -1) col = col - normalization_mean
+			else col = (col - normalization_mean)/normalization_std
+		}
+		
+		new_X[,dummy_coding_beg_col] = col
+	}else{
+		min_val = min(col)
+		max_val = max(col)
+		if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1){
+			res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
+			new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
+		}else{
+			log[i,1] = 1
+			if(min_val < 1) log[i,2] = min_val
+			else log[i,2] = max_val
+		}
+	}
+}
+
+write(new_X, $transformed_X, format="text")
+
+s = "Warning Messages"
+for(i in 1:nrow(log)){
+	if(castAsScalar(log[i,1]) == 1)
+		s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
+}
 write(s, $Log)
\ No newline at end of file
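
The core of apply-transform.dml above is the equiwidth binning step: bin ids
are computed with an affine transform plus round(), and out-of-range ids are
clamped back into [1, num_bins] using ppred() masks. A minimal standalone DML
sketch of that idiom follows; min_val, bin_width, num_bins, and the output
path are made-up values for illustration only, not part of the script:

    # eleven sample values in [0, 10]
    col = seq(0, 10, 1)
    min_val = 0
    bin_width = 2.5
    num_bins = 4

    # raw bin ids; entries equal to the maximum land in bin num_bins+1
    bins = round((col - min_val)/bin_width - 0.5) + 1

    # 0/1 masks marking ids below 1 and above num_bins
    less_than_lb = ppred(bins, 1, "<")
    more_than_ub = ppred(bins, num_bins, ">")

    # clamp: keep in-range ids, map high ids to num_bins, low ids to 1
    bins = (1 - less_than_lb - more_than_ub)*bins + more_than_ub*num_bins + less_than_lb

    write(bins, "binning/example_bins.csv", format="csv")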

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/test/scripts/applications/apply-transform/apply-transform.pydml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/apply-transform/apply-transform.pydml b/src/test/scripts/applications/apply-transform/apply-transform.pydml
index be04495..f6c40dd 100644
--- a/src/test/scripts/applications/apply-transform/apply-transform.pydml
+++ b/src/test/scripts/applications/apply-transform/apply-transform.pydml
@@ -1,146 +1,146 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
-cmdLine_bin_defns = ifdef($bin_defns, " ")
-cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
-cmdLine_normalization_maps = ifdef($normalization_maps, " ")
-
-original_X = load($X)
-
-if(cmdLine_missing_value_maps != " "):
-    missing_val_maps = read(cmdLine_missing_value_maps)
-
-    last_data_col = ncol(original_X)-nrow(missing_val_maps)
-    X = original_X[,1:last_data_col]
-else:
-    X = original_X
-
-# col 1: col index of missing indicator col
-#         0 otherwise
-# col 2: global mean if imputation is needed
-# col 3: num_bins if binning is required
-# col 4: bin width if binning is required
-# col 5: min val if binning is required
-# col 6: begin col if dummy coding is required
-# col 7: end col if dummy coding is required
-# col 8: 1 if normalization is required, 0 otherwise
-# col 9: mean for normalization
-# col 10: std for z-scoring during normalization;
-#         -1 indicates plain mean subtraction
-attrinfo = full(0, rows=ncol(X), cols=10)
-
-if(cmdLine_missing_value_maps != " "):
-    missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
-    
-    parfor(i in 1:nrow(missing_val_maps), check=0):
-        attr_index_mv = castAsScalar(missing_val_maps[i,1])
-        attrinfo[attr_index_mv,1] = i
-        attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
-    
-if(cmdLine_bin_defns != " "):
-    bin_defns = read(cmdLine_bin_defns)
-    parfor(i in 1:nrow(bin_defns), check=0):
-        attr_index_bin = castAsScalar(bin_defns[i,1])
-        attrinfo[attr_index_bin,3] = bin_defns[i,4]
-        attrinfo[attr_index_bin,4] = bin_defns[i,2]
-        attrinfo[attr_index_bin,5] = bin_defns[i,3]
-
-if(cmdLine_dummy_code_maps != " "):
-    dummy_code_maps = read(cmdLine_dummy_code_maps)
-    parfor(i in 1:nrow(dummy_code_maps), check=0):
-        attr_index_dc = castAsScalar(dummy_code_maps[i,1])
-        attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
-        attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
-else:
-    attrinfo[,6] = seq(1, ncol(X), 1)
-    attrinfo[,7] = seq(1, ncol(X), 1)
-
-if(cmdLine_normalization_maps != " "):
-    normalization_map = read(cmdLine_normalization_maps)
-    parfor(i in 1:nrow(normalization_map), check=0):
-        attr_index_normalization = castAsScalar(normalization_map[i,1])
-        attrinfo[attr_index_normalization,8] = 1
-        attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
-        attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
-
-#write(attrinfo, "binning/attrinfo.mtx", format="csv")
-
-cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),7])   # end col of the last attribute, i.e., the full width of transformed X
-new_X = full(0, rows=nrow(X), cols=cols_in_transformed_X)
-log = full(0, rows=ncol(X), cols=2)
-parfor(i in 1:ncol(X), check=0):
-    col = X[,i]
-    
-    mv_col_id = castAsScalar(attrinfo[i,1])
-    global_mean = castAsScalar(attrinfo[i,2])
-    num_bins = castAsScalar(attrinfo[i,3])
-    bin_width = castAsScalar(attrinfo[i,4])
-    min_val = castAsScalar(attrinfo[i,5])
-    dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
-    dummy_coding_end_col = castAsScalar(attrinfo[i,7])
-    normalization_needed = castAsScalar(attrinfo[i,8])
-    normalization_mean = castAsScalar(attrinfo[i,9])
-    normalization_std = castAsScalar(attrinfo[i,10])
-    
-    if(mv_col_id > 0):
-        # fill in missing entries with the global mean
-        col = col + missing_indicator_mat[,mv_col_id] * global_mean
-    
-    if(num_bins > 0):
-        # only for equiwidth bins
-    
-        # note: entries equal to max_val initially land in bin num_bins+1; the lines below clamp bin ids into [1, num_bins]
-        col = round((col - min_val)/bin_width - 0.5) + 1
-        less_than_lb = ppred(col, 1, "<")
-        more_than_ub = ppred(col, num_bins, ">")
-        
-        col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
-
-    if(dummy_coding_beg_col == dummy_coding_end_col):
-        if(normalization_needed == 1):
-            if(normalization_std == -1):
-                col = col - normalization_mean
-            else:
-                col = (col - normalization_mean)/normalization_std
-        
-        new_X[,dummy_coding_beg_col] = col
-    else:
-        min_val = min(col)
-        max_val = max(col)
-        if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1):
-            res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
-            new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
-        else:
-            log[i,1] = 1
-            if(min_val < 1):
-                log[i,2] = min_val
-            else:
-                log[i,2] = max_val
-
-save(new_X, $transformed_X, format="text")
-
-s = "Warning Messages"
-for(i in 1:nrow(log)):
-    if(castAsScalar(log[i,1]) == 1):
-        s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
-
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
+cmdLine_bin_defns = ifdef($bin_defns, " ")
+cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
+cmdLine_normalization_maps = ifdef($normalization_maps, " ")
+
+original_X = load($X)
+
+if(cmdLine_missing_value_maps != " "):
+    missing_val_maps = read(cmdLine_missing_value_maps)
+
+    last_data_col = ncol(original_X)-nrow(missing_val_maps)
+    X = original_X[,1:last_data_col]
+else:
+    X = original_X
+
+# col 1: col index of missing indicator col
+#         0 otherwise
+# col 2: global mean if imputation is needed
+# col 3: num_bins if binning is required
+# col 4: bin width if binning is required
+# col 5: min val if binning is required
+# col 6: begin col if dummy coding is required
+# col 7: end col if dummy coding is required
+# col 8: 1 if normalization is required, 0 otherwise
+# col 9: mean for normalization
+# col 10: std for z-scoring during normalization;
+#         -1 indicates plain mean subtraction
+attrinfo = full(0, rows=ncol(X), cols=10)
+
+if(cmdLine_missing_value_maps != " "):
+    missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
+    
+    parfor(i in 1:nrow(missing_val_maps), check=0):
+        attr_index_mv = castAsScalar(missing_val_maps[i,1])
+        attrinfo[attr_index_mv,1] = i
+        attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
+    
+if(cmdLine_bin_defns != " "):
+    bin_defns = read(cmdLine_bin_defns)
+    parfor(i in 1:nrow(bin_defns), check=0):
+        attr_index_bin = castAsScalar(bin_defns[i,1])
+        attrinfo[attr_index_bin,3] = bin_defns[i,4]
+        attrinfo[attr_index_bin,4] = bin_defns[i,2]
+        attrinfo[attr_index_bin,5] = bin_defns[i,3]
+
+if(cmdLine_dummy_code_maps != " "):
+    dummy_code_maps = read(cmdLine_dummy_code_maps)
+    parfor(i in 1:nrow(dummy_code_maps), check=0):
+        attr_index_dc = castAsScalar(dummy_code_maps[i,1])
+        attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
+        attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
+else:
+    attrinfo[,6] = seq(1, ncol(X), 1)
+    attrinfo[,7] = seq(1, ncol(X), 1)
+
+if(cmdLine_normalization_maps != " "):
+    normalization_map = read(cmdLine_normalization_maps)
+    parfor(i in 1:nrow(normalization_map), check=0):
+        attr_index_normalization = castAsScalar(normalization_map[i,1])
+        attrinfo[attr_index_normalization,8] = 1
+        attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
+        attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
+
+#write(attrinfo, "binning/attrinfo.mtx", format="csv")
+
+cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),7])   # end col of the last attribute, i.e., the full width of transformed X
+new_X = full(0, rows=nrow(X), cols=cols_in_transformed_X)
+log = full(0, rows=ncol(X), cols=2)
+parfor(i in 1:ncol(X), check=0):
+    col = X[,i]
+    
+    mv_col_id = castAsScalar(attrinfo[i,1])
+    global_mean = castAsScalar(attrinfo[i,2])
+    num_bins = castAsScalar(attrinfo[i,3])
+    bin_width = castAsScalar(attrinfo[i,4])
+    min_val = castAsScalar(attrinfo[i,5])
+    dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
+    dummy_coding_end_col = castAsScalar(attrinfo[i,7])
+    normalization_needed = castAsScalar(attrinfo[i,8])
+    normalization_mean = castAsScalar(attrinfo[i,9])
+    normalization_std = castAsScalar(attrinfo[i,10])
+    
+    if(mv_col_id > 0):
+        # fill in missing entries with the global mean
+        col = col + missing_indicator_mat[,mv_col_id] * global_mean
+    
+    if(num_bins > 0):
+        # only for equiwidth bins
+    
+        # note: entries equal to max_val initially land in bin num_bins+1; the lines below clamp bin ids into [1, num_bins]
+        col = round((col - min_val)/bin_width - 0.5) + 1
+        less_than_lb = ppred(col, 1, "<")
+        more_than_ub = ppred(col, num_bins, ">")
+        
+        col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
+
+    if(dummy_coding_beg_col == dummy_coding_end_col):
+        if(normalization_needed == 1):
+            if(normalization_std == -1):
+                col = col - normalization_mean
+            else:
+                col = (col - normalization_mean)/normalization_std
+        
+        new_X[,dummy_coding_beg_col] = col
+    else:
+        min_val = min(col)
+        max_val = max(col)
+        if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1):
+            res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
+            new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
+        else:
+            log[i,1] = 1
+            if(min_val < 1):
+                log[i,2] = min_val
+            else:
+                log[i,2] = max_val
+
+save(new_X, $transformed_X, format="text")
+
+s = "Warning Messages"
+for(i in 1:nrow(log)):
+    if(castAsScalar(log[i,1]) == 1):
+        s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
+
 save(s, $Log)
\ No newline at end of file
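
The PyDML version mirrors the DML script statement for statement: full() in
place of matrix(), save() in place of write(), and indentation in place of
braces. The step worth calling out in both scripts is the dummy-coding trick:
table() scatters a 1 into row i, column col[i], turning a column of category
ids into a one-hot matrix in a single call; passing the dimensions explicitly
keeps the output width fixed even if the highest category is absent. A minimal
standalone DML sketch, with made-up category values and output path:

    # four rows with category ids in {1, 2, 3}
    col = matrix("2 1 3 2", rows=4, cols=1)
    num_categories = 3

    # res[i, col[i]] = 1; explicit dims give a 4 x 3 one-hot matrix
    res = table(seq(1, nrow(col), 1), col, nrow(col), num_categories)

    write(res, "binning/example_dummy.csv", format="csv")

The normalization branch needs no such trick: with std set to -1 the scripts
subtract the mean only, and otherwise they apply the usual z-scoring
(col - mean)/std before writing the column into the transformed matrix.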