You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/08 18:06:02 UTC
[2/2] incubator-systemml git commit: [SYSTEMML-569] Extended spark
transformencode on frames (mv, omit, bin)
[SYSTEMML-569] Extended spark transformencode on frames (mv, omit, bin)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/172bfcac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/172bfcac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/172bfcac
Branch: refs/heads/master
Commit: 172bfcacc0b260c49c18c9e26d2dfc81f7e3051e
Parents: 12f2da9
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Jul 7 21:52:31 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Jul 8 10:59:18 2016 -0700
----------------------------------------------------------------------
.../org/apache/sysml/parser/DataExpression.java | 1 +
...ReturnParameterizedBuiltinSPInstruction.java | 156 +++++++++++++++-
.../sysml/runtime/matrix/data/FrameBlock.java | 5 +-
.../sysml/runtime/transform/BinAgent.java | 3 +-
.../sysml/runtime/transform/MVImputeAgent.java | 184 +++++++++++++------
.../sysml/runtime/transform/RecodeAgent.java | 2 +-
.../transform/encode/EncoderComposite.java | 6 +-
.../sysml/runtime/util/UtilFunctions.java | 2 +-
.../TransformFrameEncodeApplyTest.java | 178 ++++++++++++++++++
.../transform/TransformFrameEncodeApply.dml | 34 ++++
.../functions/transform/ZPackageSuite.java | 1 +
11 files changed, 498 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/parser/DataExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java
index c33f965..96089f0 100644
--- a/src/main/java/org/apache/sysml/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysml/parser/DataExpression.java
@@ -763,6 +763,7 @@ public class DataExpression extends DataIdentifier
// if the MTD file exists, check the values specified in read statement match values in metadata MTD file
if (configObject != null){
parseMetaDataFileParameters(mtdFileName, configObject, conditional);
+ inferredFormatType = true;
}
else {
LOG.warn("Metadata file: " + new Path(mtdFileName) + " not provided");
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index fc9e9ce..a53f673 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import org.apache.spark.Accumulator;
import org.apache.spark.AccumulatorParam;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
@@ -38,8 +39,10 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyFunction;
import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyOffsetFunction;
import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
@@ -48,10 +51,13 @@ import org.apache.sysml.runtime.io.FrameReader;
import org.apache.sysml.runtime.io.FrameReaderFactory;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.FrameBlock.ColumnMetadata;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.transform.MVImputeAgent;
+import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
import org.apache.sysml.runtime.transform.RecodeAgent;
import org.apache.sysml.runtime.transform.encode.Encoder;
import org.apache.sysml.runtime.transform.encode.EncoderComposite;
@@ -126,12 +132,19 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
fo.getSchema(), (int)fo.getNumColumns(), null);
Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc());
- in.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
- .distinct().groupByKey()
- .flatMap(new TransformEncodeGroupFunction(accMax))
- .saveAsTextFile(fometa.getFileName()); //trigger eval
+ JavaRDD<String> rcMaps = in
+ .mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
+ .distinct().groupByKey()
+ .flatMap(new TransformEncodeGroupFunction(accMax));
+ if( containsMVImputeEncoder(encoderBuild) ) {
+ MVImputeAgent mva = getMVImputeEncoder(encoderBuild);
+ rcMaps = rcMaps.union(
+ in.mapPartitionsToPair(new TransformEncodeBuild2Function(mva))
+ .groupByKey().flatMap(new TransformEncodeGroup2Function(mva)) );
+ }
+ rcMaps.saveAsTextFile(fometa.getFileName()); //trigger eval
- //reuse multi-threaded reader
+ //consolidate meta data frame (reuse multi-threaded reader, special handling missing values)
FrameReader reader = FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo);
FrameBlock meta = reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(), fo.getNumColumns());
meta.recomputeColumnCardinality(); //recompute num distinct items per column
@@ -169,6 +182,32 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
/**
*
+ * @param encoder encoder to inspect; sub-encoders are only searched if it is an {@link EncoderComposite}
+ * @return true if the composite contains at least one {@link MVImputeAgent}, false otherwise
+ */
+ private boolean containsMVImputeEncoder(Encoder encoder) {
+ if( encoder instanceof EncoderComposite ) //any other encoder type falls through to false
+ for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() )
+ if( cencoder instanceof MVImputeAgent )
+ return true;
+ return false;
+ }
+
+ /**
+ * Returns the first {@link MVImputeAgent} found among the sub-encoders of a composite encoder.
+ * @param encoder encoder to inspect; sub-encoders are only searched if it is an {@link EncoderComposite}
+ * @return the first {@link MVImputeAgent} sub-encoder, or null if there is none or the encoder is not a composite
+ */
+ private MVImputeAgent getMVImputeEncoder(Encoder encoder) {
+ if( encoder instanceof EncoderComposite ) //any other encoder type falls through to null
+ for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() )
+ if( cencoder instanceof MVImputeAgent )
+ return (MVImputeAgent) cencoder;
+ return null;
+ }
+
+ /**
+ *
*/
public static class TransformEncodeBuildFunction implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, String>
{
@@ -266,4 +305,111 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
return Math.max(arg0, arg1);
}
}
+
+ /**
+ * Per-partition build of the MV-impute meta data: feeds every frame block of the partition to the encoder and emits, keyed by column ID, the partial histogram entries (mode), the partial mean with its count (mean), or the constant replacement.
+ */
+ public static class TransformEncodeBuild2Function implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata>
+ {
+ private static final long serialVersionUID = 6336375833412029279L;
+
+ private MVImputeAgent _encoder = null;
+
+ public TransformEncodeBuild2Function(MVImputeAgent encoder) {
+ _encoder = encoder;
+ }
+
+ @Override
+ public Iterable<Tuple2<Integer, ColumnMetadata>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
+ throws Exception
+ {
+ //build meta data (e.g., histograms and means)
+ while( iter.hasNext() ) {
+ FrameBlock block = iter.next()._2();
+ _encoder.build(block);
+ }
+
+ //extract meta data
+ ArrayList<Tuple2<Integer,ColumnMetadata>> ret = new ArrayList<Tuple2<Integer,ColumnMetadata>>();
+ int[] collist = _encoder.getColList();
+ for( int j=0; j<collist.length; j++ ) {
+ if( _encoder.getMethod(collist[j]) == MVMethod.GLOBAL_MODE ) {
+ HashMap<String,Long> hist = _encoder.getHistogram(collist[j]); //null if build was never called (empty partition)
+ if( hist != null ) for( Entry<String,Long> e : hist.entrySet() )
+ ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j],
+ new ColumnMetadata(e.getValue(), e.getKey())));
+ }
+ else if( _encoder.getMethod(collist[j]) == MVMethod.GLOBAL_MEAN ) {
+ ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j],
+ new ColumnMetadata(_encoder.getNonMVCount(collist[j]), String.valueOf(_encoder.getMeans()[j]._sum))));
+ }
+ else if( _encoder.getMethod(collist[j]) == MVMethod.CONSTANT ) {
+ ret.add(new Tuple2<Integer,ColumnMetadata>(collist[j],
+ new ColumnMetadata(0, _encoder.getReplacement(collist[j]))));
+ }
+ }
+
+ return ret;
+ }
+ }
+
+ /**
+ * Merges the partial MV-impute meta data of one column (as emitted per partition by TransformEncodeBuild2Function) into its final replacement value and emits it as the text line "-2 <colID> <value>"; nothing is emitted for a mean without counted values or a constant without input.
+ */
+ public static class TransformEncodeGroup2Function implements FlatMapFunction<Tuple2<Integer, Iterable<ColumnMetadata>>, String>
+ {
+ private static final long serialVersionUID = 702100641492347459L;
+
+ private MVImputeAgent _encoder = null;
+
+ public TransformEncodeGroup2Function(MVImputeAgent encoder) {
+ _encoder = encoder;
+ }
+
+ @Override
+ public Iterable<String> call(Tuple2<Integer, Iterable<ColumnMetadata>> arg0)
+ throws Exception
+ {
+ int colix = arg0._1();
+ Iterator<ColumnMetadata> iter = arg0._2().iterator();
+ ArrayList<String> ret = new ArrayList<String>();
+
+ //compute global mode of categorical feature, i.e., value with highest frequency
+ if( _encoder.getMethod(colix) == MVMethod.GLOBAL_MODE ) {
+ HashMap<String, Long> hist = new HashMap<String,Long>();
+ while( iter.hasNext() ) {
+ ColumnMetadata cmeta = iter.next();
+ Long tmp = hist.get(cmeta.getMvValue());
+ hist.put(cmeta.getMvValue(), cmeta.getNumDistinct() + ((tmp!=null)?tmp:0));
+ }
+ long max = Long.MIN_VALUE; String mode = null;
+ for( Entry<String, Long> e : hist.entrySet() )
+ if( e.getValue() > max ) {
+ mode = e.getKey();
+ max = e.getValue();
+ }
+ ret.add("-2 " + colix + " " + mode);
+ }
+ //compute global mean of numeric feature as the count-weighted average of the per-partition means
+ else if( _encoder.getMethod(colix) == MVMethod.GLOBAL_MEAN ) {
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ long count = 0; //long: the partial counts are longs and their total may exceed the int range
+ while( iter.hasNext() ) {
+ ColumnMetadata cmeta = iter.next();
+ kplus.execute2(kbuff, Double.parseDouble(cmeta.getMvValue()) * cmeta.getNumDistinct()); //partial mean (MVImputeAgent.build uses the same _meanList[j]._sum as the mean replacement) -> partial sum
+ count += cmeta.getNumDistinct();
+ }
+ if( count > 0 )
+ ret.add("-2 " + colix + " " + String.valueOf(kbuff._sum/count));
+ }
+ //pass-through constant label
+ else if( _encoder.getMethod(colix) == MVMethod.CONSTANT ) {
+ if( iter.hasNext() )
+ ret.add("-2 " + colix + " " + iter.next().getMvValue());
+ }
+
+ return ret;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 2088e85..051ce58 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -25,6 +25,7 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1256,7 +1257,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
/**
*
*/
- public static class ColumnMetadata {
+ public static class ColumnMetadata implements Serializable {
+ private static final long serialVersionUID = -90094082422100311L;
+
private long _ndistinct = 0;
private String _mvValue = null;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index fe83627..ad7cbfc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -375,8 +375,7 @@ public class BinAgent extends Encoder
@Override
public FrameBlock getMetaData(FrameBlock meta) {
- // TODO Auto-generated method stub
- return null;
+ return meta; //hand the given meta data frame back unchanged
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
index 344693c..68896ac 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
@@ -30,6 +30,7 @@ import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -67,15 +68,8 @@ public class MVImputeAgent extends Encoder
public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT };
- /*
- * Imputation Methods:
- * 1 - global_mean
- * 2 - global_mode
- * 3 - constant
- *
- */
- private byte[] _mvMethodList = null;
- private byte[] _mvscMethodList = null; // scaling methods for attributes that are imputed and also scaled
+ private MVMethod[] _mvMethodList = null;
+ private MVMethod[] _mvscMethodList = null; // scaling methods for attributes that are imputed and also scaled
private BitSet _isMVScaled = null;
private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE); // function object that understands variance computation
@@ -86,10 +80,9 @@ public class MVImputeAgent extends Encoder
private long[] _countList = null; // #of non-missing values
private CM_COV_Object[] _varList = null; // column-level variances, computed so far (for scaling)
-
private int[] _scnomvList = null; // List of attributes that are scaled but not imputed
- private byte[] _scnomvMethodList = null; // scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
+ private MVMethod[] _scnomvMethodList = null; // scaling methods, stored by MVMethod ordinal: INVALID (0) for invalid; GLOBAL_MEAN (1) for mean-subtraction; GLOBAL_MODE (2) for z-scoring
private KahanObject[] _scnomvMeanList = null; // column-level means, for attributes scaled but not imputed
private long[] _scnomvCountList = null; // #of non-missing values, for attributes scaled but not imputed
private CM_COV_Object[] _scnomvVarList = null; // column-level variances, computed so far
@@ -97,6 +90,7 @@ public class MVImputeAgent extends Encoder
private String[] _replacementList = null; // replacements: for global_mean, mean; and for global_mode, recode id of mode category
private String[] _NAstrings = null;
private List<Integer> _rcList = null;
+ private HashMap<Integer,HashMap<String,Long>> _hist = null;
public String[] getReplacements() { return _replacementList; }
public KahanObject[] getMeans() { return _meanList; }
@@ -108,9 +102,16 @@ public class MVImputeAgent extends Encoder
throws JSONException
{
super(null, clen);
+
+ //handle column list
int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, TfUtils.TXMETHOD_IMPUTE);
initColList(collist);
+ //handle method list
+ parseMethodsAndReplacments(parsedSpec);
+
+ //create reuse histograms
+ _hist = new HashMap<Integer, HashMap<String,Long>>();
}
public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings, int clen)
@@ -136,7 +137,7 @@ public class MVImputeAgent extends Encoder
int mvLength = mvattrs.size();
_colList = new int[mvLength];
- _mvMethodList = new byte[mvLength];
+ _mvMethodList = new MVMethod[mvLength];
_meanList = new KahanObject[mvLength];
_countList = new long[mvLength];
@@ -147,7 +148,7 @@ public class MVImputeAgent extends Encoder
for(int i=0; i < _colList.length; i++) {
_colList[i] = UtilFunctions.toInt(mvattrs.get(i));
- _mvMethodList[i] = (byte) UtilFunctions.toInt(mvmthds.get(i));
+ _mvMethodList[i] = MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))];
_meanList[i] = new KahanObject(0, 0);
}
@@ -173,7 +174,7 @@ public class MVImputeAgent extends Encoder
else
{
if ( _colList != null )
- _mvscMethodList = new byte[_colList.length];
+ _mvscMethodList = new MVMethod[_colList.length];
JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE);
JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS);
@@ -195,7 +196,7 @@ public class MVImputeAgent extends Encoder
if(mvidx != -1)
{
_isMVScaled.set(mvidx);
- _mvscMethodList[mvidx] = mthd;
+ _mvscMethodList[mvidx] = MVMethod.values()[mthd];
_varList[mvidx] = new CM_COV_Object();
}
else
@@ -205,7 +206,7 @@ public class MVImputeAgent extends Encoder
if(scnomv > 0)
{
_scnomvList = new int[scnomv];
- _scnomvMethodList = new byte[scnomv];
+ _scnomvMethodList = new MVMethod[scnomv];
_scnomvMeanList = new KahanObject[scnomv];
_scnomvCountList = new long[scnomv];
@@ -219,7 +220,7 @@ public class MVImputeAgent extends Encoder
if(isApplicable(colID) == -1)
{ // scaled but not imputed
_scnomvList[idx] = colID;
- _scnomvMethodList[idx] = mthd;
+ _scnomvMethodList[idx] = MVMethod.values()[mthd];
_scnomvMeanList[idx] = new KahanObject(0, 0);
_scnomvVarList[idx] = new CM_COV_Object();
idx++;
@@ -229,6 +230,28 @@ public class MVImputeAgent extends Encoder
}
}
+ /**
+ * Parses the imputation method (and, for constant imputation, the replacement value) of every entry of the impute spec and allocates the per-entry replacement, mean, and count arrays.
+ * @param parsedSpec parsed transform spec; its TfUtils.TXMETHOD_IMPUTE entry is read as a JSON array of objects with a "method" and, for constant imputation, a "value" field
+ * @throws JSONException propagated from the JSON accessors used to read the spec
+ */
+ private void parseMethodsAndReplacments(JSONObject parsedSpec) throws JSONException {
+ JSONArray mvspec = (JSONArray) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+ _mvMethodList = new MVMethod[mvspec.size()];
+ _replacementList = new String[mvspec.size()];
+ _meanList = new KahanObject[mvspec.size()];
+ _countList = new long[mvspec.size()];
+ for(int i=0; i < mvspec.size(); i++) {
+ JSONObject mvobj = (JSONObject)mvspec.get(i);
+ _mvMethodList[i] = MVMethod.valueOf(mvobj.get("method").toString().toUpperCase()); //method names are upper-cased before the lookup, so their case in the spec does not matter
+ if( _mvMethodList[i] == MVMethod.CONSTANT ) {
+ _replacementList[i] = mvobj.getString("value").toString(); //only constants carry their replacement in the spec
+ }
+ _meanList[i] = new KahanObject(0, 0);
+ }
+ }
+
+
public void prepare(String[] words) throws IOException {
try {
@@ -242,13 +265,13 @@ public class MVImputeAgent extends Encoder
if(!TfUtils.isNA(_NAstrings, w)) {
_countList[i]++;
- boolean computeMean = (_mvMethodList[i] == 1 || _isMVScaled.get(i) );
+ boolean computeMean = (_mvMethodList[i] == MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) );
if(computeMean) {
// global_mean
double d = UtilFunctions.parseToDouble(w);
_meanFn.execute2(_meanList[i], d, _countList[i]);
- if (_isMVScaled.get(i) && _mvscMethodList[i] == 2)
+ if (_isMVScaled.get(i) && _mvscMethodList[i] == MVMethod.GLOBAL_MODE)
_varFn.execute(_varList[i], d);
}
else {
@@ -271,7 +294,7 @@ public class MVImputeAgent extends Encoder
double d = UtilFunctions.parseToDouble(w);
_scnomvCountList[i]++; // not required, this is always equal to total #records processed
_meanFn.execute2(_scnomvMeanList[i], d, _scnomvCountList[i]);
- if(_scnomvMethodList[i] == 2)
+ if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE)
_varFn.execute(_scnomvVarList[i], d);
}
} catch(Exception e) {
@@ -311,15 +334,15 @@ public class MVImputeAgent extends Encoder
private DistinctValue prepMeanOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
- byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+ MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
- if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+ if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
String suffix = null;
if(scnomv)
suffix = "scnomv";
- else if ( mthd ==1 && _isMVScaled.get(idx) )
+ else if ( mthd == MVMethod.GLOBAL_MEAN && _isMVScaled.get(idx) )
suffix = "scmv"; // both scaled and mv imputed
- else if ( mthd == 1 )
+ else if ( mthd == MVMethod.GLOBAL_MEAN )
suffix = "noscmv";
else
suffix = "scnomv";
@@ -341,8 +364,8 @@ public class MVImputeAgent extends Encoder
}
private DistinctValue prepMeanCorrectionOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
- byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
- if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+ MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+ if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
sb.setLength(0);
//CORRECTION_PREFIX + "_" + taskID + "_" + Double.toString(mean._correction);
sb.append(CORRECTION_PREFIX);
@@ -357,8 +380,8 @@ public class MVImputeAgent extends Encoder
}
private DistinctValue prepMeanCountOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
- byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
- if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+ MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+ if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
sb.setLength(0);
//s = COUNT_PREFIX + "_" + taskID + "_" + Long.toString(count);
sb.append(COUNT_PREFIX);
@@ -373,8 +396,8 @@ public class MVImputeAgent extends Encoder
}
private DistinctValue prepTotalCountOutput(int taskID, int idx, StringBuilder sb, boolean scnomv, TfUtils agents) throws CharacterCodingException {
- byte mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
- if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+ MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : _mvMethodList[idx]);
+ if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || _isMVScaled.get(idx) ) {
sb.setLength(0);
//TOTAL_COUNT_PREFIX + "_" + taskID + "_" + Long.toString(TransformationAgent._numValidRecords);
sb.append(TOTAL_COUNT_PREFIX);
@@ -390,8 +413,8 @@ public class MVImputeAgent extends Encoder
private DistinctValue prepConstantOutput(int idx, StringBuilder sb) throws CharacterCodingException {
if ( _mvMethodList == null )
return null;
- byte mthd = _mvMethodList[idx];
- if ( mthd == 3 ) {
+ MVMethod mthd = _mvMethodList[idx];
+ if ( mthd == MVMethod.CONSTANT ) {
sb.setLength(0);
sb.append(CONSTANT_PREFIX);
sb.append("_");
@@ -402,7 +425,7 @@ public class MVImputeAgent extends Encoder
}
private DistinctValue prepVarOutput(int taskID, int idx, StringBuilder sb, boolean scnomv) throws CharacterCodingException {
- if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == 2 ) {
+ if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == MVMethod.GLOBAL_MODE ) {
sb.setLength(0);
sb.append(VARIANCE_PREFIX);
sb.append("_");
@@ -560,7 +583,7 @@ public class MVImputeAgent extends Encoder
double imputedValue = Double.NaN;
KahanObject gmean = null;
- if ( _mvMethodList[i] == 1 )
+ if ( _mvMethodList[i] == MVMethod.GLOBAL_MEAN )
{
gmean = _meanList[i];
imputedValue = _meanList[i]._sum;
@@ -568,7 +591,7 @@ public class MVImputeAgent extends Encoder
double mean = ( _countList[i] == 0 ? 0.0 : _meanList[i]._sum);
writeTfMtd(colID, Double.toString(mean), outputDir, fs, agents);
}
- else if ( _mvMethodList[i] == 3 )
+ else if ( _mvMethodList[i] == MVMethod.CONSTANT )
{
writeTfMtd(colID, _replacementList[i], outputDir, fs, agents);
@@ -584,7 +607,7 @@ public class MVImputeAgent extends Encoder
if ( _isMVScaled.get(i) )
{
double sdev = -1.0;
- if ( _mvscMethodList[i] == 2 ) {
+ if ( _mvscMethodList[i] == MVMethod.GLOBAL_MODE ) {
// Adjust variance with missing values
long totalMissingCount = (agents.getValid() - _countList[i]);
_varFn.execute(_varList[i], imputedValue, totalMissingCount);
@@ -601,7 +624,7 @@ public class MVImputeAgent extends Encoder
int colID = _scnomvList[i];
double mean = (_scnomvCountList[i] == 0 ? 0.0 : _scnomvMeanList[i]._sum);
double sdev = -1.0;
- if ( _scnomvMethodList[i] == 2 )
+ if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MODE )
{
double var = _scnomvVarList[i].getRequiredResult(new CMOperator(_varFn, AggregateOperationTypes.VARIANCE));
sdev = Math.sqrt(var);
@@ -788,7 +811,7 @@ public class MVImputeAgent extends Encoder
// since missing values themselves are replaced with gmean.
long totalMissingCount = (totalRecordCount-totalValidCount);
int idx = isApplicable(colID);
- if(idx != -1 && _mvMethodList[idx] == 3)
+ if(idx != -1 && _mvMethodList[idx] == MVMethod.CONSTANT)
_meanFn.execute(gmean, UtilFunctions.parseToDouble(_replacementList[idx]), totalRecordCount);
_varFn.execute(gcm, gmean._sum, totalMissingCount);
}
@@ -863,10 +886,10 @@ public class MVImputeAgent extends Encoder
for(int i=0; i<_colList.length;i++) {
int colID = _colList[i];
- if ( _mvMethodList[i] == 1 || _mvMethodList[i] == 2 )
+ if ( _mvMethodList[i] == MVMethod.GLOBAL_MEAN || _mvMethodList[i] == MVMethod.GLOBAL_MODE )
// global_mean or global_mode
_replacementList[i] = readReplacement(colID, fs, tfMtdDir, agents);
- else if ( _mvMethodList[i] == 3 ) {
+ else if ( _mvMethodList[i] == MVMethod.CONSTANT ) {
// constant: replace a missing value by a given constant
// nothing to do. The constant values are loaded already during configure
}
@@ -894,15 +917,8 @@ public class MVImputeAgent extends Encoder
int idx = isApplicable(colID);
if(idx == -1)
return MVMethod.INVALID;
-
- switch(_mvMethodList[idx])
- {
- case 1: return MVMethod.GLOBAL_MEAN;
- case 2: return MVMethod.GLOBAL_MODE;
- case 3: return MVMethod.CONSTANT;
- default: return MVMethod.INVALID;
- }
-
+ else
+ return _mvMethodList[idx];
}
public long getNonMVCount(int colID) {
@@ -917,14 +933,48 @@ public class MVImputeAgent extends Encoder
@Override
public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
- // TODO Auto-generated method stub
- return null;
+ build(in); //first collect the imputation meta data from the input frame
+ return apply(in, out); //then delegate to apply on the same input
}
@Override
public void build(FrameBlock in) {
- // TODO Auto-generated method stub
-
+ try {
+ for( int j=0; j<_colList.length; j++ ) { //state accumulates over repeated calls: counts, means, and histograms carry over between blocks
+ int colID = _colList[j];
+ if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) {
+ //compute global column mean (scale)
+ long off = _countList[j];
+ for( int i=0; i<in.getNumRows(); i++ )
+ _meanFn.execute2(_meanList[j], UtilFunctions.objectToDouble(
+ in.getSchema().get(colID-1), in.get(i, colID-1)), off+i+1);
+ _replacementList[j] = String.valueOf(_meanList[j]._sum);
+ _countList[j] += in.getNumRows();
+ }
+ else if( _mvMethodList[j] == MVMethod.GLOBAL_MODE ) {
+ //compute global column mode (categorical), i.e., most frequent category
+ HashMap<String,Long> hist = _hist.containsKey(colID) ?
+ _hist.get(colID) : new HashMap<String,Long>();
+ for( int i=0; i<in.getNumRows(); i++ ) {
+ String key = (in.get(i, colID-1) != null) ? in.get(i, colID-1).toString() : null; //String.valueOf would turn a missing cell into the category "null"
+ if( key != null && !key.isEmpty() ) {
+ Long val = hist.get(key);
+ hist.put(key, (val!=null) ? val+1 : 1);
+ }
+ }
+ _hist.put(colID, hist);
+ long max = Long.MIN_VALUE;
+ for( Entry<String, Long> e : hist.entrySet() )
+ if( e.getValue() > max ) {
+ _replacementList[j] = e.getKey();
+ max = e.getValue();
+ }
+ }
+ }
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
@Override
@@ -938,7 +988,7 @@ public class MVImputeAgent extends Encoder
w = words[colID-1] = _replacementList[i];
if ( _isMVScaled.get(i) )
- if ( _mvscMethodList[i] == 1 )
+ if ( _mvscMethodList[i] == MVMethod.GLOBAL_MEAN )
words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
else
words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum );
@@ -948,7 +998,7 @@ public class MVImputeAgent extends Encoder
for(int i=0; i < _scnomvList.length; i++)
{
int colID = _scnomvList[i];
- if ( _scnomvMethodList[i] == 1 )
+ if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN )
words[colID-1] = Double.toString( UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum );
else
words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / _scnomvVarList[i].mean._sum );
@@ -971,8 +1021,11 @@ public class MVImputeAgent extends Encoder
@Override
public FrameBlock getMetaData(FrameBlock out) {
- // TODO Auto-generated method stub
- return null;
+ for( int j=0; j<_colList.length; j++ ) { //one replacement value per imputed column
+ out.getColumnMetadata(_colList[j]-1) //column IDs are offset by one relative to the meta data column index
+ .setMvValue(_replacementList[j]);
+ }
+ return out;
}
/**
@@ -983,14 +1036,13 @@ public class MVImputeAgent extends Encoder
public void initMetaData(FrameBlock meta) {
//init replacement lists, replace recoded values to
//apply mv imputation potentially after recoding
- _replacementList = new String[_colList.length];
for( int j=0; j<_colList.length; j++ ) {
- int colID = _colList[j];
+ int colID = _colList[j];
String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue());
if( _rcList.contains(colID) ) {
Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal);
- if( mvVal2 == null)
- throw new RuntimeException("Missing recode value for impute value '"+mvVal+"'.");
+ if( mvVal2 == null)
+ throw new RuntimeException("Missing recode value for impute value '"+mvVal+"' (colID="+colID+").");
_replacementList[j] = mvVal2.toString();
}
else {
@@ -1006,4 +1058,14 @@ public class MVImputeAgent extends Encoder
public void initRecodeIDList(List<Integer> rcList) {
_rcList = rcList;
}
+
+ /**
+ * Exposes the internal histogram after build.
+ *
+ * @param colID ID of the column, as listed in the encoder's column list
+ * @return map from value to count as collected by build for this column, or null if no histogram was recorded for it
+ */
+ public HashMap<String,Long> getHistogram( int colID ) {
+ return _hist.get(colID);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 01d7c85..5abe9db 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -412,7 +412,7 @@ public class RecodeAgent extends Encoder
//probe and build column map
HashMap<String,Long> map = _rcdMaps.get(colID);
String key = row[colID-1];
- if( !map.containsKey(key) )
+ if( key!=null && !key.isEmpty() && !map.containsKey(key) )
map.put(key, new Long(map.size()+1));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index bafa655..d6bf9d4 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -78,10 +78,10 @@ public class EncoderComposite extends Encoder
//propagate meta data
_meta = new FrameBlock(in.getNumColumns(), ValueType.STRING);
- for( Encoder encoder : _encoders ) {
- encoder.initMetaData(_meta);
+ for( Encoder encoder : _encoders )
_meta = encoder.getMetaData(_meta);
- }
+ for( Encoder encoder : _encoders )
+ encoder.initMetaData(_meta);
//apply meta data
for( Encoder encoder : _encoders )
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 5e6e5b7..6bce4ff 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -369,7 +369,7 @@ public class UtilFunctions
public static double objectToDouble(ValueType vt, Object in) {
if( in == null ) return 0;
switch( vt ) {
- case STRING: return Double.parseDouble((String)in);
+ case STRING: return !((String)in).isEmpty() ? Double.parseDouble((String)in) : 0;
case BOOLEAN: return ((Boolean)in)?1d:0d;
case INT: return (Long)in;
case DOUBLE: return (Double)in;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
new file mode 100644
index 0000000..b61060b
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.transform;
+
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.io.MatrixReaderFactory;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class TransformFrameEncodeApplyTest extends AutomatedTestBase
+{
+ private final static String TEST_NAME1 = "TransformFrameEncodeApply";
+ private final static String TEST_DIR = "functions/transform/";
+ private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameEncodeApplyTest.class.getSimpleName() + "/";
+
+ //dataset and transform tasks without missing values
+ private final static String DATASET1 = "homes3/homes.csv";
+ private final static String SPEC1 = "homes3/homes.tfspec_recode.json";
+ private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
+ private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; //incl recode
+
+ //dataset and transform tasks with missing values
+ private final static String DATASET2 = "homes/homes.csv";
+ private final static String SPEC4 = "homes3/homes.tfspec_impute.json";
+ private final static String SPEC5 = "homes3/homes.tfspec_omit.json";
+
+ public enum TransformType {
+ RECODE,
+ DUMMY,
+ BIN,
+ IMPUTE,
+ OMIT,
+ }
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "y" }) );
+ }
+
+ @Test
+ public void testHomesRecodeSingleNodeCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.RECODE);
+ }
+
+ @Test
+ public void testHomesRecodeSparkCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.RECODE);
+ }
+
+ @Test
+ public void testHomesDummycodeSingleNodeCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.DUMMY);
+ }
+
+ @Test
+ public void testHomesDummycodeSparkCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.DUMMY);
+ }
+
+ @Test
+ public void testHomesBinningSingleNodeCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.BIN);
+ }
+
+ @Test
+ public void testHomesBinningSparkCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.BIN);
+ }
+
+ @Test
+ public void testHomesOmitSingleNodeCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.OMIT);
+ }
+
+ @Test
+ public void testHomesOmitSparkCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.OMIT);
+ }
+
+ @Test
+ public void testHomesImputeSingleNodeCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.IMPUTE);
+ }
+
+ @Test
+ public void testHomesImputeSparkCSV() {
+ runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.IMPUTE);
+ }
+
+ /**
+ * Runs the encode/apply DML script on the given platform and compares the two written outputs.
+ * @param rt runtime platform to execute on; the previous platform is restored in the finally block
+ * @param ofmt output format; any value other than "csv" raises a RuntimeException
+ * @param type transform type that selects the spec and dataset under test
+ */
+ private void runTransformTest( RUNTIME_PLATFORM rt, String ofmt, TransformType type )
+ {
+ //remember the current settings (restored in the finally block) and set the runtime platform
+ RUNTIME_PLATFORM rtold = rtplatform;
+ boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
+ rtplatform = rt;
+
+ boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+ //select transform specification and dataset for the given type
+ String SPEC = null; String DATASET = null;
+ switch( type ) {
+ case RECODE: SPEC = SPEC1; DATASET = DATASET1; break;
+ case DUMMY: SPEC = SPEC2; DATASET = DATASET1; break;
+ case BIN: SPEC = SPEC3; DATASET = DATASET1; break;
+ case IMPUTE: SPEC = SPEC4; DATASET = DATASET2; break;
+ case OMIT: SPEC = SPEC5; DATASET = DATASET2; break;
+ }
+
+ if( !ofmt.equals("csv") )
+ throw new RuntimeException("Unsupported test output format");
+
+ try
+ {
+ getAndLoadTestConfiguration(TEST_NAME1);
+
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+ programArgs = new String[]{"-explain","-nvargs",
+ "DATA=" + HOME + "input/" + DATASET,
+ "TFSPEC=" + HOME + "input/" + SPEC,
+ "TFDATA1=" + output("tfout1"),
+ "TFDATA2=" + output("tfout2"),
+ "OFMT=" + ofmt };
+
+ OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
+ runTest(true, false, null, -1);
+
+ //read both outputs (tfout1, tfout2) as CSV matrices and compare them
+ double[][] R1 = DataConverter.convertToDoubleMatrix(MatrixReaderFactory
+ .createMatrixReader(InputInfo.CSVInputInfo)
+ .readMatrixFromHDFS(output("tfout1"), -1L, -1L, 1000, 1000, -1));
+ double[][] R2 = DataConverter.convertToDoubleMatrix(MatrixReaderFactory
+ .createMatrixReader(InputInfo.CSVInputInfo)
+ .readMatrixFromHDFS(output("tfout2"), -1L, -1L, 1000, 1000, -1));
+ TestUtils.compareMatrices(R1, R2, R1.length, R1[0].length, 0);
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ finally {
+ rtplatform = rtold;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+ OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
new file mode 100644
index 0000000..08c98d0
--- /dev/null
+++ b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+F1 = read($DATA, data_type="frame", format="csv"); # input frame, read as CSV
+
+jspec = read($TFSPEC, data_type="scalar", value_type="string"); # transform spec as a string scalar
+
+[X, M] = transformencode(target=F1, spec=jspec); # X: encoded output, M: meta data reused below
+
+if(1==1){} # empty branch; presumably keeps encode and apply in separate program blocks -- TODO confirm
+
+X2 = transformapply(target=F1, spec=jspec, meta=M); # same input and spec, applied with meta data M
+
+write(X, $TFDATA1, format=$OFMT); # output of transformencode
+write(X2, $TFDATA2, format=$OFMT); # output of transformapply
+
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
index 5122a60..bdcc36b 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
@@ -31,6 +31,7 @@ import org.junit.runners.Suite;
TransformAndApplyTest.class,
TransformEncodeDecodeTest.class,
TransformFrameApplyTest.class,
+ TransformFrameEncodeApplyTest.class,
TransformFrameEncodeDecodeTest.class,
TransformReadMetaTest.class,
TransformTest.class,