You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/06/08 19:24:06 UTC
[2/6] systemml git commit: [SYSTEMML-1300] Remove file-based transform from compiler/runtime
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 9e30f5c..7743b61 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -19,40 +19,13 @@
package org.apache.sysml.runtime.transform;
-import java.io.EOFException;
-import java.io.IOException;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.regex.Pattern;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.lops.Lop;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
-@SuppressWarnings("deprecation")
-public class TfUtils implements Serializable{
-
+public class TfUtils implements Serializable
+{
private static final long serialVersionUID = 526252850872633125L;
protected enum ColumnTypes {
@@ -89,9 +62,7 @@ public class TfUtils implements Serializable{
//transform meta data constants (old file-based transform)
public static final String TXMTD_SEP = ",";
- public static final String TXMTD_COLTYPES = "coltypes.csv";
public static final String TXMTD_COLNAMES = "column.names";
- public static final String TXMTD_DC_COLNAMES = "dummycoded.column.names";
public static final String TXMTD_RCD_MAP_SUFFIX = ".map";
public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct";
public static final String TXMTD_BIN_FILE_SUFFIX = ".bin";
@@ -101,184 +72,14 @@ public class TfUtils implements Serializable{
public static final String JSON_MTHD = "methods";
public static final String JSON_CONSTS = "constants";
public static final String JSON_NBINS = "numbins";
- protected static final String MODE_FILE_SUFFIX = ".mode";
- protected static final String SCALE_FILE_SUFFIX = ".scale";
- protected static final String DCD_FILE_NAME = "dummyCodeMaps.csv";
- protected static final String DCD_NAME_SEP = "_";
-
-
- private OmitAgent _oa = null;
- private MVImputeAgent _mia = null;
- private RecodeAgent _ra = null;
- private BinAgent _ba = null;
- private DummycodeAgent _da = null;
-
- private long _numRecordsInPartFile; // Total number of records in the data file
- private long _numValidRecords; // (_numRecordsInPartFile - #of omitted records)
- private long _numTransformedRows; // Number of rows after applying transformations
- private long _numTransformedColumns; // Number of columns after applying transformations
private String _headerLine = null;
private boolean _hasHeader;
private Pattern _delim = null;
private String _delimString = null;
private String[] _NAstrings = null;
- private String[] _outputColumnNames = null;
private int _numInputCols = -1;
- private String _tfMtdDir = null;
- private String _spec = null;
- private String _offsetFile = null;
- private String _tmpDir = null;
- private String _outputPath = null;
-
- public TfUtils(JobConf job, boolean minimal)
- throws IOException, JSONException
- {
- if( !InfrastructureAnalyzer.isLocalMode(job) ) {
- ConfigurationManager.setCachedJobConf(job);
- }
- _NAstrings = TfUtils.parseNAStrings(job);
- _spec = job.get(MRJobConfiguration.TF_SPEC);
- _oa = new OmitAgent(new JSONObject(_spec), null, -1);
- }
-
- // called from GenTFMtdMapper, ApplyTf (Hadoop)
- public TfUtils(JobConf job)
- throws IOException, JSONException
- {
- if( !InfrastructureAnalyzer.isLocalMode(job) ) {
- ConfigurationManager.setCachedJobConf(job);
- }
-
- boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
- String[] naStrings = TfUtils.parseNAStrings(job);
- long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) ); // #cols input data
- String spec = job.get(MRJobConfiguration.TF_SPEC);
- String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
- String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
- String outputPath = FileOutputFormat.getOutputPath(job).toString();
- JSONObject jspec = new JSONObject(spec);
-
- init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, jspec, numCols, offsetFile, tmpPath, outputPath);
- }
-
- // called from GenTfMtdReducer
- public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException
- {
- this(job);
- _tfMtdDir = tfMtdDir;
- }
-
- // called from GenTFMtdReducer and ApplyTf (Spark)
- public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
- init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
- _tfMtdDir = tfMtdDir;
- }
-
- protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
- throws IOException {
- // check non-existing file
- if (!fs.exists(path))
- if ( err )
- throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
- else
- return false;
-
- // check for empty file
- if( MapReduceTool.isFileEmpty(fs, path) )
- if ( err )
- throw new EOFException("Empty input file " + path.toString() + ".");
- else
- return false;
-
- return true;
- }
-
- public static String getPartFileName(JobConf job) throws IOException {
- Path path = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
- FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
- path = path.makeQualified(fs);
- return path.toString();
- }
-
- public static boolean isPartFileWithHeader(JobConf job) throws IOException {
- String thisfile=getPartFileName(job);
- Path path = new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE));
- FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
- path = path.makeQualified(fs);
- return thisfile.toString().equals(path.toString());
- }
-
- /**
- * Prepare NA strings so that they can be sent to workers via JobConf.
- * A "dummy" string is added at the end to handle the case of empty strings.
- * @param na NA string
- * @return NA string concatenated with NA string separator concatenated with "dummy"
- */
- public static String prepNAStrings(String na) {
- return na + DataExpression.DELIM_NA_STRING_SEP + "dummy";
- }
-
- public static String[] parseNAStrings(String na)
- {
- if ( na == null )
- return null;
-
- String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
- return tmp; //Arrays.copyOf(tmp, tmp.length-1);
- }
-
- public static String[] parseNAStrings(JobConf job)
- {
- return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
- }
-
- private void createAgents(JSONObject spec, String[] naStrings)
- throws IOException, JSONException
- {
- _oa = new OmitAgent(spec, _outputColumnNames, _numInputCols);
- _mia = new MVImputeAgent(spec, null, naStrings, _numInputCols);
- _ra = new RecodeAgent(spec, _outputColumnNames, _numInputCols);
- _ba = new BinAgent(spec, _outputColumnNames, _numInputCols);
- _da = new DummycodeAgent(spec, _outputColumnNames, _numInputCols);
- }
-
- private void parseColumnNames() {
- _outputColumnNames = _delim.split(_headerLine, -1);
- for(int i=0; i < _outputColumnNames.length; i++)
- _outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
- }
-
- private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
- {
- _numRecordsInPartFile = 0;
- _numValidRecords = 0;
- _numTransformedRows = 0;
- _numTransformedColumns = 0;
-
- //TODO: fix hard-wired header propagation to meta data column names
-
- _headerLine = headerLine;
- _hasHeader = hasHeader;
- _delimString = delim;
- _delim = Pattern.compile(Pattern.quote(delim));
- _NAstrings = naStrings;
- _numInputCols = (int)numCols;
- _offsetFile = offsetFile;
- _tmpDir = tmpPath;
- _outputPath = outputPath;
-
- parseColumnNames();
- createAgents(spec, naStrings);
- }
-
- public void incrValid() { _numValidRecords++; }
- public long getValid() { return _numValidRecords; }
- public long getTotal() { return _numRecordsInPartFile; }
- public long getNumTransformedRows() { return _numTransformedRows; }
- public long getNumTransformedColumns() { return _numTransformedColumns; }
-
public String getHeader() { return _headerLine; }
public boolean hasHeader() { return _hasHeader; }
public String getDelimString() { return _delimString; }
@@ -286,24 +87,6 @@ public class TfUtils implements Serializable{
public String[] getNAStrings() { return _NAstrings; }
public long getNumCols() { return _numInputCols; }
- public String getSpec() { return _spec; }
- public String getTfMtdDir() { return _tfMtdDir; }
- public String getOffsetFile() { return _offsetFile; }
- public String getTmpDir() { return _tmpDir; }
- public String getOutputPath() { return _outputPath; }
-
- public String getName(int colID) { return _outputColumnNames[colID-1]; }
-
- public void setValid(long n) { _numValidRecords = n;}
- public void incrTotal() { _numRecordsInPartFile++; }
- public void setTotal(long n) { _numRecordsInPartFile = n;}
-
- public OmitAgent getOmitAgent() { return _oa; }
- public MVImputeAgent getMVImputeAgent(){ return _mia;}
- public RecodeAgent getRecodeAgent() { return _ra; }
- public BinAgent getBinAgent() { return _ba; }
- public DummycodeAgent getDummycodeAgent() { return _da; }
-
/**
* Function that checks if the given string is one of NA strings.
*
@@ -321,229 +104,4 @@ public class TfUtils implements Serializable{
}
return false;
}
-
- public String[] getWords(Text line) {
- return getWords(line.toString());
- }
-
-
- public String[] getWords(String line) {
- return getDelim().split(line.trim(), -1);
- }
-
- /**
- * Process a given row to construct transformation metadata.
- *
- * @param line string to break into words
- * @return string array of words from the line
- * @throws IOException if IOException occurs
- */
- public String[] prepareTfMtd(String line) throws IOException {
- String[] words = getWords(line);
- if(!getOmitAgent().omit(words, this))
- {
- getMVImputeAgent().prepare(words);
- getRecodeAgent().prepare(words, this);
- getBinAgent().prepare(words, this);
- incrValid();
- }
- incrTotal();
-
- return words;
- }
-
- public void loadTfMetadata() throws IOException
- {
- JobConf job = ConfigurationManager.getCachedJobConf();
- loadTfMetadata(job, false);
- }
-
- public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
- {
- Path tfMtdDir = null;
- FileSystem fs = null;
-
- if(fromLocalFS) {
- // metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
- tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
- fs = FileSystem.getLocal(job);
- }
- else {
- tfMtdDir = new Path(getTfMtdDir());
- fs = IOUtilFunctions.getFileSystem(tfMtdDir, job);
- }
-
- // load transformation metadata
- getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
- getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
- getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
- // associate recode maps and bin definitions with dummycoding agent,
- // as recoded and binned columns are typically dummycoded
- getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
- getDummycodeAgent().setNumBins(getBinAgent().getColList(), getBinAgent().getNumBins());
- getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
- }
-
- public String processHeaderLine() throws IOException
- {
- //TODO: fix hard-wired header propagation to meta data column names
-
- FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
- String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
- getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
-
- // write header information (before and after transformation) to temporary path
- // these files are copied into txMtdPath, once the ApplyTf job is complete.
- DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
-
- return dcdHeader;
- //_numTransformedColumns = getDelim().split(dcdHeader, -1).length;
- //return _numTransformedColumns;
- }
-
- public boolean omit(String[] words) {
- if(getOmitAgent() == null)
- return false;
- return getOmitAgent().omit(words, this);
- }
-
- /**
- * Function to apply transformation metadata on a given row.
- *
- * @param words string array of words
- * @return string array of transformed words
- */
- public String[] apply( String[] words ) {
- words = getMVImputeAgent().apply(words);
- words = getRecodeAgent().apply(words);
- words = getBinAgent().apply(words);
- words = getDummycodeAgent().apply(words);
- _numTransformedRows++;
-
- return words;
- }
-
- public void check(String []words) throws DMLRuntimeException
- {
- boolean checkEmptyString = ( getNAStrings() != null );
- if ( checkEmptyString )
- {
- final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
- for(int i=0; i<words.length; i++)
- if ( words[i] != null && words[i].equals(""))
- throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
- }
- }
-
- public String checkAndPrepOutputString(String []words) throws DMLRuntimeException {
- return checkAndPrepOutputString(words, new StringBuilder());
- }
-
- public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException
- {
- /*
- * Check if empty strings ("") have to be handled.
- *
- * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
- * When na.strings are provided, then "" is considered a missing value indicator, and the
- * user is expected to provide an appropriate imputation method. Therefore, when na.strings
- * are provided, "" encountered in any column (after all transformations are applied)
- * denotes an erroneous condition.
- */
- boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
-
- //StringBuilder sb = new StringBuilder();
- sb.setLength(0);
- int i =0;
-
- if ( checkEmptyString )
- {
- final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
- if ( words[0] != null )
- if ( words[0].equals("") )
- throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
- else
- sb.append(words[0]);
- else
- sb.append("0");
-
- for(i=1; i<words.length; i++)
- {
- sb.append(_delimString);
-
- if ( words[i] != null )
- if ( words[i].equals("") )
- throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
- else
- sb.append(words[i]);
- else
- sb.append("0");
- }
- }
- else
- {
- sb.append(words[0] != null ? words[0] : "0");
- for(i=1; i<words.length; i++)
- {
- sb.append(_delimString);
- sb.append(words[i] != null ? words[i] : "0");
- }
- }
-
- return sb.toString();
- }
-
- private Reader initOffsetsReader(JobConf job) throws IOException
- {
- Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
- FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
- Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
- if ( files.length != 1 )
- throw new IOException("Expecting a single file under counters file: " + path.toString());
-
- Reader reader = new SequenceFile.Reader(fs, files[0], job);
-
- return reader;
- }
-
- /**
- * Function to generate custom file names (transform-part-.....) for
- * mappers' output for ApplyTfCSV job. The idea is to find the index
- * of (thisfile, fileoffset) in the list of all offsets from the
- * counters/offsets file, which was generated from either GenTfMtdMR
- * or AssignRowIDMR job.
- *
- * @param job job configuration
- * @param offset file offset
- * @return part file id (ie, 00001, 00002, etc)
- * @throws IOException if IOException occurs
- */
- public String getPartFileID(JobConf job, long offset) throws IOException
- {
- Reader reader = null;
- int id = 0;
- try {
- reader = initOffsetsReader(job);
- ByteWritable key=new ByteWritable();
- OffsetCount value=new OffsetCount();
- String thisFile = TfUtils.getPartFileName(job);
- while (reader.next(key, value)) {
- if ( thisFile.equals(value.filename) && value.fileOffset == offset )
- break;
- id++;
- }
- }
- finally {
- IOUtilFunctions.closeSilently(reader);
- }
-
- String sid = Integer.toString(id);
- char[] carr = new char[5-sid.length()];
- Arrays.fill(carr, '0');
- String ret = (new String(carr)).concat(sid);
-
- return ret;
- }
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
index a3f01a1..304dcdb 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
@@ -19,20 +19,10 @@
package org.apache.sysml.runtime.transform.encode;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
import org.apache.sysml.runtime.util.UtilFunctions;
import org.apache.wink.json4j.JSONArray;
@@ -152,11 +142,4 @@ public abstract class Encoder implements Serializable
* @param meta frame block
*/
public abstract void initMetaData(FrameBlock meta);
-
-
- //OLD API: kept for a transition phase only
- //TODO stage 2: refactor data and meta data IO into minimal set of ultility functions
- abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
- abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
- abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
new file mode 100644
index 0000000..fbe6994
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+/**
+ * Encoder for equi-width binning of numeric columns.
+ * <p>
+ * Frame-based usage: {@link #initMetaData(FrameBlock)} loads the bin
+ * boundaries from the transform meta data frame, and
+ * {@link #apply(FrameBlock, MatrixBlock)} replaces every value of a binned
+ * column with its 1-based bin ID. The string-based methods
+ * {@link #prepare(String[], TfUtils)} and {@link #apply(String[])} rely on
+ * the min/max/width arrays, which only the full (non-colsOnly) constructor
+ * allocates.
+ */
+public class EncoderBin extends Encoder
+{
+	private static final long serialVersionUID = 1917445005206076078L;
+
+	public static final String MIN_PREFIX = "min";
+	public static final String MAX_PREFIX = "max";
+	public static final String NBINS_PREFIX = "nbins";
+
+	private int[] _numBins = null;
+	private double[] _min=null, _max=null; // min and max among non-missing values
+	private double[] _binWidths = null;    // width of a bin for each attribute
+
+	//frame transform-apply attributes (populated by initMetaData)
+	private double[][] _binMins = null;    // per binned column: lower bound of each bin
+	private double[][] _binMaxs = null;    // per binned column: upper bound of each bin (binary-searched, so assumed ascending)
+
+	/**
+	 * Creates a binning encoder with full initialization; equivalent to
+	 * {@code EncoderBin(parsedSpec, colnames, clen, false)}.
+	 *
+	 * @param parsedSpec parsed transform specification
+	 * @param colnames input column names
+	 * @param clen number of input columns
+	 * @throws JSONException if the spec cannot be read
+	 * @throws IOException as declared by the delegated constructor
+	 */
+	public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen)
+		throws JSONException, IOException
+	{
+		this(parsedSpec, colnames, clen, false);
+	}
+
+	/**
+	 * Creates a binning encoder. If the spec has no binning section, the
+	 * encoder keeps no column list (see {@link #isApplicable()}).
+	 *
+	 * @param parsedSpec parsed transform specification
+	 * @param colnames input column names (used to resolve IDs when colsOnly is set)
+	 * @param clen number of input columns
+	 * @param colsOnly if true, initialize only the binned column IDs; otherwise
+	 *        also the bin counts and the min/max/width arrays
+	 * @throws JSONException if the spec cannot be read
+	 * @throws IOException as declared for column-ID parsing
+	 */
+	public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly)
+		throws JSONException, IOException
+	{
+		super( null, clen );
+		if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
+			return;
+
+		if( colsOnly ) {
+			List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames);
+			initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0])));
+		}
+		else
+		{
+			JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN);
+			JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
+			JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
+			initColList(attrs);
+
+			_numBins = new int[attrs.size()];
+			for(int i=0; i < _numBins.length; i++)
+				_numBins[i] = UtilFunctions.toInt(nbins.get(i));
+
+			// initialize internal transformation metadata
+			_min = new double[_colList.length];
+			Arrays.fill(_min, Double.MAX_VALUE);
+			_max = new double[_colList.length];
+			Arrays.fill(_max, -Double.MAX_VALUE);
+
+			_binWidths = new double[_colList.length];
+		}
+	}
+
+	/**
+	 * Updates the running per-column min/max with the non-NA values of one
+	 * row (string-based path).
+	 * <p>
+	 * NOTE(review): _min/_max are only allocated by the full constructor, so
+	 * this fails with a NullPointerException on non-NA values when colsOnly=true.
+	 *
+	 * @param words tokens of one input row, indexed by 0-based column position
+	 * @param agents supplies the NA strings
+	 */
+	public void prepare(String[] words, TfUtils agents) {
+		if ( !isApplicable() )
+			return;
+
+		for(int i=0; i <_colList.length; i++) {
+			int colID = _colList[i];
+
+			String w = null;
+			double d = 0;
+
+			// equi-width
+			w = UtilFunctions.unquote(words[colID-1].trim());
+			if(!TfUtils.isNA(agents.getNAStrings(),w)) {
+				d = UtilFunctions.parseToDouble(w);
+				if(d < _min[i])
+					_min[i] = d;
+				if(d > _max[i])
+					_max[i] = d;
+			}
+		}
+	}
+
+	/**
+	 * Encodes the frame by applying the bin meta data; {@link #build(FrameBlock)}
+	 * is a no-op for binning.
+	 */
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		build(in);
+		return apply(in, out);
+	}
+
+	@Override
+	public void build(FrameBlock in) {
+		// nothing to do: bin boundaries are loaded via initMetaData
+	}
+
+	/**
+	 * Replaces each binned column value with its 1-based equi-width bin ID,
+	 * counting bins of width _binWidths[i] upward from _min[i], capped at _numBins[i].
+	 * Requires an encoder created with the full constructor.
+	 * <p>
+	 * NOTE(review): _binWidths is never written after its zero-filled
+	 * allocation, so every value above _min[i] is assigned the last bin.
+	 *
+	 * @param words tokens of one row; modified in place
+	 * @return the same array, with binned columns replaced by bin IDs
+	 * @throws RuntimeException if a binned column holds a non-numeric value
+	 */
+	@Override
+	public String[] apply(String[] words) {
+		if( !isApplicable() )
+			return words;
+
+		for(int i=0; i < _colList.length; i++) {
+			int colID = _colList[i];
+			try {
+				double val = UtilFunctions.parseToDouble(words[colID-1]);
+				int binid = 1;
+				double tmp = _min[i] + _binWidths[i];
+				while(val > tmp && binid < _numBins[i]) {
+					tmp += _binWidths[i];
+					binid++;
+				}
+				words[colID-1] = Integer.toString(binid);
+			}
+			catch(NumberFormatException e) {
+				// keep the parse failure as cause
+				throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.", e);
+			}
+		}
+
+		return words;
+	}
+
+	/**
+	 * Replaces the values of all binned columns with their 1-based bin IDs,
+	 * found by binary search over the bin upper bounds: a value equal to an
+	 * upper bound falls into that bin, a value between two bounds into the
+	 * higher bin. Values above the last upper bound (and NaN) get ID nbins+1.
+	 * NOTE(review): confirm whether such out-of-range values should be clamped.
+	 *
+	 * @param in input frame
+	 * @param out output matrix, updated in place at the binned columns
+	 * @return out
+	 */
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+		// consistent with the string-based methods: no column list, nothing to bin
+		if( !isApplicable() )
+			return out;
+
+		for(int j=0; j<_colList.length; j++) {
+			int colID = _colList[j];
+			for( int i=0; i<in.getNumRows(); i++ ) {
+				double inVal = UtilFunctions.objectToDouble(
+					in.getSchema()[colID-1], in.get(i, colID-1));
+				// ix >= 0: exact match of an upper bound; else ix = -(insertion point)-1
+				int ix = Arrays.binarySearch(_binMaxs[j], inVal);
+				int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1;
+				out.quickSetValue(i, colID-1, binID);
+			}
+		}
+		return out;
+	}
+
+	/**
+	 * Returns the given meta data frame unchanged.
+	 */
+	@Override
+	public FrameBlock getMetaData(FrameBlock meta) {
+		return meta;
+	}
+
+	/**
+	 * Loads the bin boundaries from the transform meta data: for each binned
+	 * column, the number of bins is the column's num-distinct metadata, and
+	 * row i holds bin i as min + Lop.DATATYPE_PREFIX + max.
+	 *
+	 * @param meta transform meta data frame
+	 */
+	@Override
+	public void initMetaData(FrameBlock meta) {
+		if( !isApplicable() )
+			return;
+
+		_binMins = new double[_colList.length][];
+		_binMaxs = new double[_colList.length][];
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
+			_binMins[j] = new double[nbins];
+			_binMaxs[j] = new double[nbins];
+			for( int i=0; i<nbins; i++ ) {
+				String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX);
+				_binMins[j][i] = Double.parseDouble(tmp[0]);
+				_binMaxs[j][i] = Double.parseDouble(tmp[1]);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index 9efbc19..deff887 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -19,20 +19,11 @@
package org.apache.sysml.runtime.transform.encode;
-import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
/**
* Simple composite encoder that applies a list of encoders
@@ -90,7 +81,6 @@ public class EncoderComposite extends Encoder
encoder.build(in);
}
-
@Override
public String[] apply(String[] in) {
for( Encoder encoder : _encoders )
@@ -119,19 +109,4 @@ public class EncoderComposite extends Encoder
for( Encoder encoder : _encoders )
encoder.initMetaData(out);
}
-
- @Override
- public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
- throw new RuntimeException("File-based api not supported.");
- }
-
- @Override
- public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
- throw new RuntimeException("File-based api not supported.");
- }
-
- @Override
- public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
- throw new RuntimeException("File-based api not supported.");
- }
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
new file mode 100644
index 0000000..743381a
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+/**
+ * Encoder for dummy coding (one-hot encoding) of recoded columns.
+ * <p>
+ * Each dummycoded column with domain size d is expanded into d output
+ * columns, all other columns are passed through. The domain sizes and the
+ * output width are only known after {@link #initMetaData(FrameBlock)}.
+ */
+public class EncoderDummycode extends Encoder
+{
+	private static final long serialVersionUID = 5832130477659116489L;
+
+	private int[] _domainSizes = null;  // length = #of dummycoded columns
+	private long _dummycodedLength = 0; // #of columns after dummycoded
+
+	/**
+	 * Creates a dummycode encoder for the columns listed in the spec's
+	 * dummycode section, if present.
+	 *
+	 * @param parsedSpec parsed transform specification
+	 * @param colnames input column names (passed to TfMetaUtils for ID resolution)
+	 * @param clen number of input columns
+	 * @throws JSONException if the spec cannot be read
+	 */
+	public EncoderDummycode(JSONObject parsedSpec, String[] colnames, int clen) throws JSONException {
+		super(null, clen);
+
+		if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) {
+			int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_DUMMYCODE);
+			initColList(collist);
+		}
+	}
+
+	/** Returns the number of output columns (0 until initMetaData has been called). */
+	@Override
+	public int getNumCols() {
+		return (int)_dummycodedLength;
+	}
+
+	/** Dummycoding has no build phase; this directly applies the encoding. */
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		return apply(in, out);
+	}
+
+	@Override
+	public void build(FrameBlock in) {
+		//do nothing: domain sizes are obtained via initMetaData
+	}
+
+	/**
+	 * Expands one row of recoded tokens: a dummycoded column with code k
+	 * (1-based) and domain size d becomes d tokens with "1" at position k and
+	 * null elsewhere; other columns are copied unchanged.
+	 *
+	 * @param words recoded tokens of one row
+	 * @return a new array of length {@link #getNumCols()}
+	 * @throws IllegalStateException if initMetaData has not been called
+	 * @throws RuntimeException if a code is not an integer in [1, d]
+	 */
+	@Override
+	public String[] apply(String[] words)
+	{
+		if( !isApplicable() )
+			return words;
+		if( _domainSizes == null )
+			throw new IllegalStateException("Dummycode encoder not initialized: initMetaData must be called before apply.");
+
+		String[] nwords = new String[(int)_dummycodedLength];
+
+		for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) {
+			if(idx < _colList.length && colID==_colList[idx]) {
+				// dummycoded column: mark the slot of its 1-based recode value
+				int rcdVal = 0;
+				try {
+					rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
+				}
+				catch (Exception e) {
+					throw new RuntimeException(dummycodeErrorMessage(colID, rcdVal, words[colID-1], idx), e);
+				}
+				// out-of-domain values would otherwise land in a neighboring column's slot
+				if( rcdVal < 1 || rcdVal > _domainSizes[idx] )
+					throw new RuntimeException(dummycodeErrorMessage(colID, rcdVal, words[colID-1], idx));
+				nwords[ ncolID-1+rcdVal-1 ] = "1";
+				ncolID += _domainSizes[idx];
+				idx++;
+			}
+			else {
+				nwords[ncolID-1] = words[colID-1];
+				ncolID++;
+			}
+		}
+
+		return nwords;
+	}
+
+	/** Builds the error message for a dummycoded value that cannot be encoded. */
+	private String dummycodeErrorMessage(int colID, int rcdVal, String word, int idx) {
+		return "Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+word
+			+ ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength;
+	}
+
+	/**
+	 * Expands the recoded matrix {@code out} into the dummycoded output: for a
+	 * dummycoded column with code k and domain size d, the k-th of its d output
+	 * columns is set to 1; all other columns take their values from {@code in}.
+	 * Codes whose integer part lies outside [1, d], incl. NaN, leave all d output columns zero.
+	 *
+	 * @param in input frame, source of the pass-through values
+	 * @param out recoded matrix in the input column layout
+	 * @return a new matrix with {@link #getNumCols()} columns, or {@code out}
+	 *         itself if no column is dummycoded
+	 */
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out)
+	{
+		if( !isApplicable() )
+			return out;
+
+		MatrixBlock ret = new MatrixBlock(out.getNumRows(), (int)_dummycodedLength, false);
+
+		for( int i=0; i<out.getNumRows(); i++ ) {
+			for(int colID=1, idx=0, ncolID=1; colID <= out.getNumColumns(); colID++) {
+				double val = out.quickGetValue(i, colID-1);
+				if(idx < _colList.length && colID==_colList[idx]) {
+					// skip codes outside [1, d] (incl. NaN, as (int)NaN == 0) to avoid writing into a neighbor's slot
+					int code = (int)val;
+					if( code >= 1 && code <= _domainSizes[idx] )
+						ret.quickSetValue(i, ncolID-1+code-1, 1);
+					ncolID += _domainSizes[idx];
+					idx++;
+				}
+				else {
+					double ptval = UtilFunctions.objectToDouble(in.getSchema()[colID-1], in.get(i, colID-1));
+					ret.quickSetValue(i, ncolID-1, ptval);
+					ncolID++;
+				}
+			}
+		}
+
+		return ret;
+	}
+
+	/** Returns the given meta data frame unchanged. */
+	@Override
+	public FrameBlock getMetaData(FrameBlock out) {
+		return out;
+	}
+
+	/**
+	 * Reads the domain size (num distinct) of each dummycoded column from the
+	 * column metadata and derives the output width: a dummycoded column with
+	 * domain size d adds d-1 columns to the input width.
+	 *
+	 * @param meta transform meta data frame
+	 */
+	@Override
+	public void initMetaData(FrameBlock meta) {
+		//initialize domain sizes and output num columns
+		_dummycodedLength = _clen;
+		if( !isApplicable() ) {
+			_domainSizes = new int[0];
+			return;
+		}
+		_domainSizes = new int[_colList.length];
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			_domainSizes[j] = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
+			_dummycodedLength += _domainSizes[j]-1;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
index f7ceefd..13b2810 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -28,11 +28,6 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.transform.BinAgent;
-import org.apache.sysml.runtime.transform.DummycodeAgent;
-import org.apache.sysml.runtime.transform.MVImputeAgent;
-import org.apache.sysml.runtime.transform.OmitAgent;
-import org.apache.sysml.runtime.transform.RecodeAgent;
import org.apache.sysml.runtime.transform.TfUtils;
import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
import org.apache.sysml.runtime.util.UtilFunctions;
@@ -40,7 +35,6 @@ import org.apache.wink.json4j.JSONObject;
public class EncoderFactory
{
-
public static Encoder createEncoder(String spec, String[] colnames, int clen, FrameBlock meta) throws DMLRuntimeException {
return createEncoder(spec, colnames, UtilFunctions.nCopies(clen, ValueType.STRING), meta);
}
@@ -79,7 +73,7 @@ public class EncoderFactory
//create individual encoders
if( !rcIDs.isEmpty() ) {
- RecodeAgent ra = new RecodeAgent(jSpec, colnames, clen);
+ EncoderRecode ra = new EncoderRecode(jSpec, colnames, clen);
ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0])));
lencoders.add(ra);
}
@@ -87,13 +81,13 @@ public class EncoderFactory
lencoders.add(new EncoderPassThrough(
ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen));
if( !dcIDs.isEmpty() )
- lencoders.add(new DummycodeAgent(jSpec, colnames, schema.length));
+ lencoders.add(new EncoderDummycode(jSpec, colnames, schema.length));
if( !binIDs.isEmpty() )
- lencoders.add(new BinAgent(jSpec, colnames, schema.length, true));
+ lencoders.add(new EncoderBin(jSpec, colnames, schema.length, true));
if( !oIDs.isEmpty() )
- lencoders.add(new OmitAgent(jSpec, colnames, schema.length));
+ lencoders.add(new EncoderOmit(jSpec, colnames, schema.length));
if( !mvIDs.isEmpty() ) {
- MVImputeAgent ma = new MVImputeAgent(jSpec, colnames, schema.length);
+ EncoderMVImpute ma = new EncoderMVImpute(jSpec, colnames, schema.length);
ma.initRecodeIDList(rcIDs);
lencoders.add(ma);
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
new file mode 100644
index 0000000..55a0bde
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.runtime.functionobjects.CM;
+import org.apache.sysml.runtime.functionobjects.Mean;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class EncoderMVImpute extends Encoder
+{
+ private static final long serialVersionUID = 9057868620144662194L;
+
+ public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT };
+
+ private MVMethod[] _mvMethodList = null;
+ private MVMethod[] _mvscMethodList = null; // scaling methods for attributes that are imputed and also scaled
+
+ private BitSet _isMVScaled = null;
+ private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE); // function object that understands variance computation
+
+ // objects required to compute mean and variance of all non-missing entries
+ private Mean _meanFn = Mean.getMeanFnObject(); // function object that understands mean computation
+ private KahanObject[] _meanList = null; // column-level means, computed so far
+ private long[] _countList = null; // #of non-missing values
+
+ private CM_COV_Object[] _varList = null; // column-level variances, computed so far (for scaling)
+
+ private int[] _scnomvList = null; // List of attributes that are scaled but not imputed
+ private MVMethod[] _scnomvMethodList = null; // scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
+ private KahanObject[] _scnomvMeanList = null; // column-level means, for attributes scaled but not imputed
+ private long[] _scnomvCountList = null; // #of non-missing values, for attributes scaled but not imputed
+ private CM_COV_Object[] _scnomvVarList = null; // column-level variances, computed so far
+
+ private String[] _replacementList = null; // replacements: for global_mean, mean; and for global_mode, recode id of mode category
+ private String[] _NAstrings = null;
+ private List<Integer> _rcList = null;
+ private HashMap<Integer,HashMap<String,Long>> _hist = null;
+
+ public String[] getReplacements() { return _replacementList; }
+ public KahanObject[] getMeans() { return _meanList; }
+ public CM_COV_Object[] getVars() { return _varList; }
+ public KahanObject[] getMeans_scnomv() { return _scnomvMeanList; }
+ public CM_COV_Object[] getVars_scnomv() { return _scnomvVarList; }
+
+ public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, int clen)
+ throws JSONException
+ {
+ super(null, clen);
+
+ //handle column list
+ int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, colnames, TfUtils.TXMETHOD_IMPUTE);
+ initColList(collist);
+
+ //handle method list
+ parseMethodsAndReplacments(parsedSpec);
+
+ //create reuse histograms
+ _hist = new HashMap<Integer, HashMap<String,Long>>();
+ }
+
+ public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, String[] NAstrings, int clen)
+ throws JSONException
+ {
+ super(null, clen);
+ boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE);
+ boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE);
+ _NAstrings = NAstrings;
+
+ if(!isMV) {
+ // MV Impute is not applicable
+ _colList = null;
+ _mvMethodList = null;
+ _meanList = null;
+ _countList = null;
+ _replacementList = null;
+ }
+ else {
+ JSONObject mvobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+ JSONArray mvattrs = (JSONArray) mvobj.get(TfUtils.JSON_ATTRS);
+ JSONArray mvmthds = (JSONArray) mvobj.get(TfUtils.JSON_MTHD);
+ int mvLength = mvattrs.size();
+
+ _colList = new int[mvLength];
+ _mvMethodList = new MVMethod[mvLength];
+
+ _meanList = new KahanObject[mvLength];
+ _countList = new long[mvLength];
+ _varList = new CM_COV_Object[mvLength];
+
+ _isMVScaled = new BitSet(_colList.length);
+ _isMVScaled.clear();
+
+ for(int i=0; i < _colList.length; i++) {
+ _colList[i] = UtilFunctions.toInt(mvattrs.get(i));
+ _mvMethodList[i] = MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))];
+ _meanList[i] = new KahanObject(0, 0);
+ }
+
+ _replacementList = new String[mvLength]; // contains replacements for all columns (scale and categorical)
+
+ JSONArray constants = (JSONArray)mvobj.get(TfUtils.JSON_CONSTS);
+ for(int i=0; i < constants.size(); i++) {
+ if ( constants.get(i) == null )
+ _replacementList[i] = "NaN";
+ else
+ _replacementList[i] = constants.get(i).toString();
+ }
+ }
+
+ // Handle scaled attributes
+ if ( !isSC )
+ {
+ // scaling is not applicable
+ _scnomvCountList = null;
+ _scnomvMeanList = null;
+ _scnomvVarList = null;
+ }
+ else
+ {
+ if ( _colList != null )
+ _mvscMethodList = new MVMethod[_colList.length];
+
+ JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE);
+ JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS);
+ JSONArray scmthds = (JSONArray) scobj.get(TfUtils.JSON_MTHD);
+ int scLength = scattrs.size();
+
+ int[] _allscaled = new int[scLength];
+ int scnomv = 0, colID;
+ byte mthd;
+ for(int i=0; i < scLength; i++)
+ {
+ colID = UtilFunctions.toInt(scattrs.get(i));
+ mthd = (byte) UtilFunctions.toInt(scmthds.get(i));
+
+ _allscaled[i] = colID;
+
+ // check if the attribute is also MV imputed
+ int mvidx = isApplicable(colID);
+ if(mvidx != -1)
+ {
+ _isMVScaled.set(mvidx);
+ _mvscMethodList[mvidx] = MVMethod.values()[mthd];
+ _varList[mvidx] = new CM_COV_Object();
+ }
+ else
+ scnomv++; // count of scaled but not imputed
+ }
+
+ if(scnomv > 0)
+ {
+ _scnomvList = new int[scnomv];
+ _scnomvMethodList = new MVMethod[scnomv];
+
+ _scnomvMeanList = new KahanObject[scnomv];
+ _scnomvCountList = new long[scnomv];
+ _scnomvVarList = new CM_COV_Object[scnomv];
+
+ for(int i=0, idx=0; i < scLength; i++)
+ {
+ colID = UtilFunctions.toInt(scattrs.get(i));
+ mthd = (byte)UtilFunctions.toInt(scmthds.get(i));
+
+ if(isApplicable(colID) == -1)
+ { // scaled but not imputed
+ _scnomvList[idx] = colID;
+ _scnomvMethodList[idx] = MVMethod.values()[mthd];
+ _scnomvMeanList[idx] = new KahanObject(0, 0);
+ _scnomvVarList[idx] = new CM_COV_Object();
+ idx++;
+ }
+ }
+ }
+ }
+ }
+
+ private void parseMethodsAndReplacments(JSONObject parsedSpec) throws JSONException {
+ JSONArray mvspec = (JSONArray) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+ _mvMethodList = new MVMethod[mvspec.size()];
+ _replacementList = new String[mvspec.size()];
+ _meanList = new KahanObject[mvspec.size()];
+ _countList = new long[mvspec.size()];
+ for(int i=0; i < mvspec.size(); i++) {
+ JSONObject mvobj = (JSONObject)mvspec.get(i);
+ _mvMethodList[i] = MVMethod.valueOf(mvobj.get("method").toString().toUpperCase());
+ if( _mvMethodList[i] == MVMethod.CONSTANT ) {
+ _replacementList[i] = mvobj.getString("value").toString();
+ }
+ _meanList[i] = new KahanObject(0, 0);
+ }
+ }
+
+ public void prepare(String[] words) throws IOException {
+
+ try {
+ String w = null;
+ if(_colList != null)
+ for(int i=0; i <_colList.length; i++) {
+ int colID = _colList[i];
+ w = UtilFunctions.unquote(words[colID-1].trim());
+
+ try {
+ if(!TfUtils.isNA(_NAstrings, w)) {
+ _countList[i]++;
+
+ boolean computeMean = (_mvMethodList[i] == MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) );
+ if(computeMean) {
+ // global_mean
+ double d = UtilFunctions.parseToDouble(w);
+ _meanFn.execute2(_meanList[i], d, _countList[i]);
+
+ if (_isMVScaled.get(i) && _mvscMethodList[i] == MVMethod.GLOBAL_MODE)
+ _varFn.execute(_varList[i], d);
+ }
+ else {
+ // global_mode or constant
+ // Nothing to do here. Mode is computed using recode maps.
+ }
+ }
+ } catch (NumberFormatException e)
+ {
+ throw new RuntimeException("Encountered \"" + w + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + w + "\" to na.strings, along with an appropriate imputation method.");
+ }
+ }
+
+ // Compute mean and variance for attributes that are scaled but not imputed
+ if(_scnomvList != null)
+ for(int i=0; i < _scnomvList.length; i++)
+ {
+ int colID = _scnomvList[i];
+ w = UtilFunctions.unquote(words[colID-1].trim());
+ double d = UtilFunctions.parseToDouble(w);
+ _scnomvCountList[i]++; // not required, this is always equal to total #records processed
+ _meanFn.execute2(_scnomvMeanList[i], d, _scnomvCountList[i]);
+ if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE)
+ _varFn.execute(_scnomvVarList[i], d);
+ }
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public MVMethod getMethod(int colID) {
+ int idx = isApplicable(colID);
+ if(idx == -1)
+ return MVMethod.INVALID;
+ else
+ return _mvMethodList[idx];
+ }
+
+ public long getNonMVCount(int colID) {
+ int idx = isApplicable(colID);
+ return (idx == -1) ? 0 : _countList[idx];
+ }
+
+ public String getReplacement(int colID) {
+ int idx = isApplicable(colID);
+ return (idx == -1) ? null : _replacementList[idx];
+ }
+
+ @Override
+ public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+ build(in);
+ return apply(in, out);
+ }
+
+ @Override
+ public void build(FrameBlock in) {
+ try {
+ for( int j=0; j<_colList.length; j++ ) {
+ int colID = _colList[j];
+ if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) {
+ //compute global column mean (scale)
+ long off = _countList[j];
+ for( int i=0; i<in.getNumRows(); i++ )
+ _meanFn.execute2(_meanList[j], UtilFunctions.objectToDouble(
+ in.getSchema()[colID-1], in.get(i, colID-1)), off+i+1);
+ _replacementList[j] = String.valueOf(_meanList[j]._sum);
+ _countList[j] += in.getNumRows();
+ }
+ else if( _mvMethodList[j] == MVMethod.GLOBAL_MODE ) {
+ //compute global column mode (categorical), i.e., most frequent category
+ HashMap<String,Long> hist = _hist.containsKey(colID) ?
+ _hist.get(colID) : new HashMap<String,Long>();
+ for( int i=0; i<in.getNumRows(); i++ ) {
+ String key = String.valueOf(in.get(i, colID-1));
+ if( key != null && !key.isEmpty() ) {
+ Long val = hist.get(key);
+ hist.put(key, (val!=null) ? val+1 : 1);
+ }
+ }
+ _hist.put(colID, hist);
+ long max = Long.MIN_VALUE;
+ for( Entry<String, Long> e : hist.entrySet() )
+ if( e.getValue() > max ) {
+ _replacementList[j] = e.getKey();
+ max = e.getValue();
+ }
+ }
+ }
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public String[] apply(String[] words)
+ {
+ if( isApplicable() )
+ for(int i=0; i < _colList.length; i++) {
+ int colID = _colList[i];
+ String w = UtilFunctions.unquote(words[colID-1]);
+ if(TfUtils.isNA(_NAstrings, w))
+ w = words[colID-1] = _replacementList[i];
+
+ if ( _isMVScaled.get(i) )
+ if ( _mvscMethodList[i] == MVMethod.GLOBAL_MEAN )
+ words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
+ else
+ words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum );
+ }
+
+ if(_scnomvList != null)
+ for(int i=0; i < _scnomvList.length; i++)
+ {
+ int colID = _scnomvList[i];
+ if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN )
+ words[colID-1] = Double.toString( UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum );
+ else
+ words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / _scnomvVarList[i].mean._sum );
+ }
+
+ return words;
+ }
+
+ @Override
+ public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+ for(int i=0; i<in.getNumRows(); i++) {
+ for(int j=0; j<_colList.length; j++) {
+ int colID = _colList[j];
+ if( Double.isNaN(out.quickGetValue(i, colID-1)) )
+ out.quickSetValue(i, colID-1, Double.parseDouble(_replacementList[j]));
+ }
+ }
+ return out;
+ }
+
+ @Override
+ public FrameBlock getMetaData(FrameBlock out) {
+ for( int j=0; j<_colList.length; j++ ) {
+ out.getColumnMetadata(_colList[j]-1)
+ .setMvValue(_replacementList[j]);
+ }
+ return out;
+ }
+
+ public void initMetaData(FrameBlock meta) {
+ //init replacement lists, replace recoded values to
+ //apply mv imputation potentially after recoding
+ for( int j=0; j<_colList.length; j++ ) {
+ int colID = _colList[j];
+ String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue());
+ if( _rcList.contains(colID) ) {
+ Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal);
+ if( mvVal2 == null)
+ throw new RuntimeException("Missing recode value for impute value '"+mvVal+"' (colID="+colID+").");
+ _replacementList[j] = mvVal2.toString();
+ }
+ else {
+ _replacementList[j] = mvVal;
+ }
+ }
+ }
+
+ public void initRecodeIDList(List<Integer> rcList) {
+ _rcList = rcList;
+ }
+
+ /**
+ * Exposes the internal histogram after build.
+ *
+ * @param colID column ID
+ * @return histogram (map of string keys and long values)
+ */
+ public HashMap<String,Long> getHistogram( int colID ) {
+ return _hist.get(colID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
new file mode 100644
index 0000000..af09cee
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+/**
+ * Encoder that removes every row having a missing value in any of the
+ * configured columns.
+ */
+public class EncoderOmit extends Encoder
+{
+ private static final long serialVersionUID = 1978852120416654195L;
+
+ private int _rmRows = 0; // number of rows removed by the last matrix-based apply
+
+ public EncoderOmit(JSONObject parsedSpec, String[] colnames, int clen)
+ throws JSONException
+ {
+ super(null, clen);
+ if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT))
+ return;
+ int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_OMIT);
+ initColList(collist);
+ }
+
+ /**
+ * @return number of rows removed by the most recent {@link #apply(FrameBlock, MatrixBlock)}
+ */
+ public int getNumRemovedRows() {
+ return _rmRows;
+ }
+
+ /**
+ * Determines whether a tokenized row must be omitted.
+ *
+ * @param words tokens of one row
+ * @param agents provides the NA strings
+ * @return true if any configured column holds an NA value
+ */
+ public boolean omit(String[] words, TfUtils agents)
+ {
+ if( !isApplicable() )
+ return false;
+
+ for(int i=0; i<_colList.length; i++) {
+ int colID = _colList[i];
+ if(TfUtils.isNA(agents.getNAStrings(),UtilFunctions.unquote(words[colID-1].trim())))
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+ return apply(in, out);
+ }
+
+ @Override
+ public void build(FrameBlock in) {
+ //do nothing
+ }
+
+ @Override
+ public String[] apply(String[] words) {
+ return null;
+ }
+
+ /**
+ * Returns a new matrix containing only the rows of {@code out} without a
+ * NaN in any configured column.
+ *
+ * @param in input frame (unused)
+ * @param out encoded matrix
+ * @return matrix of the valid rows, or {@code out} if no columns are configured
+ */
+ @Override
+ public MatrixBlock apply(FrameBlock in, MatrixBlock out)
+ {
+ //no omit columns configured: nothing to remove
+ if( !isApplicable() ) {
+ _rmRows = 0;
+ return out;
+ }
+
+ //determine valid rows and output size, consistently over the rows of out
+ int nrow = out.getNumRows();
+ boolean[] valid = new boolean[nrow];
+ int numRows = 0;
+ for(int i=0; i<nrow; i++) {
+ valid[i] = isValidRow(out, i);
+ numRows += valid[i] ? 1 : 0;
+ }
+
+ //copy over valid rows into the output
+ MatrixBlock ret = new MatrixBlock(numRows, out.getNumColumns(), false);
+ int pos = 0;
+ for(int i=0; i<nrow; i++) {
+ if( !valid[i] )
+ continue;
+ for(int j=0; j<out.getNumColumns(); j++)
+ ret.quickSetValue(pos, j, out.quickGetValue(i, j));
+ pos++;
+ }
+
+ //keep info on removed rows
+ _rmRows = nrow - pos;
+
+ return ret;
+ }
+
+ /** Checks that the given row has no NaN in any configured column. */
+ private boolean isValidRow(MatrixBlock out, int row) {
+ for(int j=0; j<_colList.length; j++)
+ if( Double.isNaN(out.quickGetValue(row, _colList[j]-1)) )
+ return false;
+ return true;
+ }
+
+ @Override
+ public FrameBlock getMetaData(FrameBlock out) {
+ //do nothing
+ return out;
+ }
+
+ @Override
+ public void initMetaData(FrameBlock meta) {
+ //do nothing
+ }
+}
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
index 08722fd..d84ea0d 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
@@ -19,19 +19,10 @@
package org.apache.sysml.runtime.transform.encode;
-import java.io.IOException;
-import java.util.Iterator;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
import org.apache.sysml.runtime.util.UtilFunctions;
/**
@@ -89,20 +80,4 @@ public class EncoderPassThrough extends Encoder
public void initMetaData(FrameBlock meta) {
//do nothing
}
-
-
- @Override
- public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
- throw new RuntimeException("File-based api not supported.");
- }
-
- @Override
- public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
- throw new RuntimeException("File-based api not supported.");
- }
-
- @Override
- public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
- throw new RuntimeException("File-based api not supported.");
- }
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
new file mode 100644
index 0000000..bb8592c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+/**
+ * Encoder that maps the distinct tokens of selected columns to consecutive
+ * integer codes starting at 1 (recode maps). Tokens without a code are
+ * encoded as NaN.
+ */
+public class EncoderRecode extends Encoder
+{
+ private static final long serialVersionUID = 8213163881283341874L;
+
+ private int[] _mvrcdList = null;
+ private int[] _fullrcdList = null;
+
+ //recode maps and custom map for partial recode maps
+ private HashMap<Integer, HashMap<String, Long>> _rcdMaps = new HashMap<Integer, HashMap<String, Long>>();
+ private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+ private HashMap<Integer, HashSet<Object>> _rcdMapsPart = null;
+
+ public EncoderRecode(JSONObject parsedSpec, String[] colnames, int clen)
+ throws JSONException
+ {
+ super(null, clen);
+ int rcdCount = 0;
+
+ if( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) {
+ int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_RECODE);
+ rcdCount = initColList(collist);
+ }
+
+ if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) {
+ _mvrcdList = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_MVRCD);
+ rcdCount += _mvrcdList.length;
+ }
+
+ if ( rcdCount > 0 ) {
+ _fullrcdList = new int[rcdCount];
+ int idx = -1;
+ if(_colList != null)
+ for(int i=0; i < _colList.length; i++)
+ _fullrcdList[++idx] = _colList[i];
+
+ if(_mvrcdList != null)
+ for(int i=0; i < _mvrcdList.length; i++)
+ _fullrcdList[++idx] = _mvrcdList[i];
+ }
+ }
+
+ public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() {
+ return _rcdMaps;
+ }
+
+ public HashMap<Integer, HashSet<Object>> getCPRecodeMapsPartial() {
+ return _rcdMapsPart;
+ }
+
+ public HashMap<Integer, HashMap<String,String>> getRecodeMaps() {
+ return _finalMaps;
+ }
+
+ /**
+ * Looks up the code of a token; returns null if the column has no map or
+ * the token is unknown.
+ */
+ private String lookupRCDMap(int colID, String key) {
+ if( _finalMaps!=null ) {
+ HashMap<String,String> map = _finalMaps.get(colID);
+ return (map!=null) ? map.get(key) : null;
+ }
+ else { //used for cp
+ HashMap<String,Long> map = _rcdMaps.get(colID);
+ Long tmp = (map!=null) ? map.get(key) : null;
+ return (tmp!=null) ? Long.toString(tmp) : null;
+ }
+ }
+
+ @Override
+ public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+ if( !isApplicable() )
+ return out;
+
+ //build and apply recode maps
+ build(in);
+ apply(in, out);
+
+ return out;
+ }
+
+ @Override
+ public void build(FrameBlock in) {
+ if( !isApplicable() )
+ return;
+
+ Iterator<String[]> iter = in.getStringRowIterator();
+ while( iter.hasNext() ) {
+ String[] row = iter.next();
+ for( int j=0; j<_colList.length; j++ ) {
+ int colID = _colList[j]; //1-based
+ //allocate column map if necessary
+ if( !_rcdMaps.containsKey(colID) )
+ _rcdMaps.put(colID, new HashMap<String,Long>());
+ //probe and build column map
+ HashMap<String,Long> map = _rcdMaps.get(colID);
+ String key = row[colID-1];
+ if( key!=null && !key.isEmpty() && !map.containsKey(key) )
+ map.put(key, Long.valueOf(map.size()+1));
+ }
+ }
+ }
+
+ /**
+ * Collects the distinct non-null, non-empty tokens of the recoded columns
+ * without assigning codes.
+ *
+ * @param in input frame
+ */
+ public void buildPartial(FrameBlock in) {
+ if( !isApplicable() )
+ return;
+
+ //ensure allocated partial recode map
+ if( _rcdMapsPart == null )
+ _rcdMapsPart = new HashMap<Integer, HashSet<Object>>();
+
+ //construct partial recode map (tokens w/o codes)
+ //iterate over columns for sequential access
+ for( int j=0; j<_colList.length; j++ ) {
+ int colID = _colList[j]; //1-based
+ //allocate column map if necessary
+ if( !_rcdMapsPart.containsKey(colID) )
+ _rcdMapsPart.put(colID, new HashSet<Object>());
+ HashSet<Object> map = _rcdMapsPart.get(colID);
+ //probe and build column map
+ for( int i=0; i<in.getNumRows(); i++ )
+ map.add(in.get(i, colID-1));
+ //cleanup unnecessary entries once
+ map.remove(null);
+ map.remove("");
+ }
+ }
+
+ /**
+ * Method to apply transformations.
+ */
+ @Override
+ public String[] apply(String[] words)
+ {
+ if( !isApplicable() )
+ return words;
+
+ //apply recode maps on relevant columns of given row
+ for(int i=0; i < _colList.length; i++) {
+ //prepare input and get code
+ int colID = _colList[i];
+ String key = UtilFunctions.unquote(words[colID-1].trim());
+ String val = lookupRCDMap(colID, key);
+ // replace unseen keys with NaN
+ words[colID-1] = (val!=null) ? val : "NaN";
+ }
+
+ return words;
+ }
+
+ /**
+ * Writes the code of every recoded cell into {@code out}, NaN for unseen
+ * or missing tokens.
+ *
+ * @param in input frame
+ * @param out encoded matrix, modified in place
+ * @return {@code out}
+ */
+ @Override
+ public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+ //only mvrcd columns (or none) configured: nothing to recode
+ if( !isApplicable() )
+ return out;
+
+ //apply recode maps column wise
+ for( int j=0; j<_colList.length; j++ ) {
+ int colID = _colList[j];
+ for( int i=0; i<in.getNumRows(); i++ ) {
+ Object okey = in.get(i, colID-1);
+ String key = (okey!=null) ? okey.toString() : null;
+ String val = lookupRCDMap(colID, key);
+ out.quickSetValue(i, colID-1, (val!=null) ?
+ Double.parseDouble(val) : Double.NaN);
+ }
+ }
+
+ return out;
+ }
+
+ /**
+ * Serializes the recode maps into the meta data frame, one
+ * "token{@code Lop.DATATYPE_PREFIX}code" entry per row and column, and
+ * records the number of distinct values per column.
+ *
+ * @param meta meta data frame
+ * @return {@code meta}
+ */
+ @Override
+ public FrameBlock getMetaData(FrameBlock meta) {
+ if( !isApplicable() )
+ return meta;
+
+ //inverse operation to initRecodeMaps
+
+ //allocate output rows
+ int maxDistinct = 0;
+ for( int j=0; j<_colList.length; j++ ) {
+ HashMap<String,Long> map = _rcdMaps.get(_colList[j]);
+ if( map != null )
+ maxDistinct = Math.max(maxDistinct, map.size());
+ }
+ meta.ensureAllocatedColumns(maxDistinct);
+
+ //create compact meta data representation
+ for( int j=0; j<_colList.length; j++ ) {
+ int colID = _colList[j]; //1-based
+ HashMap<String,Long> map = _rcdMaps.get(colID);
+ if( map == null )
+ continue; //no recode map built or loaded for this column
+ int rowID = 0;
+ for( Entry<String, Long> e : map.entrySet() ) {
+ String tmp = constructRecodeMapEntry(e.getKey(), e.getValue());
+ meta.set(rowID++, colID-1, tmp);
+ }
+ meta.getColumnMetadata(colID-1).setNumDistinct(map.size());
+ }
+
+ return meta;
+ }
+
+
+ /**
+ * Construct the recodemaps from the given input frame for all
+ * columns registered for recode.
+ *
+ * @param meta frame block
+ */
+ @Override
+ public void initMetaData( FrameBlock meta ) {
+ if( !isApplicable() || meta == null || meta.getNumRows()<=0 )
+ return;
+
+ for( int j=0; j<_colList.length; j++ ) {
+ int colID = _colList[j]; //1-based
+ _rcdMaps.put(colID, meta.getRecodeMap(colID-1));
+ }
+ }
+
+ /**
+ * Returns the Recode map entry which consists of concatenation of code, delimiter and token.
+ * @param token is part of Recode map
+ * @param code is code for token
+ * @return the concatenation of code and token with delimiter in between
+ */
+ public static String constructRecodeMapEntry(String token, Long code) {
+ return token + Lop.DATATYPE_PREFIX + code.toString();
+ }
+}
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
index 62b90b4..afb7ee9 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
@@ -50,7 +50,6 @@ import org.apache.wink.json4j.JSONObject;
public class TfMetaUtils
{
-
public static boolean isIDSpecification(String spec) throws DMLRuntimeException {
try {
JSONObject jSpec = new JSONObject(spec);
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
index af2e75f..b506444 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -88,9 +88,7 @@ public class FrameFunctionTest extends AutomatedTestBase
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
- boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
try
{
@@ -126,7 +124,6 @@ public class FrameFunctionTest extends AutomatedTestBase
rtplatform = platformOld;
DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
}
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
index ecc958b..c629eee 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.io.FrameWriter;
@@ -201,10 +200,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
if( rtplatform == RUNTIME_PLATFORM.SPARK )
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
- boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
- if( ofmt.equals("csv") )
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
-
try
{
int cols = multColBlks ? cols2 : cols1;
@@ -235,7 +230,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
finally {
rtplatform = platformOld;
DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
index 5066582..ceeec07 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
@@ -21,7 +21,6 @@ package org.apache.sysml.test.integration.functions.frame;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.io.FrameReaderFactory;
@@ -101,10 +100,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
DMLScript.USE_LOCAL_SPARK_CONFIG = true;
String ofmt = OutputInfo.outputInfoToStringExternal(oinfo);
-
- boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
- if( ofmt.equals("csv") )
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
try
{
@@ -148,7 +143,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
finally {
rtplatform = platformOld;
DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
}
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
index 35078f3..056e619 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
@@ -22,7 +22,6 @@ package org.apache.sysml.test.integration.functions.transform;
import org.junit.Test;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.io.FrameReader;
import org.apache.sysml.runtime.io.FrameReaderFactory;
import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -75,7 +74,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
{
//set runtime platform
RUNTIME_PLATFORM rtold = rtplatform;
- boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
rtplatform = rt;
boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
@@ -94,7 +92,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
programArgs = new String[]{"-explain","-args",
HOME + "input/" + DATASET, output("R") };
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
runTest(true, false, null, -1);
//read input/output and compare
@@ -113,7 +110,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
finally {
rtplatform = rtold;
DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
- OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
deleted file mode 100644
index 81c0bab..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.transform;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-/**
- *
- *
- */
-public class RunTest extends AutomatedTestBase
-{
-
- private final static String TEST_NAME1 = "Transform";
- private final static String TEST_NAME2 = "Apply";
- private final static String TEST_DIR = "functions/transform/";
- private final static String TEST_CLASS_DIR = TEST_DIR + RunTest.class.getSimpleName() + "/";
-
- private final static String HOMES_DATASET = "homes/homes.csv";
- //private final static String HOMES_SPEC = "homes/homes.tfspec.json";
- private final static String HOMES_SPEC2 = "homes/homes.tfspec2.json";
- //private final static String HOMES_IDSPEC = "homes/homes.tfidspec.json";
- //private final static String HOMES_TFDATA = "homes/homes.transformed.csv";
- //private final static String HOMES_COLNAMES = "homes/homes.csv.colnames";
-
- private final static String HOMES_NAN_DATASET = "homes/homesNAN.csv";
- private final static String HOMES_NAN_SPEC = "homes/homesNAN.tfspec.json";
- //private final static String HOMES_NAN_IDSPEC = "homes/homesNAN.tfidspec.json";
- private final static String HOMES_NAN_COLNAMES = "homes/homesNAN.colnames.csv";
-
- private final static String HOMES_MISSING_DATASET = "homes/homesAllMissing.csv";
- private final static String HOMES_MISSING_SPEC = "homes/homesAllMissing.tfspec.json";
- private final static String HOMES_MISSING_IDSPEC = "homes/homesAllMissing.tfidspec.json";
-
- @Override
- public void setUp()
- {
- TestUtils.clearAssertionInformation();
- addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"R"}));
- }
-
- // ---- NAN BinaryBlock ----
-
- @Test
- public void runTestWithNAN_HybridBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "binary");
- }
-
- @Test
- public void runTestWithNAN_SPHybridBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
- }
-
- @Test
- public void runTestWithNAN_HadoopBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "binary");
- }
-
- @Test
- public void runTestWithNAN_SparkBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "binary");
- }
-
- // ---- NAN CSV ----
-
- @Test
- public void runTestWithNAN_HybridCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "csv");
- }
-
- @Test
- public void runTestWithNAN_SPHybridCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
- }
-
- @Test
- public void runTestWithNAN_HadoopCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "csv");
- }
-
- @Test
- public void runTestWithNAN_SparkCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "csv");
- }
-
- // ---- Test2 BinaryBlock ----
-
- @Test
- public void runTest2_HybridBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "binary");
- }
-
- @Test
- public void runTest2_SPHybridBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
- }
-
- @Test
- public void runTest2_HadoopBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "binary");
- }
-
- @Test
- public void runTest2_SparkBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "binary");
- }
-
- // ---- Test2 CSV ----
-
- @Test
- public void runTest2_HybridCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "csv");
- }
-
- @Test
- public void runTest2_SPHybridCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
- }
-
- @Test
- public void runTest2_HadoopCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "csv");
- }
-
- @Test
- public void runTest2_SparkCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "csv");
- }
-
- // ---- HomesMissing BinaryBlock ----
-
- @Test
- public void runAllMissing_HybridBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID, "binary");
- }
-
- @Test
- public void runAllMissing_SPHybridBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
- }
-
- @Test
- public void runAllMissing_HadoopBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HADOOP, "binary");
- }
-
- @Test
- public void runAllMissing_SparkBB() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.SPARK, "binary");
- }
-
- // ---- HomesMissing CSV ----
-
- @Test
- public void runAllMissing_HybridCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID, "csv");
- }
-
- @Test
- public void runAllMissing_SPHybridCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
- }
-
- @Test
- public void runAllMissing_HadoopCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HADOOP, "csv");
- }
-
- @Test
- public void runAllMissing_SparkCSV() throws DMLRuntimeException, IOException {
- runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.SPARK, "csv");
- }
-
- // ------------------
-
- /**
- *
- * @param sparseM1
- * @param sparseM2
- * @param instType
- * @throws IOException
- * @throws DMLRuntimeException
- */
- private void runScalingTest( String dataset, String spec, String colnames, boolean exception, RUNTIME_PLATFORM rt, String ofmt) throws IOException, DMLRuntimeException
- {
- RUNTIME_PLATFORM platformOld = rtplatform;
- rtplatform = rt;
-
- boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
- if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)
- DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-
- try
- {
- getAndLoadTestConfiguration(TEST_NAME1);
-
- /* This is for running the junit test the new way, i.e., construct the arguments directly */
- String HOME = SCRIPT_DIR + TEST_DIR;
- fullDMLScriptName = null;
-
- if (colnames == null) {
- fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
- programArgs = new String[]{"-nvargs",
- "DATA=" + HOME + "input/" + dataset,
- "TFSPEC=" + HOME + "input/" + spec,
- "TFMTD=" + output("tfmtd"),
- "TFDATA=" + output("tfout"),
- "OFMT=" + ofmt };
- }
- else {
- fullDMLScriptName = HOME + TEST_NAME1 + "_colnames.dml";
- programArgs = new String[]{"-nvargs",
- "DATA=" + HOME + "input/" + dataset,
- "TFSPEC=" + HOME + "input/" + spec,
- "COLNAMES=" + HOME + "input/" + colnames,
- "TFMTD=" + output("tfmtd"),
- "TFDATA=" + output("tfout"),
- "OFMT=" + ofmt };
- }
-
- boolean exceptionExpected = exception;
- runTest(true, exceptionExpected, null, -1);
-
- fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
- programArgs = new String[]{"-nvargs",
- "DATA=" + HOME + "input/" + dataset,
- "APPLYMTD=" + output("tfmtd"), // generated above
- "TFMTD=" + output("test_tfmtd"),
- "TFDATA=" + output("test_tfout"),
- "OFMT=" + ofmt };
-
- exceptionExpected = exception;
- runTest(true, exceptionExpected, null, -1);
-
- }
- finally
- {
- rtplatform = platformOld;
- DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
- }
- }
-}
\ No newline at end of file