Posted to commits@systemml.apache.org by mb...@apache.org on 2017/06/08 19:24:06 UTC

[2/6] systemml git commit: [SYSTEMML-1300] Remove file-based transform from compiler/runtime

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 9e30f5c..7743b61 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -19,40 +19,13 @@
 
 package org.apache.sysml.runtime.transform;
 
-import java.io.EOFException;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.Lop;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
-@SuppressWarnings("deprecation")
-public class TfUtils implements Serializable{
-	
+public class TfUtils implements Serializable
+{	
 	private static final long serialVersionUID = 526252850872633125L;
 
 	protected enum ColumnTypes { 
@@ -89,9 +62,7 @@ public class TfUtils implements Serializable{
 	
 	//transform meta data constants (old file-based transform)
 	public static final String TXMTD_SEP         = ",";
-	public static final String TXMTD_COLTYPES    = "coltypes.csv";	
 	public static final String TXMTD_COLNAMES    = "column.names";
-	public static final String TXMTD_DC_COLNAMES = "dummycoded.column.names";	
 	public static final String TXMTD_RCD_MAP_SUFFIX      = ".map";
 	public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct";
 	public static final String TXMTD_BIN_FILE_SUFFIX     = ".bin";
@@ -101,184 +72,14 @@ public class TfUtils implements Serializable{
 	public static final String JSON_MTHD 	= "methods"; 
 	public static final String JSON_CONSTS = "constants"; 
 	public static final String JSON_NBINS 	= "numbins"; 		
-	protected static final String MODE_FILE_SUFFIX 		= ".mode";
-	protected static final String SCALE_FILE_SUFFIX		= ".scale";
-	protected static final String DCD_FILE_NAME 		= "dummyCodeMaps.csv";	
-	protected static final String DCD_NAME_SEP 	= "_";
-	
-	
-	private OmitAgent _oa = null;
-	private MVImputeAgent _mia = null;
-	private RecodeAgent _ra = null;	
-	private BinAgent _ba = null;
-	private DummycodeAgent _da = null;
-	
-	private long _numRecordsInPartFile;		// Total number of records in the data file
-	private long _numValidRecords;			// (_numRecordsInPartFile - #of omitted records)
-	private long _numTransformedRows; 		// Number of rows after applying transformations
-	private long _numTransformedColumns; 	// Number of columns after applying transformations
 
 	private String _headerLine = null;
 	private boolean _hasHeader;
 	private Pattern _delim = null;
 	private String _delimString = null;
 	private String[] _NAstrings = null;
-	private String[] _outputColumnNames = null;
 	private int _numInputCols = -1;
 	
-	private String _tfMtdDir = null;
-	private String _spec = null;
-	private String _offsetFile = null;
-	private String _tmpDir = null;
-	private String _outputPath = null;
-	
-	public TfUtils(JobConf job, boolean minimal) 
-		throws IOException, JSONException 
-	{
-		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-			ConfigurationManager.setCachedJobConf(job);
-		}		
-		_NAstrings = TfUtils.parseNAStrings(job);
-		_spec = job.get(MRJobConfiguration.TF_SPEC);
-		_oa = new OmitAgent(new JSONObject(_spec), null, -1);
-	}
-	
-	// called from GenTFMtdMapper, ApplyTf (Hadoop)
-	public TfUtils(JobConf job) 
-		throws IOException, JSONException 
-	{
-		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-			ConfigurationManager.setCachedJobConf(job);
-		}
-		
-		boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
-		String[] naStrings = TfUtils.parseNAStrings(job);
-		long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) ); // #cols input data
-		String spec = job.get(MRJobConfiguration.TF_SPEC);
-		String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
-		String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
-		String outputPath = FileOutputFormat.getOutputPath(job).toString();
-		JSONObject jspec = new JSONObject(spec);
-		
-		init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, jspec, numCols, offsetFile, tmpPath, outputPath);
-	}
-	
-	// called from GenTfMtdReducer 
-	public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
-	{
-		this(job);
-		_tfMtdDir = tfMtdDir;
-	}
-	
-	// called from GenTFMtdReducer and ApplyTf (Spark)
-	public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
-		init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
-		_tfMtdDir = tfMtdDir;
-	}
-
-	protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
-			throws IOException {
-		// check non-existing file
-		if (!fs.exists(path))
-			if ( err )
-				throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
-			else
-				return false;
-
-		// check for empty file
-		if( MapReduceTool.isFileEmpty(fs, path) )
-			if ( err )
-				throw new EOFException("Empty input file " + path.toString() + ".");
-			else
-				return false;
-		
-		return true;
-	}
-	
-	public static String getPartFileName(JobConf job) throws IOException {
-		Path path = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-		path = path.makeQualified(fs);
-		return path.toString();
-	}
-	
-	public static boolean isPartFileWithHeader(JobConf job) throws IOException {
-		String thisfile=getPartFileName(job);
-		Path path = new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE));
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-		path = path.makeQualified(fs);
-		return thisfile.toString().equals(path.toString());
-	}
-	
-	/**
-	 * Prepare NA strings so that they can be sent to workers via JobConf.
-	 * A "dummy" string is added at the end to handle the case of empty strings.
-	 * @param na NA string
-	 * @return NA string concatenated with NA string separator concatenated with "dummy"
-	 */
-	public static String prepNAStrings(String na) {
-		return na  + DataExpression.DELIM_NA_STRING_SEP + "dummy";
-	}
-	
-	public static String[] parseNAStrings(String na) 
-	{
-		if ( na == null )
-			return null;
-		
-		String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
-		return tmp; //Arrays.copyOf(tmp, tmp.length-1);
-	}
-	
-	public static String[] parseNAStrings(JobConf job) 
-	{
-		return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
-	}
-	
-	private void createAgents(JSONObject spec, String[] naStrings) 
-		throws IOException, JSONException 
-	{
-		_oa = new OmitAgent(spec, _outputColumnNames, _numInputCols);
-		_mia = new MVImputeAgent(spec, null, naStrings, _numInputCols);
-		_ra = new RecodeAgent(spec, _outputColumnNames, _numInputCols);
-		_ba = new BinAgent(spec, _outputColumnNames, _numInputCols);
-		_da = new DummycodeAgent(spec, _outputColumnNames, _numInputCols);
-	}
-	
-	private void parseColumnNames() {
-		_outputColumnNames = _delim.split(_headerLine, -1);
-		for(int i=0; i < _outputColumnNames.length; i++)
-			_outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
-	}
-	
-	private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
-	{
-		_numRecordsInPartFile = 0;
-		_numValidRecords = 0;
-		_numTransformedRows = 0;
-		_numTransformedColumns = 0;
-		
-		//TODO: fix hard-wired header propagation to meta data column names
-		
-		_headerLine = headerLine;
-		_hasHeader = hasHeader;
-		_delimString = delim;
-		_delim = Pattern.compile(Pattern.quote(delim));
-		_NAstrings = naStrings;
-		_numInputCols = (int)numCols;
-		_offsetFile = offsetFile;
-		_tmpDir = tmpPath;
-		_outputPath = outputPath;
-		
-		parseColumnNames();		
-		createAgents(spec, naStrings);
-	}
-	
-	public void incrValid() { _numValidRecords++; }
-	public long getValid()  { return _numValidRecords; }
-	public long getTotal()  { return _numRecordsInPartFile; }
-	public long getNumTransformedRows() 	{ return _numTransformedRows; }
-	public long getNumTransformedColumns() 	{ return _numTransformedColumns; }
-	
 	public String getHeader() 		{ return _headerLine; }
 	public boolean hasHeader() 		{ return _hasHeader; }
 	public String getDelimString() 	{ return _delimString; }
@@ -286,24 +87,6 @@ public class TfUtils implements Serializable{
 	public String[] getNAStrings() 	{ return _NAstrings; }
 	public long getNumCols() 		{ return _numInputCols; }
 	
-	public String getSpec() 	{ return _spec; }
-	public String getTfMtdDir() 	{ return _tfMtdDir; }
-	public String getOffsetFile() 	{ return _offsetFile; }
-	public String getTmpDir() 		{ return _tmpDir; }
-	public String getOutputPath()	{ return _outputPath; }
-	
-	public String getName(int colID) { return _outputColumnNames[colID-1]; }
-	
-	public void setValid(long n) { _numValidRecords = n;}
-	public void incrTotal() { _numRecordsInPartFile++; }
-	public void setTotal(long n) { _numRecordsInPartFile = n;}
-	
-	public OmitAgent 	  getOmitAgent() 	{ 	return _oa; }
-	public MVImputeAgent  getMVImputeAgent(){ 	return _mia;}
-	public RecodeAgent 	  getRecodeAgent() 	{ 	return _ra; }
-	public BinAgent 	  getBinAgent() 	{ 	return _ba; }
-	public DummycodeAgent getDummycodeAgent() { return _da; }
-	
 	/**
 	 * Function that checks if the given string is one of NA strings.
 	 * 
@@ -321,229 +104,4 @@ public class TfUtils implements Serializable{
 		}
 		return false;
 	}
-	
-	public String[] getWords(Text line) {
-		return getWords(line.toString());
-	}
-	
-
-	public String[] getWords(String line) {
-		return getDelim().split(line.trim(), -1);
-	}
-	
-	/**
-	 * Process a given row to construct transformation metadata.
-	 * 
-	 * @param line string to break into words
-	 * @return string array of words from the line
-	 * @throws IOException if IOException occurs
-	 */
-	public String[] prepareTfMtd(String line) throws IOException {
-		String[] words = getWords(line);
-		if(!getOmitAgent().omit(words, this))
-		{
-			getMVImputeAgent().prepare(words);
-			getRecodeAgent().prepare(words, this);
-			getBinAgent().prepare(words, this);
-			incrValid();
-		}
-		incrTotal();
-		
-		return words;
-	}
-	
-	public void loadTfMetadata() throws IOException 
-	{
-		JobConf job = ConfigurationManager.getCachedJobConf();
-		loadTfMetadata(job, false);
-	}
-	
-	public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
-	{
-		Path tfMtdDir = null; 
-		FileSystem fs = null;
-		
-		if(fromLocalFS) {
-			// metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
-			tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
-			fs = FileSystem.getLocal(job);
-		}
-		else {
-			tfMtdDir = new Path(getTfMtdDir());
-			fs = IOUtilFunctions.getFileSystem(tfMtdDir, job);
-		}
-		
-		// load transformation metadata 
-		getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-		getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-		getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
-		
-		// associate recode maps and bin definitions with dummycoding agent,
-		// as recoded and binned columns are typically dummycoded
-		getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
-		getDummycodeAgent().setNumBins(getBinAgent().getColList(), getBinAgent().getNumBins());
-		getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
-	}
-
-	public String processHeaderLine() throws IOException 
-	{
-		//TODO: fix hard-wired header propagation to meta data column names
-		
-		FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
-		String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
-		getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
-		
-		// write header information (before and after transformation) to temporary path
-		// these files are copied into txMtdPath, once the ApplyTf job is complete.
-		DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
-
-		return dcdHeader;
-		//_numTransformedColumns = getDelim().split(dcdHeader, -1).length; 
-		//return _numTransformedColumns;
-	}
-
-	public boolean omit(String[] words) {
-		if(getOmitAgent() == null)
-			return false;
-		return getOmitAgent().omit(words, this);
-	}
-	
-	/**
-	 * Function to apply transformation metadata on a given row.
-	 * 
-	 * @param words string array of words
-	 * @return string array of transformed words
-	 */
-	public String[] apply( String[] words ) {
-		words = getMVImputeAgent().apply(words);
-		words = getRecodeAgent().apply(words);
-		words = getBinAgent().apply(words);
-		words = getDummycodeAgent().apply(words);		
-		_numTransformedRows++;
-		
-		return words;
-	}
-	
-	public void check(String []words) throws DMLRuntimeException 
-	{
-		boolean checkEmptyString = ( getNAStrings() != null );
-		if ( checkEmptyString ) 
-		{
-			final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-			for(int i=0; i<words.length; i++) 
-				if ( words[i] != null && words[i].equals(""))
-					throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-		}
-	}
-	
-	public String checkAndPrepOutputString(String []words) throws DMLRuntimeException {
-		return checkAndPrepOutputString(words, new StringBuilder());
-	}
-	
-	public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException 
-	{
-		/*
-		 * Check if empty strings ("") have to be handled.
-		 * 
-		 * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
-		 * When na.strings are provided, then "" is considered a missing value indicator, and the 
-		 * user is expected to provide an appropriate imputation method. Therefore, when na.strings 
-		 * are provided, "" encountered in any column (after all transformations are applied) 
-		 * denotes an erroneous condition.  
-		 */
-		boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
-		
-		//StringBuilder sb = new StringBuilder();
-		sb.setLength(0);
-		int i =0;
-		
-		if ( checkEmptyString ) 
-		{
-			final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-			if ( words[0] != null ) 
-				if ( words[0].equals("") )
-					throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
-				else 
-					sb.append(words[0]);
-			else
-				sb.append("0");
-			
-			for(i=1; i<words.length; i++) 
-			{
-				sb.append(_delimString);
-				
-				if ( words[i] != null ) 
-					if ( words[i].equals("") )
-						throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-					else 
-						sb.append(words[i]);
-				else
-					sb.append("0");
-			}
-		}
-		else 
-		{
-			sb.append(words[0] != null ? words[0] : "0");
-			for(i=1; i<words.length; i++) 
-			{
-				sb.append(_delimString);
-				sb.append(words[i] != null ? words[i] : "0");
-			}
-		}
-		
-		return sb.toString();
-	}
-
-	private Reader initOffsetsReader(JobConf job) throws IOException 
-	{
-		Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-		Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
-		if ( files.length != 1 )
-			throw new IOException("Expecting a single file under counters file: " + path.toString());
-		
-		Reader reader = new SequenceFile.Reader(fs, files[0], job);
-		
-		return reader;
-	}
-	
-	/**
-	 * Function to generate custom file names (transform-part-.....) for
-	 * mappers' output for ApplyTfCSV job. The idea is to find the index 
-	 * of (thisfile, fileoffset) in the list of all offsets from the 
-	 * counters/offsets file, which was generated from either GenTfMtdMR
-	 * or AssignRowIDMR job.
-	 * 
-	 * @param job job configuration
-	 * @param offset file offset
-	 * @return part file id (ie, 00001, 00002, etc)
-	 * @throws IOException if IOException occurs
-	 */
-	public String getPartFileID(JobConf job, long offset) throws IOException
-	{
-		Reader reader = null;
-		int id = 0;
-		try {
-			reader = initOffsetsReader(job);
-			ByteWritable key=new ByteWritable();
-			OffsetCount value=new OffsetCount();
-			String thisFile = TfUtils.getPartFileName(job);
-			while (reader.next(key, value)) {
-				if ( thisFile.equals(value.filename) && value.fileOffset == offset ) 
-					break;
-				id++;
-			}
-		}
-		finally {
-			IOUtilFunctions.closeSilently(reader);
-		}
-		
-		String sid = Integer.toString(id);
-		char[] carr = new char[5-sid.length()];
-		Arrays.fill(carr, '0');
-		String ret = (new String(carr)).concat(sid);
-		
-		return ret;
-	}
 }
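
With the Hadoop job constructors, record counters, and part-file helpers removed, TfUtils shrinks to a serializable holder for header, delimiter, and NA-string metadata, plus the static NA check that the encoders call. A minimal sketch of that retained check, with hypothetical values:

    import org.apache.sysml.runtime.transform.TfUtils;

    public class NACheckSketch {
      public static void main(String[] args) {
        String[] naStrings = { "NA", "?" };                // hypothetical na.strings
        System.out.println(TfUtils.isNA(naStrings, "?"));  // true: a missing-value marker
        System.out.println(TfUtils.isNA(naStrings, "42")); // false: a regular value
      }
    }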

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
index a3f01a1..304dcdb 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
@@ -19,20 +19,10 @@
 
 package org.apache.sysml.runtime.transform.encode;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.wink.json4j.JSONArray;
 
@@ -152,11 +142,4 @@ public abstract class Encoder implements Serializable
 	 * @param meta frame block
 	 */
 	public abstract void initMetaData(FrameBlock meta);
-	
-	
-	//OLD API: kept for a transition phase only
-	//TODO stage 2: refactor data and meta data IO into minimal set of ultility functions
-	abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
-	abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
-	abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
 }
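
The three file-based abstract methods (mapOutputTransformationMetadata, mergeAndOutputTransformationMetadata, loadTxMtd) are gone, so a subclass now implements only the frame-based contract: build, apply, encode (build plus apply), and the metadata hooks. A skeletal no-op encoder as a sketch; the super(null, clen) constructor call mirrors the existing subclasses, and the set of abstract methods shown is inferred from them:

    package org.apache.sysml.runtime.transform.encode;

    import org.apache.sysml.runtime.matrix.data.FrameBlock;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;

    //hypothetical no-op encoder illustrating the reduced contract
    public class EncoderIdentity extends Encoder {
      private static final long serialVersionUID = 1L;

      public EncoderIdentity(int clen) {
        super(null, clen); //no column list: applies to no columns
      }
      @Override
      public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
        build(in); //single-pass convenience: build, then apply
        return apply(in, out);
      }
      @Override
      public void build(FrameBlock in) {
        //nothing to precompute
      }
      @Override
      public String[] apply(String[] words) {
        return words; //pass rows through unchanged
      }
      @Override
      public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
        return out;
      }
      @Override
      public FrameBlock getMetaData(FrameBlock meta) {
        return meta;
      }
      @Override
      public void initMetaData(FrameBlock meta) {
        //no metadata to read
      }
    }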

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
new file mode 100644
index 0000000..fbe6994
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class EncoderBin extends Encoder 
+{	
+	private static final long serialVersionUID = 1917445005206076078L;
+
+	public static final String MIN_PREFIX = "min";
+	public static final String MAX_PREFIX = "max";
+	public static final String NBINS_PREFIX = "nbins";
+
+	private int[] _numBins = null;
+	private double[] _min=null, _max=null;	// min and max among non-missing values
+	private double[] _binWidths = null;		// width of a bin for each attribute
+	
+	//frame transform-apply attributes
+	private double[][] _binMins = null;
+	private double[][] _binMaxs = null;
+	
+	public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen) 
+		throws JSONException, IOException 
+	{
+		this(parsedSpec, colnames, clen, false);
+	}
+
+	public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly) 
+		throws JSONException, IOException 
+	{
+		super( null, clen );		
+		if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
+			return;
+		
+		if( colsOnly ) {
+			List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames);
+			initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0])));
+		}
+		else 
+		{
+			JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN);		
+			JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
+			JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
+			initColList(attrs);
+			
+			_numBins = new int[attrs.size()];
+			for(int i=0; i < _numBins.length; i++)
+				_numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
+			
+			// initialize internal transformation metadata
+			_min = new double[_colList.length];
+			Arrays.fill(_min, Double.MAX_VALUE);
+			_max = new double[_colList.length];
+			Arrays.fill(_max, -Double.MAX_VALUE);
+			
+			_binWidths = new double[_colList.length];
+		}
+	}
+
+	public void prepare(String[] words, TfUtils agents) {
+		if ( !isApplicable() )
+			return;
+		
+		for(int i=0; i <_colList.length; i++) {
+			int colID = _colList[i];
+			
+			String w = null;
+			double d = 0;
+				
+			// equi-width
+			w = UtilFunctions.unquote(words[colID-1].trim());
+			if(!TfUtils.isNA(agents.getNAStrings(),w)) {
+				d = UtilFunctions.parseToDouble(w);
+				if(d < _min[i])
+					_min[i] = d;
+				if(d > _max[i])
+					_max[i] = d;
+			}
+		}
+	}
+		
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		build(in);
+		return apply(in, out);
+	}
+
+	@Override
+	public void build(FrameBlock in) {
+		// nothing to do
+	}
+	
+	/**
+	 * Method to apply transformations.
+	 */
+	@Override
+	public String[] apply(String[] words) {
+		if( !isApplicable() )
+			return words;
+	
+		for(int i=0; i < _colList.length; i++) {
+			int colID = _colList[i];
+			try {
+				double val = UtilFunctions.parseToDouble(words[colID-1]);
+				int binid = 1;
+				double tmp = _min[i] + _binWidths[i];
+				while(val > tmp && binid < _numBins[i]) {
+					tmp += _binWidths[i];
+					binid++;
+				}
+				words[colID-1] = Integer.toString(binid);
+			} 
+			catch(NumberFormatException e) {
+				throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
+			}
+		}
+		
+		return words;
+	}
+
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+		for(int j=0; j<_colList.length; j++) {
+			int colID = _colList[j];
+			for( int i=0; i<in.getNumRows(); i++ ) {
+				double inVal = UtilFunctions.objectToDouble(
+						in.getSchema()[colID-1], in.get(i, colID-1));
+				int ix = Arrays.binarySearch(_binMaxs[j], inVal);
+				int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1;		
+				out.quickSetValue(i, colID-1, binID);
+			}	
+		}
+		return out;
+	}
+
+	@Override
+	public FrameBlock getMetaData(FrameBlock meta) {
+		return meta;
+	}
+	
+	@Override
+	public void initMetaData(FrameBlock meta) {
+		_binMins = new double[_colList.length][];
+		_binMaxs = new double[_colList.length][];
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
+			_binMins[j] = new double[nbins];
+			_binMaxs[j] = new double[nbins];
+			for( int i=0; i<nbins; i++ ) {
+				String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX);
+				_binMins[j][i] = Double.parseDouble(tmp[0]);
+				_binMaxs[j][i] = Double.parseDouble(tmp[1]);
+			}
+		}
+	}
+}
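
In the frame path, apply(FrameBlock, MatrixBlock) finds a value's bin by binary search over the column's bin upper bounds (read from the transform metadata in initMetaData), while the string path walks the equi-width boundaries. A standalone sketch of the binary-search rule with hypothetical boundaries:

    import java.util.Arrays;

    public class BinLookupSketch {
      public static void main(String[] args) {
        //hypothetical: 4 equi-width bins over [0,10] for one column
        double[] binMaxs = { 2.5, 5.0, 7.5, 10.0 };
        double inVal = 6.2;
        int ix = Arrays.binarySearch(binMaxs, inVal);
        int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1; //same rule as apply
        System.out.println(binID); //3, i.e., the bin (5.0, 7.5]
      }
    }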

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index 9efbc19..deff887 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -19,20 +19,11 @@
 
 package org.apache.sysml.runtime.transform.encode;
 
-import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
 
 /**
  * Simple composite encoder that applies a list of encoders 
@@ -90,7 +81,6 @@ public class EncoderComposite extends Encoder
 			encoder.build(in);
 	}
 
-
 	@Override
 	public String[] apply(String[] in) {
 		for( Encoder encoder : _encoders )
@@ -119,19 +109,4 @@ public class EncoderComposite extends Encoder
 		for( Encoder encoder : _encoders )
 			encoder.initMetaData(out);
 	}
-
-	@Override
-	public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-		throw new RuntimeException("File-based api not supported.");
-	}
-
-	@Override
-	public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-		throw new RuntimeException("File-based api not supported.");	
-	}
-
-	@Override
-	public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-		throw new RuntimeException("File-based api not supported.");
-	}
 }
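
EncoderComposite chains its child encoders, feeding each encoder's output row into the next; order matters, e.g., recode must run before dummycode so that category ids exist to be expanded. A generic, self-contained sketch of that chaining (stand-in lambdas, not the class's actual constructor API):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.UnaryOperator;

    public class CompositeSketch {
      public static void main(String[] args) {
        //hypothetical stand-ins for encoders, chained like EncoderComposite.apply
        List<UnaryOperator<String[]>> encoders = Arrays.asList(
          r -> { r[0] = "1"; return r; }, //e.g., recode "red" -> id "1"
          r -> Arrays.copyOf(r, 3)        //e.g., dummycode widens the row
        );
        String[] row = { "red", "5" };
        for (UnaryOperator<String[]> e : encoders)
          row = e.apply(row);
        System.out.println(Arrays.toString(row)); //[1, 5, null]
      }
    }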

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
new file mode 100644
index 0000000..743381a
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+public class EncoderDummycode extends Encoder 
+{		
+	private static final long serialVersionUID = 5832130477659116489L;
+
+	private int[] _domainSizes = null;			// length = #of dummycoded columns
+	private long _dummycodedLength = 0;			// #of columns after dummycoding
+
+	public EncoderDummycode(JSONObject parsedSpec, String[] colnames, int clen) throws JSONException {
+		super(null, clen);
+		
+		if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) {
+			int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_DUMMYCODE);
+			initColList(collist);
+		}
+	}
+	
+	@Override
+	public int getNumCols() {
+		return (int)_dummycodedLength;
+	}
+	
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		return apply(in, out);
+	}
+
+	@Override
+	public void build(FrameBlock in) {
+		//do nothing
+	}
+	
+	/**
+	 * Method to apply transformations.
+	 * 
+	 * @param words array of strings
+	 * @return array of transformed strings
+	 */
+	@Override
+	public String[] apply(String[] words) 
+	{
+		if( !isApplicable() )
+			return words;
+		
+		String[] nwords = new String[(int)_dummycodedLength];
+		int rcdVal = 0;
+		
+		for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) {
+			if(idx < _colList.length && colID==_colList[idx]) {
+				// dummycoded columns
+				try {
+					rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
+					nwords[ ncolID-1+rcdVal-1 ] = "1";
+					ncolID += _domainSizes[idx];
+					idx++;
+				} 
+				catch (Exception e) {
+					throw new RuntimeException("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] 
+							+ ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
+				}
+			}
+			else {
+				nwords[ncolID-1] = words[colID-1];
+				ncolID++;
+			}
+		}
+		
+		return nwords;
+	}
+	
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out) 
+	{
+		MatrixBlock ret = new MatrixBlock(out.getNumRows(), (int)_dummycodedLength, false);
+		
+		for( int i=0; i<out.getNumRows(); i++ ) {
+			for(int colID=1, idx=0, ncolID=1; colID <= out.getNumColumns(); colID++) {
+				double val = out.quickGetValue(i, colID-1);
+				if(idx < _colList.length && colID==_colList[idx]) {
+					ret.quickSetValue(i, ncolID-1+(int)val-1, 1);
+					ncolID += _domainSizes[idx];
+					idx++;
+				}
+				else {
+					double ptval = UtilFunctions.objectToDouble(in.getSchema()[colID-1], in.get(i, colID-1));
+					ret.quickSetValue(i, ncolID-1, ptval);
+					ncolID++;
+				}
+			}
+		}
+		
+		return ret;
+	}
+
+	@Override
+	public FrameBlock getMetaData(FrameBlock out) {
+		return out;
+	}
+	
+	@Override
+	public void initMetaData(FrameBlock meta) {
+		//initialize domain sizes and output num columns
+		_domainSizes = new int[_colList.length];
+		_dummycodedLength = _clen;
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			_domainSizes[j] = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
+			_dummycodedLength +=  _domainSizes[j]-1;
+		}
+	}
+}
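
initMetaData derives the output width as clen plus the sum of (domainSize - 1) over the dummycoded columns, and apply writes a single "1" indicator at the offset of the recoded category id while passing other columns through. A worked, self-contained sketch with hypothetical sizes:

    public class DummycodeSketch {
      public static void main(String[] args) {
        //hypothetical: 3 input columns, column 2 dummycoded with 4 distinct values
        int clen = 3;
        int[] colList = { 2 };
        int[] domainSizes = { 4 };
        int outLen = clen;
        for (int d : domainSizes)
          outLen += d - 1; //3 + (4-1) = 6 output columns
        String[] words = { "7.5", "3", "x" }; //column 2 already recoded to id 3
        String[] nwords = new String[outLen];
        for (int colID = 1, idx = 0, ncolID = 1; colID <= words.length; colID++) {
          if (idx < colList.length && colID == colList[idx]) {
            int rcdVal = Integer.parseInt(words[colID - 1]);
            nwords[ncolID - 1 + rcdVal - 1] = "1"; //indicator for category 3
            ncolID += domainSizes[idx];
            idx++;
          }
          else {
            nwords[ncolID - 1] = words[colID - 1];
            ncolID++;
          }
        }
        //prints [7.5, null, null, 1, null, x]
        System.out.println(java.util.Arrays.toString(nwords));
      }
    }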

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
index f7ceefd..13b2810 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -28,11 +28,6 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.transform.BinAgent;
-import org.apache.sysml.runtime.transform.DummycodeAgent;
-import org.apache.sysml.runtime.transform.MVImputeAgent;
-import org.apache.sysml.runtime.transform.OmitAgent;
-import org.apache.sysml.runtime.transform.RecodeAgent;
 import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -40,7 +35,6 @@ import org.apache.wink.json4j.JSONObject;
 
 public class EncoderFactory 
 {
-
 	public static Encoder createEncoder(String spec, String[] colnames, int clen, FrameBlock meta) throws DMLRuntimeException {
 		return createEncoder(spec, colnames, UtilFunctions.nCopies(clen, ValueType.STRING), meta);
 	}
@@ -79,7 +73,7 @@ public class EncoderFactory
 			
 			//create individual encoders
 			if( !rcIDs.isEmpty() ) {
-				RecodeAgent ra = new RecodeAgent(jSpec, colnames, clen);
+				EncoderRecode ra = new EncoderRecode(jSpec, colnames, clen);
 				ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0])));
 				lencoders.add(ra);	
 			}
@@ -87,13 +81,13 @@ public class EncoderFactory
 				lencoders.add(new EncoderPassThrough(
 						ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen));	
 			if( !dcIDs.isEmpty() )
-				lencoders.add(new DummycodeAgent(jSpec, colnames, schema.length));
+				lencoders.add(new EncoderDummycode(jSpec, colnames, schema.length));
 			if( !binIDs.isEmpty() )
-				lencoders.add(new BinAgent(jSpec, colnames, schema.length, true));
+				lencoders.add(new EncoderBin(jSpec, colnames, schema.length, true));
 			if( !oIDs.isEmpty() )
-				lencoders.add(new OmitAgent(jSpec, colnames, schema.length));
+				lencoders.add(new EncoderOmit(jSpec, colnames, schema.length));
 			if( !mvIDs.isEmpty() ) {
-				MVImputeAgent ma = new MVImputeAgent(jSpec, colnames, schema.length);
+				EncoderMVImpute ma = new EncoderMVImpute(jSpec, colnames, schema.length);
 				ma.initRecodeIDList(rcIDs);
 				lencoders.add(ma);
 			}
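
With the agents swapped for their Encoder counterparts, a caller obtains one composite encoder from the factory and runs it over a frame in a single pass. A sketch, assuming caller-supplied spec, column names, input frame, and metadata frame; sizing the output via getNumCols() is also an assumption:

    import org.apache.sysml.runtime.DMLRuntimeException;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;
    import org.apache.sysml.runtime.transform.encode.Encoder;
    import org.apache.sysml.runtime.transform.encode.EncoderFactory;

    public class EncodeSketch {
      //a sketch: create and run all encoders declared in a JSON spec
      public static MatrixBlock encode(String spec, String[] colnames,
        FrameBlock data, FrameBlock meta) throws DMLRuntimeException
      {
        Encoder encoder = EncoderFactory.createEncoder(
          spec, colnames, colnames.length, meta);
        return encoder.encode(data,
          new MatrixBlock(data.getNumRows(), encoder.getNumCols(), false));
      }
    }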

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
new file mode 100644
index 0000000..55a0bde
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.runtime.functionobjects.CM;
+import org.apache.sysml.runtime.functionobjects.Mean;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class EncoderMVImpute extends Encoder 
+{	
+	private static final long serialVersionUID = 9057868620144662194L;
+
+	public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT };
+	
+	private MVMethod[] _mvMethodList = null;
+	private MVMethod[] _mvscMethodList = null;	// scaling methods for attributes that are imputed and also scaled
+	
+	private BitSet _isMVScaled = null;
+	private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE);		// function object that understands variance computation
+	
+	// objects required to compute mean and variance of all non-missing entries 
+	private Mean _meanFn = Mean.getMeanFnObject();	// function object that understands mean computation
+	private KahanObject[] _meanList = null; 		// column-level means, computed so far
+	private long[] _countList = null;				// #of non-missing values
+	
+	private CM_COV_Object[] _varList = null;		// column-level variances, computed so far (for scaling)
+
+	private int[] 			_scnomvList = null;			// List of attributes that are scaled but not imputed
+	private MVMethod[]		_scnomvMethodList = null;	// scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
+	private KahanObject[] 	_scnomvMeanList = null;		// column-level means, for attributes scaled but not imputed
+	private long[] 			_scnomvCountList = null;	// #of non-missing values, for attributes scaled but not imputed
+	private CM_COV_Object[] _scnomvVarList = null;		// column-level variances, computed so far
+	
+	private String[] _replacementList = null;		// replacements: for global_mean, mean; and for global_mode, recode id of mode category
+	private String[] _NAstrings = null;
+	private List<Integer> _rcList = null; 
+	private HashMap<Integer,HashMap<String,Long>> _hist = null;
+	
+	public String[] getReplacements() { return _replacementList; }
+	public KahanObject[] getMeans()   { return _meanList; }
+	public CM_COV_Object[] getVars()  { return _varList; }
+	public KahanObject[] getMeans_scnomv()   { return _scnomvMeanList; }
+	public CM_COV_Object[] getVars_scnomv()  { return _scnomvVarList; }
+	
+	public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, int clen) 
+		throws JSONException
+	{
+		super(null, clen);
+		
+		//handle column list
+		int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, colnames, TfUtils.TXMETHOD_IMPUTE);
+		initColList(collist);
+	
+		//handle method list
+		parseMethodsAndReplacments(parsedSpec);
+		
+		//create reuse histograms
+		_hist = new HashMap<Integer, HashMap<String,Long>>();
+	}
+			
+	public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, String[] NAstrings, int clen)
+		throws JSONException 
+	{
+		super(null, clen);	
+		boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE);
+		boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE);		
+		_NAstrings = NAstrings;
+		
+		if(!isMV) {
+			// MV Impute is not applicable
+			_colList = null;
+			_mvMethodList = null;
+			_meanList = null;
+			_countList = null;
+			_replacementList = null;
+		}
+		else {
+			JSONObject mvobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+			JSONArray mvattrs = (JSONArray) mvobj.get(TfUtils.JSON_ATTRS);
+			JSONArray mvmthds = (JSONArray) mvobj.get(TfUtils.JSON_MTHD);
+			int mvLength = mvattrs.size();
+			
+			_colList = new int[mvLength];
+			_mvMethodList = new MVMethod[mvLength];
+			
+			_meanList = new KahanObject[mvLength];
+			_countList = new long[mvLength];
+			_varList = new CM_COV_Object[mvLength];
+			
+			_isMVScaled = new BitSet(_colList.length);
+			_isMVScaled.clear();
+			
+			for(int i=0; i < _colList.length; i++) {
+				_colList[i] = UtilFunctions.toInt(mvattrs.get(i));
+				_mvMethodList[i] = MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))]; 
+				_meanList[i] = new KahanObject(0, 0);
+			}
+			
+			_replacementList = new String[mvLength]; 	// contains replacements for all columns (scale and categorical)
+			
+			JSONArray constants = (JSONArray)mvobj.get(TfUtils.JSON_CONSTS);
+			for(int i=0; i < constants.size(); i++) {
+				if ( constants.get(i) == null )
+					_replacementList[i] = "NaN";
+				else
+					_replacementList[i] = constants.get(i).toString();
+			}
+		}
+		
+		// Handle scaled attributes
+		if ( !isSC )
+		{
+			// scaling is not applicable
+			_scnomvCountList = null;
+			_scnomvMeanList = null;
+			_scnomvVarList = null;
+		}
+		else
+		{
+			if ( _colList != null ) 
+				_mvscMethodList = new MVMethod[_colList.length];
+			
+			JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE);
+			JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS);
+			JSONArray scmthds = (JSONArray) scobj.get(TfUtils.JSON_MTHD);
+			int scLength = scattrs.size();
+			
+			int[] _allscaled = new int[scLength];
+			int scnomv = 0, colID;
+			byte mthd;
+			for(int i=0; i < scLength; i++)
+			{
+				colID = UtilFunctions.toInt(scattrs.get(i));
+				mthd = (byte) UtilFunctions.toInt(scmthds.get(i)); 
+						
+				_allscaled[i] = colID;
+				
+				// check if the attribute is also MV imputed
+				int mvidx = isApplicable(colID);
+				if(mvidx != -1)
+				{
+					_isMVScaled.set(mvidx);
+					_mvscMethodList[mvidx] = MVMethod.values()[mthd];
+					_varList[mvidx] = new CM_COV_Object();
+				}
+				else
+					scnomv++;	// count of scaled but not imputed 
+			}
+			
+			if(scnomv > 0)
+			{
+				_scnomvList = new int[scnomv];			
+				_scnomvMethodList = new MVMethod[scnomv];	
+	
+				_scnomvMeanList = new KahanObject[scnomv];
+				_scnomvCountList = new long[scnomv];
+				_scnomvVarList = new CM_COV_Object[scnomv];
+				
+				for(int i=0, idx=0; i < scLength; i++)
+				{
+					colID = UtilFunctions.toInt(scattrs.get(i));
+					mthd = (byte)UtilFunctions.toInt(scmthds.get(i)); 
+							
+					if(isApplicable(colID) == -1)
+					{	// scaled but not imputed
+						_scnomvList[idx] = colID;
+						_scnomvMethodList[idx] = MVMethod.values()[mthd];
+						_scnomvMeanList[idx] = new KahanObject(0, 0);
+						_scnomvVarList[idx] = new CM_COV_Object();
+						idx++;
+					}
+				}
+			}
+		}
+	}
+
+	private void parseMethodsAndReplacments(JSONObject parsedSpec) throws JSONException {
+		JSONArray mvspec = (JSONArray) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+		_mvMethodList = new MVMethod[mvspec.size()];
+		_replacementList = new String[mvspec.size()];
+		_meanList = new KahanObject[mvspec.size()];
+		_countList = new long[mvspec.size()];
+		for(int i=0; i < mvspec.size(); i++) {
+			JSONObject mvobj = (JSONObject)mvspec.get(i);
+			_mvMethodList[i] = MVMethod.valueOf(mvobj.get("method").toString().toUpperCase()); 
+			if( _mvMethodList[i] == MVMethod.CONSTANT ) {
+				_replacementList[i] = mvobj.getString("value").toString();
+			}
+			_meanList[i] = new KahanObject(0, 0);
+		}
+	}
+		
+	public void prepare(String[] words) throws IOException {
+		
+		try {
+			String w = null;
+			if(_colList != null)
+			for(int i=0; i <_colList.length; i++) {
+				int colID = _colList[i];
+				w = UtilFunctions.unquote(words[colID-1].trim());
+				
+				try {
+				if(!TfUtils.isNA(_NAstrings, w)) {
+					_countList[i]++;
+					
+					boolean computeMean = (_mvMethodList[i] == MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) );
+					if(computeMean) {
+						// global_mean
+						double d = UtilFunctions.parseToDouble(w);
+						_meanFn.execute2(_meanList[i], d, _countList[i]);
+						
+						if (_isMVScaled.get(i) && _mvscMethodList[i] == MVMethod.GLOBAL_MODE)
+							_varFn.execute(_varList[i], d);
+					}
+					else {
+						// global_mode or constant
+						// Nothing to do here. Mode is computed using recode maps.
+					}
+				}
+				} catch (NumberFormatException e) 
+				{
+					throw new RuntimeException("Encountered \"" + w + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + w + "\" to na.strings, along with an appropriate imputation method.");
+				}
+			}
+			
+			// Compute mean and variance for attributes that are scaled but not imputed
+			if(_scnomvList != null)
+			for(int i=0; i < _scnomvList.length; i++) 
+			{
+				int colID = _scnomvList[i];
+				w = UtilFunctions.unquote(words[colID-1].trim());
+				double d = UtilFunctions.parseToDouble(w);
+				_scnomvCountList[i]++; 		// not required, this is always equal to total #records processed
+				_meanFn.execute2(_scnomvMeanList[i], d, _scnomvCountList[i]);
+				if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE)
+					_varFn.execute(_scnomvVarList[i], d);
+			}
+		} catch(Exception e) {
+			throw new IOException(e);
+		}
+	}
+	
+	public MVMethod getMethod(int colID) {
+		int idx = isApplicable(colID);		
+		if(idx == -1)
+			return MVMethod.INVALID;
+		else
+			return _mvMethodList[idx];
+	}
+	
+	public long getNonMVCount(int colID) {
+		int idx = isApplicable(colID);
+		return (idx == -1) ? 0 : _countList[idx];
+	}
+	
+	public String getReplacement(int colID)  {
+		int idx = isApplicable(colID);		
+		return (idx == -1) ? null : _replacementList[idx];
+	}
+	
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		build(in);
+		return apply(in, out);
+	}
+	
+	@Override
+	public void build(FrameBlock in) {
+		try {
+			for( int j=0; j<_colList.length; j++ ) {
+				int colID = _colList[j];
+				if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) {
+					//compute global column mean (scale)
+					long off = _countList[j];
+					for( int i=0; i<in.getNumRows(); i++ )
+						_meanFn.execute2(_meanList[j], UtilFunctions.objectToDouble(
+							in.getSchema()[colID-1], in.get(i, colID-1)), off+i+1);
+					_replacementList[j] = String.valueOf(_meanList[j]._sum);
+					_countList[j] += in.getNumRows();
+				}
+				else if( _mvMethodList[j] == MVMethod.GLOBAL_MODE ) {
+					//compute global column mode (categorical), i.e., most frequent category
+					HashMap<String,Long> hist = _hist.containsKey(colID) ? 
+							_hist.get(colID) : new HashMap<String,Long>();
+					for( int i=0; i<in.getNumRows(); i++ ) {
+						String key = String.valueOf(in.get(i, colID-1));
+						if( key != null && !key.isEmpty() ) {
+							Long val = hist.get(key);
+							hist.put(key, (val!=null) ? val+1 : 1);
+						}	
+					}
+					_hist.put(colID, hist);
+					long max = Long.MIN_VALUE; 
+					for( Entry<String, Long> e : hist.entrySet() ) 
+						if( e.getValue() > max  ) {
+							_replacementList[j] = e.getKey();
+							max = e.getValue();
+						}
+				}
+			}
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
+	}
+
+	@Override
+	public String[] apply(String[] words) 
+	{	
+		if( isApplicable() )
+			for(int i=0; i < _colList.length; i++) {
+				int colID = _colList[i];
+				String w = UtilFunctions.unquote(words[colID-1]);
+				if(TfUtils.isNA(_NAstrings, w))
+					w = words[colID-1] = _replacementList[i];
+				
+				if ( _isMVScaled.get(i) )
+					if ( _mvscMethodList[i] == MVMethod.GLOBAL_MEAN )
+						words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
+					else
+						words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum );
+			}
+		
+		if(_scnomvList != null)
+		for(int i=0; i < _scnomvList.length; i++)
+		{
+			int colID = _scnomvList[i];
+			if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN )
+				words[colID-1] = Double.toString( UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum );
+			else
+				words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / _scnomvVarList[i].mean._sum );
+		}
+			
+		return words;
+	}
+	
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+		for(int i=0; i<in.getNumRows(); i++) {
+			for(int j=0; j<_colList.length; j++) {
+				int colID = _colList[j];
+				if( Double.isNaN(out.quickGetValue(i, colID-1)) )
+					out.quickSetValue(i, colID-1, Double.parseDouble(_replacementList[j]));
+			}
+		}
+		return out;
+	}
+	
+	@Override
+	public FrameBlock getMetaData(FrameBlock out) {
+		for( int j=0; j<_colList.length; j++ ) {
+			out.getColumnMetadata(_colList[j]-1)
+			   .setMvValue(_replacementList[j]);
+		}
+		return out;
+	}
+
+	public void initMetaData(FrameBlock meta) {
+		//init replacement lists, replacing recoded values so that
+		//mv imputation can be applied after recoding
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j];	
+			String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); 
+			if( _rcList.contains(colID) ) {
+				Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal);
+				if( mvVal2 == null)
+					throw new RuntimeException("Missing recode value for impute value '"+mvVal+"' (colID="+colID+").");
+				_replacementList[j] = mvVal2.toString();
+			}
+			else {
+				_replacementList[j] = mvVal;
+			}
+		}
+	}
+
+	public void initRecodeIDList(List<Integer> rcList) {
+		_rcList = rcList;
+	}
+	
+	/**
+	 * Exposes the internal histogram after build.
+	 * 
+	 * @param colID column ID
+	 * @return histogram (map of string keys and long values)
+	 */
+	public HashMap<String,Long> getHistogram( int colID ) {
+		return _hist.get(colID);
+	}
+}
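
The GLOBAL_MODE path in build() accumulates a per-column histogram of category frequencies and takes the most frequent key as the replacement value (GLOBAL_MEAN instead maintains a Kahan-compensated running mean). The mode logic as a standalone sketch with hypothetical data:

    import java.util.HashMap;
    import java.util.Map.Entry;

    public class ModeSketch {
      public static void main(String[] args) {
        //hypothetical column values; mirrors the GLOBAL_MODE path in build()
        String[] col = { "red", "blue", "red", null, "red", "blue" };
        HashMap<String,Long> hist = new HashMap<String,Long>();
        for (String key : col)
          if (key != null && !key.isEmpty()) {
            Long val = hist.get(key);
            hist.put(key, (val != null) ? val + 1 : 1);
          }
        String replacement = null;
        long max = Long.MIN_VALUE;
        for (Entry<String,Long> e : hist.entrySet())
          if (e.getValue() > max) {
            replacement = e.getKey();
            max = e.getValue();
          }
        System.out.println(replacement); //red (most frequent category)
      }
    }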

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
new file mode 100644
index 0000000..af09cee
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class EncoderOmit extends Encoder 
+{	
+	private static final long serialVersionUID = 1978852120416654195L;
+
+	private int _rmRows = 0;
+
+	public EncoderOmit(JSONObject parsedSpec, String[] colnames, int clen) 
+		throws JSONException 
+	{
+		super(null, clen);
+		if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT))
+			return;
+		int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_OMIT);
+		initColList(collist);
+	}
+	
+	public int getNumRemovedRows() {
+		return _rmRows;
+	}
+	
+	public boolean omit(String[] words, TfUtils agents) 
+	{
+		if( !isApplicable() )
+			return false;
+		
+		for(int i=0; i<_colList.length; i++) {
+			int colID = _colList[i];
+			if(TfUtils.isNA(agents.getNAStrings(),UtilFunctions.unquote(words[colID-1].trim())))
+				return true;
+		}
+		return false;
+	}
+
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		return apply(in, out);
+	}
+	
+	@Override
+	public void build(FrameBlock in) {	
+		//do nothing
+	}
+	
+	@Override
+	public String[] apply(String[] words) {
+		return null;
+	}
+	
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out) 
+	{
+		//determine output size
+		int numRows = 0;
+		for(int i=0; i<out.getNumRows(); i++) {
+			boolean valid = true;
+			for(int j=0; j<_colList.length; j++)
+				valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1));
+			numRows += valid ? 1 : 0;
+		}
+		
+		//copy over valid rows into the output
+		MatrixBlock ret = new MatrixBlock(numRows, out.getNumColumns(), false);
+		int pos = 0;
+		for(int i=0; i<in.getNumRows(); i++) {
+			//determine if valid row or omit
+			boolean valid = true;
+			for(int j=0; j<_colList.length; j++)
+				valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1));
+			//copy row if necessary
+			if( valid ) {
+				for(int j=0; j<out.getNumColumns(); j++)
+					ret.quickSetValue(pos, j, out.quickGetValue(i, j));
+				pos++;
+			}
+		}
+	
+		//keep track of removed rows
+		_rmRows = out.getNumRows() - pos;
+		
+		return ret; 
+	}
+
+	@Override
+	public FrameBlock getMetaData(FrameBlock out) {
+		//do nothing
+		return out;
+	}
+	
+	@Override
+	public void initMetaData(FrameBlock meta) {
+		//do nothing
+	}
+}
+ 
\ No newline at end of file
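
A row is omitted when any of the listed columns holds NaN after the preceding encoders ran; apply(FrameBlock, MatrixBlock) first counts the surviving rows, then copies them into a smaller output block. The validity test as a standalone sketch with hypothetical data:

    public class OmitSketch {
      public static void main(String[] args) {
        //hypothetical 4x2 matrix after earlier encoders; NaN marks missing
        double[][] m = { {1, 2}, {Double.NaN, 3}, {4, 5}, {6, Double.NaN} };
        int[] colList = { 1, 2 }; //1-based columns checked by omit
        int kept = 0;
        for (double[] row : m) {
          boolean valid = true;
          for (int c : colList)
            valid &= !Double.isNaN(row[c - 1]);
          kept += valid ? 1 : 0;
        }
        System.out.println(kept + " of " + m.length + " rows kept"); //2 of 4
      }
    }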

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
index 08722fd..d84ea0d 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
@@ -19,19 +19,10 @@
 
 package org.apache.sysml.runtime.transform.encode;
 
-import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
@@ -89,20 +80,4 @@ public class EncoderPassThrough extends Encoder
 	public void initMetaData(FrameBlock meta) {
 		//do nothing
 	}
-	
-	
-	@Override
-	public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-		throw new RuntimeException("File-based api not supported.");
-	}
-
-	@Override
-	public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-		throw new RuntimeException("File-based api not supported.");	
-	}
-
-	@Override
-	public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-		throw new RuntimeException("File-based api not supported.");
-	}
 }

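With the MR methods gone, the remaining in-memory pass-through essentially parses each retained string token into a double; a hedged per-cell sketch (hypothetical helper, not the class's exact code):

    // Hypothetical per-cell sketch of pass-through encoding: parse the string
    // token to a double, mapping null/empty tokens to NaN.
    class PassThroughSketch {
        static double passThroughCell(String token) {
            return (token == null || token.isEmpty())
                ? Double.NaN : Double.parseDouble(token);
        }
    }
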
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
new file mode 100644
index 0000000..bb8592c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+public class EncoderRecode extends Encoder 
+{	
+	private static final long serialVersionUID = 8213163881283341874L;
+
+	private int[] _mvrcdList = null;
+	private int[] _fullrcdList = null;
+	
+	//recode maps and custom token sets for partial recode maps
+	private HashMap<Integer, HashMap<String, Long>> _rcdMaps  = new HashMap<Integer, HashMap<String, Long>>();
+	private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+	private HashMap<Integer, HashSet<Object>> _rcdMapsPart = null;
+	
+	public EncoderRecode(JSONObject parsedSpec, String[] colnames, int clen)
+		throws JSONException 
+	{
+		super(null, clen);
+		int rcdCount = 0;
+		
+		if( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) {
+			int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_RECODE);
+			rcdCount = initColList(collist);
+		}
+		
+		if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) {
+			_mvrcdList = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_MVRCD);
+			rcdCount += _mvrcdList.length;
+		}
+		
+		if ( rcdCount > 0 ) {
+			_fullrcdList = new int[rcdCount];
+			int idx = -1;
+			if(_colList != null)
+				for(int i=0; i < _colList.length; i++)
+					_fullrcdList[++idx] = _colList[i]; 
+			
+			if(_mvrcdList != null)
+				for(int i=0; i < _mvrcdList.length; i++)
+					_fullrcdList[++idx] = _mvrcdList[i]; 
+		}
+	}
+	
+	public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { 
+		return _rcdMaps; 
+	}
+	
+	public HashMap<Integer, HashSet<Object>> getCPRecodeMapsPartial() { 
+		return _rcdMapsPart; 
+	}
+	
+	public HashMap<Integer, HashMap<String,String>> getRecodeMaps() {
+		return _finalMaps;
+	}
+	
+	private String lookupRCDMap(int colID, String key) {
+		if( _finalMaps!=null )
+			return _finalMaps.get(colID).get(key);
+		else { //used for cp
+			Long tmp = _rcdMaps.get(colID).get(key);
+			return (tmp!=null) ? Long.toString(tmp) : null;
+		}
+	}
+	
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		if( !isApplicable() )
+			return out;
+		
+		//build and apply recode maps 
+		build(in);
+		apply(in, out);
+		
+		return out;
+	}
+
+	@Override
+	public void build(FrameBlock in) {
+		if( !isApplicable() )
+			return;		
+
+		Iterator<String[]> iter = in.getStringRowIterator();
+		while( iter.hasNext() ) {
+			String[] row = iter.next(); 
+			for( int j=0; j<_colList.length; j++ ) {
+				int colID = _colList[j]; //1-based
+				//allocate column map if necessary
+				if( !_rcdMaps.containsKey(colID) ) 
+					_rcdMaps.put(colID, new HashMap<String,Long>());
+				//probe and build column map
+				HashMap<String,Long> map = _rcdMaps.get(colID);
+				String key = row[colID-1];
+				if( key!=null && !key.isEmpty() && !map.containsKey(key) )
+					map.put(key, Long.valueOf(map.size()+1));
+			}
+		}
+	}
+
+	public void buildPartial(FrameBlock in) {
+		if( !isApplicable() )
+			return;		
+
+		//ensure allocated partial recode map
+		if( _rcdMapsPart == null )
+			_rcdMapsPart = new HashMap<Integer, HashSet<Object>>();
+		
+		//construct partial recode map (tokens w/o codes)
+		//iterate over columns for sequential access
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			//allocate column map if necessary
+			if( !_rcdMapsPart.containsKey(colID) ) 
+				_rcdMapsPart.put(colID, new HashSet<Object>());
+			HashSet<Object> map = _rcdMapsPart.get(colID);
+			//probe and build column map
+			for( int i=0; i<in.getNumRows(); i++ )
+				map.add(in.get(i, colID-1));
+			//cleanup unnecessary entries once
+			map.remove(null);
+			map.remove("");
+		}
+	}
+	
+	/**
+	 * Applies the recode maps to the relevant columns of the given row of tokens.
+	 */
+	@Override
+	public String[] apply(String[] words) 
+	{
+		if( !isApplicable() )
+			return words;
+		
+		//apply recode maps on relevant columns of given row
+		for(int i=0; i < _colList.length; i++) {
+			//prepare input and get code
+			int colID = _colList[i];
+			String key = UtilFunctions.unquote(words[colID-1].trim());
+			String val = lookupRCDMap(colID, key);			
+			// replace unseen keys with NaN 
+			words[colID-1] = (val!=null) ? val : "NaN";
+		}
+			
+		return words;
+	}
+	
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+		//apply recode maps column wise
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j];
+			for( int i=0; i<in.getNumRows(); i++ ) {
+				Object okey = in.get(i, colID-1);
+				String key = (okey!=null) ? okey.toString() : null;
+				String val = lookupRCDMap(colID, key);			
+				out.quickSetValue(i, colID-1, (val!=null) ? 
+						Double.parseDouble(val) : Double.NaN);
+			}
+		}
+		
+		return out;
+	}
+
+	@Override
+	public FrameBlock getMetaData(FrameBlock meta) {
+		if( !isApplicable() )
+			return meta;
+		
+		//inverse operation to initRecodeMaps
+		
+		//allocate output rows
+		int maxDistinct = 0;
+		for( int j=0; j<_colList.length; j++ )
+			if( _rcdMaps.containsKey(_colList[j]) )
+				maxDistinct = Math.max(maxDistinct, _rcdMaps.get(_colList[j]).size());
+		meta.ensureAllocatedColumns(maxDistinct);
+		
+		//create compact meta data representation
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			int rowID = 0;
+			if( _rcdMaps.containsKey(colID) ) {
+				for( Entry<String, Long> e : _rcdMaps.get(colID).entrySet() ) {
+					String tmp = constructRecodeMapEntry(e.getKey(), e.getValue());
+					meta.set(rowID++, colID-1, tmp); 
+				}
+				meta.getColumnMetadata(colID-1).setNumDistinct(
+					_rcdMaps.get(colID).size());
+			}
+		}
+		
+		return meta;
+	}
+	
+
+	/**
+	 * Construct the recode maps from the given meta frame for all 
+	 * columns registered for recode.
+	 * 
+	 * @param meta frame block with serialized recode maps
+	 */
+	@Override
+	public void initMetaData( FrameBlock meta ) {
+		if( meta == null || meta.getNumRows()<=0 )
+			return;
+		
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			_rcdMaps.put(colID, meta.getRecodeMap(colID-1));
+		}
+	}
+	
+	/**
+	 * Returns the recode map entry, which consists of the concatenation of token, delimiter, and code.
+	 * @param token distinct token from the recode map
+	 * @param code  numeric code assigned to the token
+	 * @return the concatenation of token and code with the delimiter in between
+	 */
+	public static String constructRecodeMapEntry(String token, Long code) {
+		return token + Lop.DATATYPE_PREFIX + code.toString();
+	}
+}
+ 
\ No newline at end of file

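To make the recode-map conventions concrete, here is a self-contained sketch of how build() assigns 1-based codes in order of first occurrence and how getMetaData() serializes each entry via constructRecodeMapEntry; it assumes Lop.DATATYPE_PREFIX is the '·' operand delimiter:

    import java.util.HashMap;
    import java.util.Map;

    public class RecodeSketch {
        public static void main(String[] args) {
            String[] column = {"red", "blue", "red", "green"};
            Map<String, Long> map = new HashMap<>();
            for (String token : column)
                if (token != null && !token.isEmpty() && !map.containsKey(token))
                    map.put(token, Long.valueOf(map.size() + 1)); // 1-based codes
            // prints token·code pairs, e.g., red·1, blue·2, green·3 (order unspecified)
            for (Map.Entry<String, Long> e : map.entrySet())
                System.out.println(e.getKey() + "\u00b7" + e.getValue());
        }
    }
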
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
index 62b90b4..afb7ee9 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
@@ -50,7 +50,6 @@ import org.apache.wink.json4j.JSONObject;
 
 public class TfMetaUtils 
 {
-
 	public static boolean isIDSpecification(String spec) throws DMLRuntimeException {
 		try {
 			JSONObject jSpec = new JSONObject(spec);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
index af2e75f..b506444 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -88,9 +88,7 @@ public class FrameFunctionTest extends AutomatedTestBase
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 	
 		boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
 		OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
-		OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 		
 		try
 		{
@@ -126,7 +124,6 @@ public class FrameFunctionTest extends AutomatedTestBase
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 			OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }

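The tests retain the usual save/restore idiom for global optimizer flags around runTest(), as visible in the context lines above; schematically:

    // Save, override, and restore a global flag so test order cannot leak state.
    boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
    OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
    try {
        // ... run the test ...
    }
    finally {
        OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
    }
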
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
index ecc958b..c629eee 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.io.FrameWriter;
@@ -201,10 +200,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
 		if( rtplatform == RUNTIME_PLATFORM.SPARK )
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
-		if( ofmt.equals("csv") )
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
-		
 		try
 		{
 			int cols = multColBlks ? cols2 : cols1;
@@ -235,7 +230,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
 		finally {
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
index 5066582..ceeec07 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
@@ -21,7 +21,6 @@ package org.apache.sysml.test.integration.functions.frame;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
@@ -101,10 +100,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 	
 		String ofmt = OutputInfo.outputInfoToStringExternal(oinfo);
-
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
-		if( ofmt.equals("csv") )
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 		
 		try
 		{
@@ -148,7 +143,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
 		finally {
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
index 35078f3..056e619 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
@@ -22,7 +22,6 @@ package org.apache.sysml.test.integration.functions.transform;
 import org.junit.Test;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -75,7 +74,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
 	{
 		//set runtime platform
 		RUNTIME_PLATFORM rtold = rtplatform;
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
 		rtplatform = rt;
 
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
@@ -94,7 +92,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
 			programArgs = new String[]{"-explain","-args", 
 				HOME + "input/" + DATASET, output("R") };
 	
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 			runTest(true, false, null, -1); 
 			
 			//read input/output and compare
@@ -113,7 +110,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
 		finally {
 			rtplatform = rtold;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
deleted file mode 100644
index 81c0bab..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.transform;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-/**
- * 
- * 
- */
-public class RunTest extends AutomatedTestBase 
-{
-	
-	private final static String TEST_NAME1 = "Transform";
-	private final static String TEST_NAME2 = "Apply";
-	private final static String TEST_DIR = "functions/transform/";
-	private final static String TEST_CLASS_DIR = TEST_DIR + RunTest.class.getSimpleName() + "/";
-	
-	private final static String HOMES_DATASET 	= "homes/homes.csv";
-	//private final static String HOMES_SPEC 		= "homes/homes.tfspec.json";
-	private final static String HOMES_SPEC2 	= "homes/homes.tfspec2.json";
-	//private final static String HOMES_IDSPEC 	= "homes/homes.tfidspec.json";
-	//private final static String HOMES_TFDATA 	= "homes/homes.transformed.csv";
-	//private final static String HOMES_COLNAMES 	= "homes/homes.csv.colnames";
-	
-	private final static String HOMES_NAN_DATASET 	= "homes/homesNAN.csv";
-	private final static String HOMES_NAN_SPEC 		= "homes/homesNAN.tfspec.json";
-	//private final static String HOMES_NAN_IDSPEC 	= "homes/homesNAN.tfidspec.json";
-	private final static String HOMES_NAN_COLNAMES 	= "homes/homesNAN.colnames.csv";
-	
-	private final static String HOMES_MISSING_DATASET 	= "homes/homesAllMissing.csv";
-	private final static String HOMES_MISSING_SPEC 		= "homes/homesAllMissing.tfspec.json";
-	private final static String HOMES_MISSING_IDSPEC 	= "homes/homesAllMissing.tfidspec.json";
-	
-	@Override
-	public void setUp() 
-	{
-		TestUtils.clearAssertionInformation();
-		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
-	}
-	
-	// ---- NAN BinaryBlock ----
-	
-	@Test
-	public void runTestWithNAN_HybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "binary");
-	}
-
-	@Test
-	public void runTestWithNAN_SPHybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-	}
-
-	@Test
-	public void runTestWithNAN_HadoopBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "binary");
-	}
-
-	@Test
-	public void runTestWithNAN_SparkBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "binary");
-	}
-
-	// ---- NAN CSV ----
-	
-	@Test
-	public void runTestWithNAN_HybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "csv");
-	}
-
-	@Test
-	public void runTestWithNAN_SPHybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-	}
-
-	@Test
-	public void runTestWithNAN_HadoopCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "csv");
-	}
-
-	@Test
-	public void runTestWithNAN_SparkCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "csv");
-	}
-
-	// ---- Test2 BinaryBlock ----
-	
-	@Test
-	public void runTest2_HybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "binary");
-	}
-
-	@Test
-	public void runTest2_SPHybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-	}
-
-	@Test
-	public void runTest2_HadoopBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "binary");
-	}
-
-	@Test
-	public void runTest2_SparkBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "binary");
-	}
-
-	// ---- Test2 CSV ----
-	
-	@Test
-	public void runTest2_HybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "csv");
-	}
-
-	@Test
-	public void runTest2_SPHybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-	}
-
-	@Test
-	public void runTest2_HadoopCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "csv");
-	}
-
-	@Test
-	public void runTest2_SparkCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "csv");
-	}
-
-	// ---- HomesMissing BinaryBlock ----
-	
-	@Test
-	public void runAllMissing_HybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID, "binary");
-	}
-
-	@Test
-	public void runAllMissing_SPHybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-	}
-
-	@Test
-	public void runAllMissing_HadoopBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HADOOP, "binary");
-	}
-
-	@Test
-	public void runAllMissing_SparkBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.SPARK, "binary");
-	}
-
-	// ---- HomesMissing CSV ----
-	
-	@Test
-	public void runAllMissing_HybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID, "csv");
-	}
-
-	@Test
-	public void runAllMissing_SPHybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-	}
-
-	@Test
-	public void runAllMissing_HadoopCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HADOOP, "csv");
-	}
-
-	@Test
-	public void runAllMissing_SparkCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.SPARK, "csv");
-	}
-
-	// ------------------
-	
-	/**
-	 * 
-	 * @param sparseM1
-	 * @param sparseM2
-	 * @param instType
-	 * @throws IOException 
-	 * @throws DMLRuntimeException 
-	 */
-	private void runScalingTest( String dataset, String spec, String colnames, boolean exception, RUNTIME_PLATFORM rt, String ofmt) throws IOException, DMLRuntimeException
-	{
-		RUNTIME_PLATFORM platformOld = rtplatform;
-		rtplatform = rt;
-	
-		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-		if( rtplatform == RUNTIME_PLATFORM.SPARK  || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)
-			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-
-		try
-		{
-			getAndLoadTestConfiguration(TEST_NAME1);
-			
-			/* This is for running the junit test the new way, i.e., construct the arguments directly */
-			String HOME = SCRIPT_DIR + TEST_DIR;
-			fullDMLScriptName = null;
-			
-			if (colnames == null) {
-				fullDMLScriptName  = HOME + TEST_NAME1 + ".dml";
-				programArgs = new String[]{"-nvargs", 
-											"DATA=" + HOME + "input/" + dataset,
-											"TFSPEC=" + HOME + "input/" + spec,
-											"TFMTD=" + output("tfmtd"),
-											"TFDATA=" + output("tfout"),
-											"OFMT=" + ofmt };
-			}
-			else {
-				fullDMLScriptName  = HOME + TEST_NAME1 + "_colnames.dml";
-				programArgs = new String[]{"-nvargs", 
-											"DATA=" + HOME + "input/" + dataset,
-											"TFSPEC=" + HOME + "input/" + spec,
-											"COLNAMES=" + HOME + "input/" + colnames,
-											"TFMTD=" + output("tfmtd"),
-											"TFDATA=" + output("tfout"),
-											"OFMT=" + ofmt };
-			}
-	
-			boolean exceptionExpected = exception;
-			runTest(true, exceptionExpected, null, -1); 
-			
-			fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
-			programArgs = new String[]{"-nvargs", 
-											"DATA=" + HOME + "input/" + dataset,
-											"APPLYMTD=" + output("tfmtd"),  // generated above
-											"TFMTD=" + output("test_tfmtd"),
-											"TFDATA=" + output("test_tfout"),
-											"OFMT=" + ofmt };
-	
-			exceptionExpected = exception;
-			runTest(true, exceptionExpected, null, -1); 
-			
-		}
-		finally
-		{
-			rtplatform = platformOld;
-			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-		}
-	}	
-}
\ No newline at end of file