You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/08 18:06:02 UTC

[2/2] incubator-systemml git commit: [SYSTEMML-569] Extended spark transformencode on frames (mv, omit, bin)

[SYSTEMML-569] Extended spark transformencode on frames (mv, omit, bin)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/172bfcac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/172bfcac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/172bfcac

Branch: refs/heads/master
Commit: 172bfcacc0b260c49c18c9e26d2dfc81f7e3051e
Parents: 12f2da9
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Jul 7 21:52:31 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Jul 8 10:59:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/parser/DataExpression.java |   1 +
 ...ReturnParameterizedBuiltinSPInstruction.java | 156 +++++++++++++++-
 .../sysml/runtime/matrix/data/FrameBlock.java   |   5 +-
 .../sysml/runtime/transform/BinAgent.java       |   3 +-
 .../sysml/runtime/transform/MVImputeAgent.java  | 184 +++++++++++++------
 .../sysml/runtime/transform/RecodeAgent.java    |   2 +-
 .../transform/encode/EncoderComposite.java      |   6 +-
 .../sysml/runtime/util/UtilFunctions.java       |   2 +-
 .../TransformFrameEncodeApplyTest.java          | 178 ++++++++++++++++++
 .../transform/TransformFrameEncodeApply.dml     |  34 ++++
 .../functions/transform/ZPackageSuite.java      |   1 +
 11 files changed, 498 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/parser/DataExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java
index c33f965..96089f0 100644
--- a/src/main/java/org/apache/sysml/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysml/parser/DataExpression.java
@@ -763,6 +763,7 @@ public class DataExpression extends DataIdentifier
 		        // if the MTD file exists, check the values specified in read statement match values in metadata MTD file
 		        if (configObject != null){
 		        	parseMetaDataFileParameters(mtdFileName, configObject, conditional);
+		        	inferredFormatType = true;
 		        }
 		        else {
 		        	LOG.warn("Metadata file: " + new Path(mtdFileName) + " not provided");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index fc9e9ce..a53f673 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import org.apache.spark.Accumulator;
 import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -38,8 +39,10 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyFunction;
 import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyOffsetFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
@@ -48,10 +51,13 @@ import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.FrameBlock.ColumnMetadata;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.transform.MVImputeAgent;
+import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.transform.RecodeAgent;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.transform.encode.EncoderComposite;
@@ -126,12 +132,19 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 					fo.getSchema(), (int)fo.getNumColumns(), null);
 			
 			Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc()); 
-			in.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
-			  .distinct().groupByKey()
-			  .flatMap(new TransformEncodeGroupFunction(accMax))
-			  .saveAsTextFile(fometa.getFileName()); //trigger eval
+			JavaRDD<String> rcMaps = in
+					.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
+					.distinct().groupByKey()
+					.flatMap(new TransformEncodeGroupFunction(accMax));
+			if( containsMVImputeEncoder(encoderBuild) ) {
+				MVImputeAgent mva = getMVImputeEncoder(encoderBuild);
+				rcMaps = rcMaps.union(
+						in.mapPartitionsToPair(new TransformEncodeBuild2Function(mva))
+						  .groupByKey().flatMap(new TransformEncodeGroup2Function(mva)) );
+			}
+			rcMaps.saveAsTextFile(fometa.getFileName()); //trigger eval
 			
-			//reuse multi-threaded reader 
+			//consolidate meta data frame (reuse multi-threaded reader, special handling missing values) 
 			FrameReader reader = FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo);
 			FrameBlock meta = reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(), fo.getNumColumns());
 			meta.recomputeColumnCardinality(); //recompute num distinct items per column
@@ -169,6 +182,32 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 	
 	/**
 	 * 
+	 * @param encoder encoder to inspect; only the members of an EncoderComposite are searched
+	 * @return true if the composite holds at least one MVImputeAgent, false otherwise (also for non-composite encoders)
+	 */
+	private boolean containsMVImputeEncoder(Encoder encoder) {
+		if( encoder instanceof EncoderComposite )
+			for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() )
+				if( cencoder instanceof MVImputeAgent )
+					return true;
+		return false;	
+	}
+	
+	/**
+	 * Returns the first MVImputeAgent found among the members of an EncoderComposite.
+	 * @param encoder encoder to inspect; only the members of an EncoderComposite are searched
+	 * @return the first MVImputeAgent member, or null if there is none or encoder is not a composite
+	 */
+	private MVImputeAgent getMVImputeEncoder(Encoder encoder) {
+		if( encoder instanceof EncoderComposite )
+			for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() )
+				if( cencoder instanceof MVImputeAgent )
+					return (MVImputeAgent) cencoder;
+		return null;	
+	}
+	
+	/**
+	 * 
 	 */
 	public static class TransformEncodeBuildFunction implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, String>
 	{
@@ -266,4 +305,111 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 			return Math.max(arg0, arg1);	
 		}
 	}
+	
+	/**
+	 * Per-partition build step: feeds every frame block to the MVImputeAgent, then emits (column ID, ColumnMetadata) pairs.
+	 */
+	public static class TransformEncodeBuild2Function implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata>
+	{
+		private static final long serialVersionUID = 6336375833412029279L;
+
+		private MVImputeAgent _encoder = null;
+		
+		public TransformEncodeBuild2Function(MVImputeAgent encoder) {
+			_encoder = encoder;
+		}
+		
+		@Override
+		public Iterable<Tuple2<Integer, ColumnMetadata>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
+			throws Exception 
+		{
+			//build meta data (e.g., histograms and means)
+			while( iter.hasNext() ) {
+				FrameBlock block = iter.next()._2();
+				_encoder.build(block);	
+			}
+			
+			//extract meta data; the output key is always the column ID taken from the encoder's column list
+			ArrayList<Tuple2<Integer,ColumnMetadata>> ret = new ArrayList<Tuple2<Integer,ColumnMetadata>>();
+			int[] collist = _encoder.getColList();
+			for( int j=0; j<collist.length; j++ ) {
+				if( _encoder.getMethod(collist[j]) == MVMethod.GLOBAL_MODE ) {
+					HashMap<String,Long> hist = _encoder.getHistogram(collist[j]); //one output pair per histogram entry
+					for( Entry<String,Long> e : hist.entrySet() )
+						ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j], 
+								new ColumnMetadata(e.getValue(), e.getKey()))); //(count, value)
+				}
+				else if( _encoder.getMethod(collist[j]) == MVMethod.GLOBAL_MEAN ) {
+					ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j], 
+							new ColumnMetadata(_encoder.getNonMVCount(collist[j]), String.valueOf(_encoder.getMeans()[j]._sum)))); //(count, _sum of means entry); getMeans() is indexed by position j
+				}
+				else if( _encoder.getMethod(collist[j]) == MVMethod.CONSTANT ) {
+					ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j],
+							new ColumnMetadata(0, _encoder.getReplacement(collist[j])))); //count fixed to 0, value is the encoder's replacement
+				}
+			}
+			
+			return ret;
+		}
+	}
+	
+	/**
+	 * Per-column merge step: folds the ColumnMetadata partials of one column into at most one "-2 colID value" line.
+	 */
+	public static class TransformEncodeGroup2Function implements FlatMapFunction<Tuple2<Integer, Iterable<ColumnMetadata>>, String>
+	{
+		private static final long serialVersionUID = 702100641492347459L;
+		
+		private MVImputeAgent _encoder = null;
+		
+		public TransformEncodeGroup2Function(MVImputeAgent encoder) {	
+			_encoder = encoder;
+		}
+
+		@Override
+		public Iterable<String> call(Tuple2<Integer, Iterable<ColumnMetadata>> arg0)
+				throws Exception 
+		{
+			int colix = arg0._1();
+			Iterator<ColumnMetadata> iter = arg0._2().iterator();
+			ArrayList<String> ret = new ArrayList<String>();
+			
+			//compute global mode of categorical feature, i.e., value with highest frequency
+			if( _encoder.getMethod(colix) == MVMethod.GLOBAL_MODE ) {
+				HashMap<String, Long> hist = new HashMap<String,Long>();
+				while( iter.hasNext() ) {
+					ColumnMetadata cmeta = iter.next(); 
+					Long tmp = hist.get(cmeta.getMvValue());
+					hist.put(cmeta.getMvValue(), cmeta.getNumDistinct() + ((tmp!=null)?tmp:0));
+				}
+				long max = Long.MIN_VALUE; String mode = null;
+				for( Entry<String, Long> e : hist.entrySet() ) 
+					if( e.getValue() > max  ) {
+						mode = e.getKey();
+						max = e.getValue();
+					}
+				ret.add("-2 " + colix + " " + mode);
+			}
+			//compute global mean of numeric feature from the per-partition partials (value, count)
+			else if( _encoder.getMethod(colix) == MVMethod.GLOBAL_MEAN ) {
+				KahanObject kbuff = new KahanObject(0, 0);
+				KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+				long count = 0; //long: the partial counts are long and += would silently narrow them to int
+				while( iter.hasNext() ) {
+					ColumnMetadata cmeta = iter.next(); 
+					kplus.execute2(kbuff, Double.parseDouble(cmeta.getMvValue())); //NOTE(review): adds partials as-is; confirm they are sums, not means
+					count += cmeta.getNumDistinct();
+				}
+				if( count > 0 )
+					ret.add("-2 " + colix + " " + String.valueOf(kbuff._sum/count));
+			}
+			//pass-through constant label
+			else if( _encoder.getMethod(colix) == MVMethod.CONSTANT ) {
+				if( iter.hasNext() )
+					ret.add("-2 " + colix + " " + iter.next().getMvValue());
+			}
+			
+			return ret;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 2088e85..051ce58 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -25,6 +25,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1256,7 +1257,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	/**
 	 * 
 	 */
-	public static class ColumnMetadata {
+	public static class ColumnMetadata implements Serializable {
+		private static final long serialVersionUID = -90094082422100311L;
+		
 		private long _ndistinct = 0;
 		private String _mvValue = null;
 		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index fe83627..ad7cbfc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -375,8 +375,7 @@ public class BinAgent extends Encoder
 
 	@Override
 	public FrameBlock getMetaData(FrameBlock meta) {
-		// TODO Auto-generated method stub
-		return null;
+		return meta; //hand the given meta data frame back unchanged instead of null
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
index 344693c..68896ac 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
@@ -30,6 +30,7 @@ import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -67,15 +68,8 @@ public class MVImputeAgent extends Encoder
 	
 	public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT };
 	
-	/* 
-	 * Imputation Methods:
-	 * 1 - global_mean
-	 * 2 - global_mode
-	 * 3 - constant
-	 * 
-	 */
-	private byte[] _mvMethodList = null;
-	private byte[] _mvscMethodList = null;	// scaling methods for attributes that are imputed and also scaled
+	private MVMethod[] _mvMethodList = null;
+	private MVMethod[] _mvscMethodList = null;	// scaling methods for attributes that are imputed and also scaled
 	
 	private BitSet _isMVScaled = null;
 	private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE);		// function object that understands variance computation
@@ -86,10 +80,9 @@ public class MVImputeAgent extends Encoder
 	private long[] _countList = null;				// #of non-missing values
 	
 	private CM_COV_Object[] _varList = null;		// column-level variances, computed so far (for scaling)
-	
 
 	private int[] 			_scnomvList = null;			// List of attributes that are scaled but not imputed
-	private byte[]			_scnomvMethodList = null;	// scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
+	private MVMethod[]		_scnomvMethodList = null;	// scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
 	private KahanObject[] 	_scnomvMeanList = null;		// column-level means, for attributes scaled but not imputed
 	private long[] 			_scnomvCountList = null;	// #of non-missing values, for attributes scaled but not imputed
 	private CM_COV_Object[] _scnomvVarList = null;		// column-level variances, computed so far
@@ -97,6 +90,7 @@ public class MVImputeAgent extends Encoder
 	private String[] _replacementList = null;		// replacements: for global_mean, mean; and for global_mode, recode id of mode category
 	private String[] _NAstrings = null;
 	private List<Integer> _rcList = null; 
+	private HashMap<Integer,HashMap<String,Long>> _hist = null;
 	
 	public String[] getReplacements() { return _replacementList; }
 	public KahanObject[] getMeans()   { return _meanList; }
@@ -108,9 +102,16 @@ public class MVImputeAgent extends Encoder
 		throws JSONException
 	{
 		super(null, clen);
+		
+		//handle column list
 		int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, TfUtils.TXMETHOD_IMPUTE);
 		initColList(collist);
 	
+		//handle method list
+		parseMethodsAndReplacments(parsedSpec);
+		
+		//create reuse histograms
+		_hist = new HashMap<Integer, HashMap<String,Long>>();
 	}
 			
 	public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings, int clen)
@@ -136,7 +137,7 @@ public class MVImputeAgent extends Encoder
 			int mvLength = mvattrs.size();
 			
 			_colList = new int[mvLength];
-			_mvMethodList = new byte[mvLength];
+			_mvMethodList = new MVMethod[mvLength];
 			
 			_meanList = new KahanObject[mvLength];
 			_countList = new long[mvLength];
@@ -147,7 +148,7 @@ public class MVImputeAgent extends Encoder
 			
 			for(int i=0; i < _colList.length; i++) {
 				_colList[i] = UtilFunctions.toInt(mvattrs.get(i));
-				_mvMethodList[i] = (byte) UtilFunctions.toInt(mvmthds.get(i)); 
+				_mvMethodList[i] = MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))]; 
 				_meanList[i] = new KahanObject(0, 0);
 			}
 			
@@ -173,7 +174,7 @@ public class MVImputeAgent extends Encoder
 		else
 		{
 			if ( _colList != null ) 
-				_mvscMethodList = new byte[_colList.length];
+				_mvscMethodList = new MVMethod[_colList.length];
 			
 			JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE);
 			JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS);
@@ -195,7 +196,7 @@ public class MVImputeAgent extends Encoder
 				if(mvidx != -1)
 				{
 					_isMVScaled.set(mvidx);
-					_mvscMethodList[mvidx] = mthd;
+					_mvscMethodList[mvidx] = MVMethod.values()[mthd];
 					_varList[mvidx] = new CM_COV_Object();
 				}
 				else
@@ -205,7 +206,7 @@ public class MVImputeAgent extends Encoder
 			if(scnomv > 0)
 			{
 				_scnomvList = new int[scnomv];			
-				_scnomvMethodList = new byte[scnomv];	
+				_scnomvMethodList = new MVMethod[scnomv];	
 	
 				_scnomvMeanList = new KahanObject[scnomv];
 				_scnomvCountList = new long[scnomv];
@@ -219,7 +220,7 @@ public class MVImputeAgent extends Encoder
 					if(isApplicable(colID) == -1)
 					{	// scaled but not imputed
 						_scnomvList[idx] = colID;
-						_scnomvMethodList[idx] = mthd;
+						_scnomvMethodList[idx] = MVMethod.values()[mthd];
 						_scnomvMeanList[idx] = new KahanObject(0, 0);
 						_scnomvVarList[idx] = new CM_COV_Object();
 						idx++;
@@ -229,6 +230,28 @@ public class MVImputeAgent extends Encoder
 		}
 	}
 	
+	/**
+	 * Reads the impute spec and allocates the method, replacement, mean and count lists with one slot per spec entry.
+	 * @param parsedSpec parsed spec; its TfUtils.TXMETHOD_IMPUTE entry is read as an array of objects with "method" and, for constants, "value"
+	 * @throws JSONException presumably raised by the JSON accessors on a malformed spec - TODO confirm
+	 */
+	private void parseMethodsAndReplacments(JSONObject parsedSpec) throws JSONException {
+		JSONArray mvspec = (JSONArray) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+		_mvMethodList = new MVMethod[mvspec.size()];
+		_replacementList = new String[mvspec.size()];
+		_meanList = new KahanObject[mvspec.size()];
+		_countList = new long[mvspec.size()];
+		for(int i=0; i < mvspec.size(); i++) {
+			JSONObject mvobj = (JSONObject)mvspec.get(i);
+			_mvMethodList[i] = MVMethod.valueOf(mvobj.get("method").toString().toUpperCase()); //valueOf throws IllegalArgumentException for names that are not MVMethod constants
+			if( _mvMethodList[i] == MVMethod.CONSTANT ) {
+				_replacementList[i] = mvobj.getString("value").toString(); //only constants carry their replacement in the spec
+			}
+			_meanList[i] = new KahanObject(0, 0);
+		}
+	}
+	
+	
 	public void prepare(String[] words) throws IOException {
 		
 		try {
@@ -242,13 +265,13 @@ public class MVImputeAgent extends Encoder
 				if(!TfUtils.isNA(_NAstrings, w)) {
 					_countList[i]++;
 					
-					boolean computeMean = (_mvMethodList[i] == 1 || _isMVScaled.get(i) );
+					boolean computeMean = (_mvMethodList[i] == MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) );
 					if(computeMean) {
 						// global_mean
 						double d = UtilFunctions.parseToDouble(w);
 						_meanFn.execute2(_meanList[i], d, _countList[i]);
 						
-						if (_isMVScaled.get(i) && _mvscMethodList[i] == 2)
+						if (_isMVScaled.get(i) && _mvscMethodList[i] == MVMethod.GLOBAL_MODE)
 							_varFn.execute(_varList[i], d);
 					}
 					else {
@@ -271,7 +294,7 @@ public class MVImputeAgent extends Encoder
 				double d = UtilFunctions.parseToDouble(w);
 				_scnomvCountList[i]++; 		// not required, this is always equal to total #records processed
 				_meanFn.execute2(_scnomvMeanList[i], d, _scnomvCountList[i]);
-				if(_scnomvMethodList[i] == 2)
+				if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE)
 					_varFn.execute(_scnomvVarList[i], d);
 			}
 		} catch(Exception e) {
@@ -311,15 +334,15 @@ public class MVImputeAgent extends Encoder
 	
 	private DistinctValue prepMeanOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
 		
-		byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+		MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
 		
-		if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+		if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
 			String suffix = null;
 			if(scnomv)
 				suffix = "scnomv";
-			else if ( mthd ==1 && _isMVScaled.get(idx) )
+			else if ( mthd == MVMethod.GLOBAL_MEAN && _isMVScaled.get(idx) )
 				suffix = "scmv"; 	// both scaled and mv imputed
-			else if ( mthd == 1 )
+			else if ( mthd == MVMethod.GLOBAL_MEAN )
 				suffix = "noscmv";
 			else
 				suffix = "scnomv";
@@ -341,8 +364,8 @@ public class MVImputeAgent extends Encoder
 	}
 	
 	private DistinctValue prepMeanCorrectionOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
-		byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
-		if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+		MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+		if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
 			sb.setLength(0);
 			//CORRECTION_PREFIX + "_" + taskID + "_" + Double.toString(mean._correction);
 			sb.append(CORRECTION_PREFIX);
@@ -357,8 +380,8 @@ public class MVImputeAgent extends Encoder
 	}
 	
 	private DistinctValue prepMeanCountOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
-		byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
-		if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+		MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+		if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
 			sb.setLength(0);
 			//s = COUNT_PREFIX + "_" + taskID + "_" + Long.toString(count);
 			sb.append(COUNT_PREFIX);
@@ -373,8 +396,8 @@ public class MVImputeAgent extends Encoder
 	}
 	
 	private DistinctValue prepTotalCountOutput(int taskID, int idx, StringBuilder sb, boolean scnomv, TfUtils agents) throws CharacterCodingException {
-		byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
-		if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+		MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+		if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
 			sb.setLength(0);
 			//TOTAL_COUNT_PREFIX + "_" + taskID + "_" + Long.toString(TransformationAgent._numValidRecords);
 			sb.append(TOTAL_COUNT_PREFIX);
@@ -390,8 +413,8 @@ public class MVImputeAgent extends Encoder
 	private DistinctValue prepConstantOutput(int idx, StringBuilder sb) throws CharacterCodingException {
 		if ( _mvMethodList == null )
 			return null;
-		byte mthd = _mvMethodList[idx];
-		if ( mthd == 3 ) {
+		MVMethod mthd = _mvMethodList[idx];
+		if ( mthd == MVMethod.CONSTANT ) {
 			sb.setLength(0);
 			sb.append(CONSTANT_PREFIX);
 			sb.append("_");
@@ -402,7 +425,7 @@ public class MVImputeAgent extends Encoder
 	}
 	
 	private DistinctValue prepVarOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
-		if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == 2 ) {
+		if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == MVMethod.GLOBAL_MODE ) {
 			sb.setLength(0);
 			sb.append(VARIANCE_PREFIX);
 			sb.append("_");
@@ -560,7 +583,7 @@ public class MVImputeAgent extends Encoder
 					
 					double imputedValue = Double.NaN;
 					KahanObject gmean = null;
-					if ( _mvMethodList[i] == 1 ) 
+					if ( _mvMethodList[i] == MVMethod.GLOBAL_MEAN ) 
 					{
 						gmean = _meanList[i];
 						imputedValue = _meanList[i]._sum;
@@ -568,7 +591,7 @@ public class MVImputeAgent extends Encoder
 						double mean = ( _countList[i] == 0 ? 0.0 : _meanList[i]._sum); 
 						writeTfMtd(colID, Double.toString(mean), outputDir, fs, agents);
 					}
-					else if ( _mvMethodList[i] == 3 ) 
+					else if ( _mvMethodList[i] == MVMethod.CONSTANT ) 
 					{
 						writeTfMtd(colID, _replacementList[i], outputDir, fs, agents);
 						
@@ -584,7 +607,7 @@ public class MVImputeAgent extends Encoder
 					if ( _isMVScaled.get(i) ) 
 					{
 						double sdev = -1.0;
-						if ( _mvscMethodList[i] == 2 ) {
+						if ( _mvscMethodList[i] == MVMethod.GLOBAL_MODE ) {
 							// Adjust variance with missing values
 							long totalMissingCount = (agents.getValid() - _countList[i]);
 							_varFn.execute(_varList[i], imputedValue, totalMissingCount);
@@ -601,7 +624,7 @@ public class MVImputeAgent extends Encoder
 					int colID = _scnomvList[i];
 					double mean = (_scnomvCountList[i] == 0 ? 0.0 : _scnomvMeanList[i]._sum);
 					double sdev = -1.0;
-					if ( _scnomvMethodList[i] == 2 ) 
+					if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MODE ) 
 					{
 						double var = _scnomvVarList[i].getRequiredResult(new CMOperator(_varFn, AggregateOperationTypes.VARIANCE));
 						sdev = Math.sqrt(var);
@@ -788,7 +811,7 @@ public class MVImputeAgent extends Encoder
 					// since missing values themselves are replaced with gmean.
 					long totalMissingCount = (totalRecordCount-totalValidCount);
 					int idx = isApplicable(colID);
-					if(idx != -1 && _mvMethodList[idx] == 3) 
+					if(idx != -1 && _mvMethodList[idx] == MVMethod.CONSTANT) 
 						_meanFn.execute(gmean, UtilFunctions.parseToDouble(_replacementList[idx]), totalRecordCount);
 					_varFn.execute(gcm, gmean._sum, totalMissingCount);
 				}
@@ -863,10 +886,10 @@ public class MVImputeAgent extends Encoder
 				for(int i=0; i<_colList.length;i++) {
 					int colID = _colList[i];
 					
-					if ( _mvMethodList[i] == 1 || _mvMethodList[i] == 2 )
+					if ( _mvMethodList[i] == MVMethod.GLOBAL_MEAN || _mvMethodList[i] == MVMethod.GLOBAL_MODE )
 						// global_mean or global_mode
 						_replacementList[i] = readReplacement(colID, fs, tfMtdDir, agents);
-					else if ( _mvMethodList[i] == 3 ) {
+					else if ( _mvMethodList[i] == MVMethod.CONSTANT ) {
 						// constant: replace a missing value by a given constant
 						// nothing to do. The constant values are loaded already during configure 
 					}
@@ -894,15 +917,8 @@ public class MVImputeAgent extends Encoder
 		int idx = isApplicable(colID);		
 		if(idx == -1)
 			return MVMethod.INVALID;
-		
-		switch(_mvMethodList[idx])
-		{
-			case 1: return MVMethod.GLOBAL_MEAN;
-			case 2: return MVMethod.GLOBAL_MODE;
-			case 3: return MVMethod.CONSTANT;
-			default: return MVMethod.INVALID;
-		}
-		
+		else
+			return _mvMethodList[idx];
 	}
 	
 	public long getNonMVCount(int colID) {
@@ -917,14 +933,48 @@ public class MVImputeAgent extends Encoder
 	
 	@Override
 	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-		// TODO Auto-generated method stub
-		return null;
+		build(in); //first collect the imputation meta data from the input frame
+		return apply(in, out); //then return whatever apply produces for the same input
 	}
 	
 	@Override
 	public void build(FrameBlock in) {
-		// TODO Auto-generated method stub
-		
+		//accumulate per-column imputation meta data (running mean or value histogram) over the rows of 'in'
+		try {
+			for( int j=0; j<_colList.length; j++ ) {
+				int colID = _colList[j];
+				if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) {
+					//compute global column mean (scale); 'off' continues the row count of earlier build calls
+					long off = _countList[j];
+					for( int i=0; i<in.getNumRows(); i++ )
+						_meanFn.execute2(_meanList[j], UtilFunctions.objectToDouble(in.getSchema().get(colID-1), in.get(i, colID-1)), off+i+1);
+					_replacementList[j] = String.valueOf(_meanList[j]._sum);
+					_countList[j] += in.getNumRows();
+				}
+				else if( _mvMethodList[j] == MVMethod.GLOBAL_MODE ) {
+					//compute global column mode (categorical), i.e., most frequent category
+					HashMap<String,Long> hist = _hist.containsKey(colID) ? _hist.get(colID) : new HashMap<String,Long>();
+					for( int i=0; i<in.getNumRows(); i++ ) {
+						Object cell = in.get(i, colID-1); //test the raw cell: String.valueOf(null) yields "null" and would count missing cells
+						String key = (cell != null) ? cell.toString() : null;
+						if( key != null && !key.isEmpty() ) {
+							Long val = hist.get(key);
+							hist.put(key, (val!=null) ? val+1 : 1);
+						}	
+					}
+					_hist.put(colID, hist);
+					long max = Long.MIN_VALUE; 
+					for( Entry<String, Long> e : hist.entrySet() ) 
+						if( e.getValue() > max  ) {
+							_replacementList[j] = e.getKey();
+							max = e.getValue();
+						}
+				}
+			}
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
 	}
 
 	@Override
@@ -938,7 +988,7 @@ public class MVImputeAgent extends Encoder
 					w = words[colID-1] = _replacementList[i];
 				
 				if ( _isMVScaled.get(i) )
-					if ( _mvscMethodList[i] == 1 )
+					if ( _mvscMethodList[i] == MVMethod.GLOBAL_MEAN )
 						words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
 					else
 						words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum );
@@ -948,7 +998,7 @@ public class MVImputeAgent extends Encoder
 		for(int i=0; i < _scnomvList.length; i++)
 		{
 			int colID = _scnomvList[i];
-			if ( _scnomvMethodList[i] == 1 )
+			if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN )
 				words[colID-1] = Double.toString( UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum );
 			else
 				words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / _scnomvVarList[i].mean._sum );
@@ -971,8 +1021,11 @@ public class MVImputeAgent extends Encoder
 	
 	@Override
 	public FrameBlock getMetaData(FrameBlock out) {
-		// TODO Auto-generated method stub
-		return null;
+		for( int j=0; j<_colList.length; j++ ) { //one replacement value per imputed column
+			out.getColumnMetadata(_colList[j]-1) //column list entries are shifted by -1 to address the meta data column
+			   .setMvValue(_replacementList[j]);
+		}
+		return out; //same frame instance that was passed in, now carrying the mv values
 	}
 	
 	/**
@@ -983,14 +1036,13 @@ public class MVImputeAgent extends Encoder
 	public void initMetaData(FrameBlock meta) {
 		//init replacement lists, replace recoded values to
 		//apply mv imputation potentially after recoding
-		_replacementList = new String[_colList.length];
 		for( int j=0; j<_colList.length; j++ ) {
-			int colID = _colList[j];
+			int colID = _colList[j];	
 			String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); 
 			if( _rcList.contains(colID) ) {
 				Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal);
-				if( mvVal2 == null) 
-					throw new RuntimeException("Missing recode value for impute value '"+mvVal+"'.");
+				if( mvVal2 == null)
+					throw new RuntimeException("Missing recode value for impute value '"+mvVal+"' (colID="+colID+").");
 				_replacementList[j] = mvVal2.toString();
 			}
 			else {
@@ -1006,4 +1058,14 @@ public class MVImputeAgent extends Encoder
 	public void initRecodeIDList(List<Integer> rcList) {
 		_rcList = rcList;
 	}
+	
+	/**
+	 * Exposes the internal histogram after build.
+	 * 
+	 * @param colID column ID used as key into the internal histogram map
+	 * @return the value-to-count histogram stored for colID, or null if none was stored
+	 */
+	public HashMap<String,Long> getHistogram( int colID ) {
+		return _hist.get(colID);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 01d7c85..5abe9db 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -412,7 +412,7 @@ public class RecodeAgent extends Encoder
 				//probe and build column map
 				HashMap<String,Long> map = _rcdMaps.get(colID);
 				String key = row[colID-1];
-				if( !map.containsKey(key) )
+				if( key!=null && !key.isEmpty() && !map.containsKey(key) )
 					map.put(key, new Long(map.size()+1));
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index bafa655..d6bf9d4 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -78,10 +78,10 @@ public class EncoderComposite extends Encoder
 		
 		//propagate meta data 
 		_meta = new FrameBlock(in.getNumColumns(), ValueType.STRING);
-		for( Encoder encoder : _encoders ) {
-			encoder.initMetaData(_meta);
+		for( Encoder encoder : _encoders )
 			_meta = encoder.getMetaData(_meta);
-		}
+		for( Encoder encoder : _encoders )
+			encoder.initMetaData(_meta);
 		
 		//apply meta data
 		for( Encoder encoder : _encoders )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 5e6e5b7..6bce4ff 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -369,7 +369,7 @@ public class UtilFunctions
 	public static double objectToDouble(ValueType vt, Object in) {
 		if( in == null )  return 0;
 		switch( vt ) {
-			case STRING:  return Double.parseDouble((String)in);
+			case STRING:  return !((String)in).isEmpty() ? Double.parseDouble((String)in) : 0;
 			case BOOLEAN: return ((Boolean)in)?1d:0d;
 			case INT:     return (Long)in;
 			case DOUBLE:  return (Double)in;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
new file mode 100644
index 0000000..b61060b
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.transform;
+
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.io.MatrixReaderFactory;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class TransformFrameEncodeApplyTest extends AutomatedTestBase 
+{
+	private final static String TEST_NAME1 = "TransformFrameEncodeApply";
+	private final static String TEST_DIR = "functions/transform/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameEncodeApplyTest.class.getSimpleName() + "/";
+	
+	//dataset and transform specs without missing values (used by the RECODE, DUMMY, and BIN cases)
+	private final static String DATASET1 	= "homes3/homes.csv";
+	private final static String SPEC1 		= "homes3/homes.tfspec_recode.json"; 
+	private final static String SPEC2 		= "homes3/homes.tfspec_dummy.json";
+	private final static String SPEC3 		= "homes3/homes.tfspec_bin.json"; //incl recode
+	
+	//dataset and transform specs with missing values (used by the IMPUTE and OMIT cases)
+	private final static String DATASET2 	= "homes/homes.csv";
+	private final static String SPEC4 		= "homes3/homes.tfspec_impute.json";
+	private final static String SPEC5 		= "homes3/homes.tfspec_omit.json";
+	
+	public enum TransformType {
+		RECODE,
+		DUMMY,
+		BIN,
+		IMPUTE,
+		OMIT,
+	}
+	
+	@Override
+	public void setUp()  {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "y" }) );
+	}
+	
+	@Test
+	public void testHomesRecodeSingleNodeCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.RECODE);
+	}
+	
+	@Test
+	public void testHomesRecodeSparkCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.RECODE);
+	}
+	
+	@Test
+	public void testHomesDummycodeSingleNodeCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.DUMMY);
+	}
+	
+	@Test
+	public void testHomesDummycodeSparkCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.DUMMY);
+	}
+	
+	@Test
+	public void testHomesBinningSingleNodeCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.BIN);
+	}
+	
+	@Test
+	public void testHomesBinningSparkCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.BIN);
+	}
+	
+	@Test
+	public void testHomesOmitSingleNodeCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.OMIT);
+	}
+	
+	@Test
+	public void testHomesOmitSparkCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.OMIT);
+	}
+	
+	@Test
+	public void testHomesImputeSingleNodeCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.IMPUTE);
+	}
+	
+	@Test
+	public void testHomesImputeSparkCSV() {
+		runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.IMPUTE);
+	}
+
+	/**
+	 * Runs the TransformFrameEncodeApply script and compares its two outputs (tfout1, tfout2).
+	 * @param rt runtime platform to run on; the previous platform is restored in the finally block
+	 * @param ofmt output format passed as OFMT; anything but "csv" throws a RuntimeException
+	 * @param type transform type, selects the spec and dataset (see the switch below)
+	 */
+	private void runTransformTest( RUNTIME_PLATFORM rt, String ofmt, TransformType type )
+	{
+		//save global test settings (restored in finally) and set the runtime platform
+		RUNTIME_PLATFORM rtold = rtplatform;
+		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
+		rtplatform = rt;
+
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		//select transform specification and input dataset for the requested type
+		String SPEC = null; String DATASET = null;
+		switch( type ) {
+			case RECODE: SPEC = SPEC1; DATASET = DATASET1; break;
+			case DUMMY:  SPEC = SPEC2; DATASET = DATASET1; break;
+			case BIN:    SPEC = SPEC3; DATASET = DATASET1; break;
+			case IMPUTE: SPEC = SPEC4; DATASET = DATASET2; break;
+			case OMIT:   SPEC = SPEC5; DATASET = DATASET2; break;
+		}
+
+		if( !ofmt.equals("csv") )
+			throw new RuntimeException("Unsupported test output format");
+		
+		try
+		{
+			getAndLoadTestConfiguration(TEST_NAME1);
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+			programArgs = new String[]{"-explain","-nvargs", 
+				"DATA=" + HOME + "input/" + DATASET,
+				"TFSPEC=" + HOME + "input/" + SPEC,
+				"TFDATA1=" + output("tfout1"),
+				"TFDATA2=" + output("tfout2"),
+				"OFMT=" + ofmt };
+	
+			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
+			runTest(true, false, null, -1); 
+			
+			//read both outputs (tfout1, tfout2) as CSV matrices and compare; last arg 0 is presumably the tolerance
+			double[][] R1 = DataConverter.convertToDoubleMatrix(MatrixReaderFactory
+				.createMatrixReader(InputInfo.CSVInputInfo)
+				.readMatrixFromHDFS(output("tfout1"), -1L, -1L, 1000, 1000, -1));
+			double[][] R2 = DataConverter.convertToDoubleMatrix(MatrixReaderFactory
+				.createMatrixReader(InputInfo.CSVInputInfo)
+				.readMatrixFromHDFS(output("tfout2"), -1L, -1L, 1000, 1000, -1));
+			TestUtils.compareMatrices(R1, R2, R1.length, R1[0].length, 0);			
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			rtplatform = rtold;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
new file mode 100644
index 0000000..08c98d0
--- /dev/null
+++ b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+F1 = read($DATA, data_type="frame", format="csv");
+
+jspec = read($TFSPEC, data_type="scalar", value_type="string");
+
+[X, M] = transformencode(target=F1, spec=jspec);
+# NOTE(review): the empty if below presumably forces a statement-block cut between encode and apply - confirm
+if(1==1){}
+
+X2 = transformapply(target=F1, spec=jspec, meta=M);
+
+write(X, $TFDATA1, format=$OFMT);
+write(X2, $TFDATA2, format=$OFMT);
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
index 5122a60..bdcc36b 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
@@ -31,6 +31,7 @@ import org.junit.runners.Suite;
 	TransformAndApplyTest.class,
 	TransformEncodeDecodeTest.class,
 	TransformFrameApplyTest.class,
+	TransformFrameEncodeApplyTest.class,
 	TransformFrameEncodeDecodeTest.class,
 	TransformReadMetaTest.class,
 	TransformTest.class,