You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/22 17:34:08 UTC
[32/51] [partial] incubator-systemml git commit: [SYSTEMML-482]
[SYSTEMML-480] Adding a Git attributes file to enforce Unix-styled line
endings, and normalizing all of the line endings.
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 27058de..a4cfaa6 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -1,549 +1,549 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.sysml.utils.JSONHelper;
-
-
-@SuppressWarnings("deprecation")
-public class TfUtils implements Serializable{
-
- private static final long serialVersionUID = 526252850872633125L;
-
- private OmitAgent _oa = null;
- private MVImputeAgent _mia = null;
- private RecodeAgent _ra = null;
- private BinAgent _ba = null;
- private DummycodeAgent _da = null;
-
- private long _numRecordsInPartFile; // Total number of records in the data file
- private long _numValidRecords; // (_numRecordsInPartFile - #of omitted records)
- private long _numTransformedRows; // Number of rows after applying transformations
- private long _numTransformedColumns; // Number of columns after applying transformations
-
- private String _headerLine = null;
- private boolean _hasHeader;
- private Pattern _delim = null;
- private String _delimString = null;
- private String[] _NAstrings = null;
- private String[] _outputColumnNames = null;
- private long _numInputCols = -1;
-
- private String _tfMtdDir = null;
- private String _specFile = null;
- private String _offsetFile = null;
- private String _tmpDir = null;
- private String _outputPath = null;
-
- protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
- throws IOException {
- // check non-existing file
- if (!fs.exists(path))
- if ( err )
- throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
- else
- return false;
-
- // check for empty file
- if (MapReduceTool.isFileEmpty(fs, path.toString()))
- if ( err )
- throw new EOFException("Empty input file " + path.toString() + ".");
- else
- return false;
-
- return true;
- }
-
- public static String getPartFileName(JobConf job) throws IOException {
- FileSystem fs = FileSystem.get(job);
- Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
- return thisPath.toString();
- }
-
- public static boolean isPartFileWithHeader(JobConf job) throws IOException {
- FileSystem fs = FileSystem.get(job);
-
- String thisfile=getPartFileName(job);
- Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
-
- if(thisfile.toString().equals(smallestFilePath.toString()))
- return true;
- else
- return false;
- }
-
- public static JSONObject readSpec(FileSystem fs, String specFile) throws IOException {
- BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(specFile))));
- JSONObject obj = JSONHelper.parse(br);
- br.close();
- return obj;
- }
-
- /**
- * Prepare NA strings so that they can be sent to workers via JobConf.
- * A "dummy" string is added at the end to handle the case of empty strings.
- * @param na
- * @return
- */
- public static String prepNAStrings(String na) {
- return na + DataExpression.DELIM_NA_STRING_SEP + "dummy";
- }
-
- public static String[] parseNAStrings(String na)
- {
- if ( na == null )
- return null;
-
- String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
- return tmp; //Arrays.copyOf(tmp, tmp.length-1);
- }
-
- public static String[] parseNAStrings(JobConf job)
- {
- return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
- }
-
- private void createAgents(JSONObject spec) throws IOException, JSONException {
- _oa = new OmitAgent(spec);
- _mia = new MVImputeAgent(spec);
- _ra = new RecodeAgent(spec);
- _ba = new BinAgent(spec);
- _da = new DummycodeAgent(spec, _numInputCols);
- }
-
- public void setupAgents(OmitAgent oa, MVImputeAgent mia, RecodeAgent ra, BinAgent ba, DummycodeAgent da) {
- _oa = oa;
- _mia = mia;
- _ra = ra;
- _ba = ba;
- _da = da;
- }
-
- private void parseColumnNames() {
- _outputColumnNames = _delim.split(_headerLine, -1);
- for(int i=0; i < _outputColumnNames.length; i++)
- _outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
- }
-
- private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
- {
- _numRecordsInPartFile = 0;
- _numValidRecords = 0;
- _numTransformedRows = 0;
- _numTransformedColumns = 0;
-
- _headerLine = headerLine;
- _hasHeader = hasHeader;
- _delimString = delim;
- _delim = Pattern.compile(Pattern.quote(delim));
- _NAstrings = naStrings;
- _numInputCols = numCols;
- _offsetFile = offsetFile;
- _tmpDir = tmpPath;
- _outputPath = outputPath;
-
- parseColumnNames();
- createAgents(spec);
- }
-
- public TfUtils(JobConf job, boolean minimal)
- throws IOException, JSONException
- {
- if( !InfrastructureAnalyzer.isLocalMode(job) ) {
- ConfigurationManager.setCachedJobConf(job);
- }
-
- _NAstrings = TfUtils.parseNAStrings(job);
- _specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-
- FileSystem fs = FileSystem.get(job);
- JSONObject spec = TfUtils.readSpec(fs, _specFile);
-
- _oa = new OmitAgent(spec);
- }
-
- // called from GenTFMtdMapper, ApplyTf (Hadoop)
- public TfUtils(JobConf job)
- throws IOException, JSONException
- {
- if( !InfrastructureAnalyzer.isLocalMode(job) ) {
- ConfigurationManager.setCachedJobConf(job);
- }
-
- boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
- //Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
- String[] naStrings = TfUtils.parseNAStrings(job);
-
- long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) ); // #of columns in input data
-
- String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
- String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
- String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
- String outputPath = FileOutputFormat.getOutputPath(job).toString();
- FileSystem fs = FileSystem.get(job);
- JSONObject spec = TfUtils.readSpec(fs, specFile);
-
- init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
- }
-
- // called from GenTfMtdReducer
- public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException
- {
- this(job);
- _tfMtdDir = tfMtdDir;
- }
-
- // called from GenTFMtdReducer and ApplyTf (Spark)
- public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
- init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
- _tfMtdDir = tfMtdDir;
- }
-
- public void incrValid() { _numValidRecords++; }
- public long getValid() { return _numValidRecords; }
- public long getTotal() { return _numRecordsInPartFile; }
- public long getNumTransformedRows() { return _numTransformedRows; }
- public long getNumTransformedColumns() { return _numTransformedColumns; }
-
- public String getHeader() { return _headerLine; }
- public boolean hasHeader() { return _hasHeader; }
- public String getDelimString() { return _delimString; }
- public Pattern getDelim() { return _delim; }
- public String[] getNAStrings() { return _NAstrings; }
- public long getNumCols() { return _numInputCols; }
-
- public String getSpecFile() { return _specFile; }
- public String getTfMtdDir() { return _tfMtdDir; }
- public String getOffsetFile() { return _offsetFile; }
- public String getTmpDir() { return _tmpDir; }
- public String getOutputPath() { return _outputPath; }
-
- public String getName(int colID) { return _outputColumnNames[colID-1]; }
-
- public void setValid(long n) { _numValidRecords = n;}
- public void incrTotal() { _numRecordsInPartFile++; }
- public void setTotal(long n) { _numRecordsInPartFile = n;}
-
- public OmitAgent getOmitAgent() { return _oa; }
- public MVImputeAgent getMVImputeAgent(){ return _mia;}
- public RecodeAgent getRecodeAgent() { return _ra; }
- public BinAgent getBinAgent() { return _ba; }
- public DummycodeAgent getDummycodeAgent() { return _da; }
-
- /**
- * Function that checks if the given string is one of NA strings.
- *
- * @param w
- * @return
- */
- public boolean isNA(String w) {
- if(_NAstrings == null)
- return false;
-
- for(String na : _NAstrings) {
- if(w.equals(na))
- return true;
- }
- return false;
- }
-
- public String[] getWords(Text line)
- {
- return getWords(line.toString());
- }
-
-
- public String[] getWords(String line)
- {
- return getDelim().split(line.trim(), -1);
- }
-
- /**
- * Process a given row to construct transformation metadata.
- *
- * @param line
- * @return
- * @throws IOException
- */
- public String[] prepareTfMtd(String line) throws IOException {
- String[] words = getWords(line);
- if(!getOmitAgent().omit(words, this))
- {
- getMVImputeAgent().prepare(words, this);
- getRecodeAgent().prepare(words, this);
- getBinAgent().prepare(words, this);
- incrValid();;
- }
- incrTotal();
-
- return words;
- }
-
- public void loadTfMetadata() throws IOException
- {
- JobConf job = ConfigurationManager.getCachedJobConf();
- loadTfMetadata(job, false);
- }
-
- public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
- {
- Path tfMtdDir = null;
- FileSystem fs = null;
-
- if(fromLocalFS) {
- // metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
- tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
- fs = FileSystem.getLocal(job);
- }
- else {
- fs = FileSystem.get(job);
- tfMtdDir = new Path(getTfMtdDir());
- }
-
- // load transformation metadata
- getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
- getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
- getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
- // associate recode maps and bin definitions with dummycoding agent,
- // as recoded and binned columns are typically dummycoded
- getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
- getDummycodeAgent().setNumBins(getBinAgent().getBinList(), getBinAgent().getNumBins());
- getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
- }
-
- /*public void loadTfMetadata () throws IOException
- {
- Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0];
- FileSystem localFS = FileSystem.getLocal(_rJob);
-
- loadTfMetadata(_rJob, localFS, tfMtdDir);
-
- FileSystem fs;
- fs = FileSystem.get(_rJob);
- Path thisPath=new Path(_rJob.get("map.input.file")).makeQualified(fs);
- String thisfile=thisPath.toString();
-
- Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
- if(thisfile.toString().equals(smallestFilePath.toString()))
- _partFileWithHeader=true;
- else
- _partFileWithHeader = false;
- }*/
-
-
- public String processHeaderLine() throws IOException
- {
- FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
- String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
- getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
-
- // write header information (before and after transformation) to temporary path
- // these files are copied into txMtdPath, once the ApplyTf job is complete.
- DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
-
- return dcdHeader;
- //_numTransformedColumns = getDelim().split(dcdHeader, -1).length;
- //return _numTransformedColumns;
- }
-
- public boolean omit(String[] words) {
- if(getOmitAgent() == null)
- return false;
- return getOmitAgent().omit(words, this);
- }
-
-
- public String[] apply(String[] words) {
- return apply(words, false);
- }
-
- /**
- * Function to apply transformation metadata on a given row.
- *
- * @param words
- * @param optimizeMaps
- * @return
- */
- public String[] apply ( String[] words, boolean optimizeMaps )
- {
- words = getMVImputeAgent().apply(words, this);
-
- if(optimizeMaps)
- // specific case of transform() invoked from CP (to save boxing and unboxing)
- words = getRecodeAgent().cp_apply(words, this);
- else
- words = getRecodeAgent().apply(words, this);
-
- words = getBinAgent().apply(words, this);
- words = getDummycodeAgent().apply(words, this);
-
- _numTransformedRows++;
-
- return words;
- }
-
- public void check(String []words) throws DMLRuntimeException
- {
- boolean checkEmptyString = ( getNAStrings() != null );
- if ( checkEmptyString )
- {
- final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
- for(int i=0; i<words.length; i++)
- if ( words[i] != null && words[i].equals(""))
- throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
- }
- }
-
- public String checkAndPrepOutputString(String []words) throws DMLRuntimeException
- {
- return checkAndPrepOutputString(words, new StringBuilder());
- }
-
- public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException
- {
- /*
- * Check if empty strings ("") have to be handled.
- *
- * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
- * When na.strings are provided, then "" is considered a missing value indicator, and the
- * user is expected to provide an appropriate imputation method. Therefore, when na.strings
- * are provided, "" encountered in any column (after all transformations are applied)
- * denotes an erroneous condition.
- */
- boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
-
- //StringBuilder sb = new StringBuilder();
- sb.setLength(0);
- int i =0;
-
- if ( checkEmptyString )
- {
- final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
- if ( words[0] != null )
- if ( words[0].equals("") )
- throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
- else
- sb.append(words[0]);
- else
- sb.append("0");
-
- for(i=1; i<words.length; i++)
- {
- sb.append(_delimString);
-
- if ( words[i] != null )
- if ( words[i].equals("") )
- throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
- else
- sb.append(words[i]);
- else
- sb.append("0");
- }
- }
- else
- {
- sb.append(words[0] != null ? words[0] : "0");
- for(i=1; i<words.length; i++)
- {
- sb.append(_delimString);
- sb.append(words[i] != null ? words[i] : "0");
- }
- }
-
- return sb.toString();
- }
-
- private Reader initOffsetsReader(JobConf job) throws IOException
- {
- Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
- FileSystem fs = FileSystem.get(job);
- Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
- if ( files.length != 1 )
- throw new IOException("Expecting a single file under counters file: " + path.toString());
-
- Reader reader = new SequenceFile.Reader(fs, files[0], job);
-
- return reader;
- }
-
- /**
- * Function to generate custom file names (transform-part-.....) for
- * mappers' output for ApplyTfCSV job. The idea is to find the index
- * of (thisfile, fileoffset) in the list of all offsets from the
- * counters/offsets file, which was generated from either GenTfMtdMR
- * or AssignRowIDMR job.
- *
- */
- public String getPartFileID(JobConf job, long offset) throws IOException
- {
- Reader reader = initOffsetsReader(job);
-
- ByteWritable key=new ByteWritable();
- OffsetCount value=new OffsetCount();
- String thisFile = TfUtils.getPartFileName(job);
-
- int id = 0;
- while (reader.next(key, value)) {
- if ( thisFile.equals(value.filename) && value.fileOffset == offset )
- break;
- id++;
- }
- reader.close();
-
- String sid = Integer.toString(id);
- char[] carr = new char[5-sid.length()];
- Arrays.fill(carr, '0');
- String ret = (new String(carr)).concat(sid);
-
- return ret;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.DataExpression;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.JSONHelper;
+
+
+@SuppressWarnings("deprecation")
+public class TfUtils implements Serializable{
+
+ private static final long serialVersionUID = 526252850872633125L;
+
+ private OmitAgent _oa = null;
+ private MVImputeAgent _mia = null;
+ private RecodeAgent _ra = null;
+ private BinAgent _ba = null;
+ private DummycodeAgent _da = null;
+
+ private long _numRecordsInPartFile; // Total number of records in the data file
+ private long _numValidRecords; // (_numRecordsInPartFile - #of omitted records)
+ private long _numTransformedRows; // Number of rows after applying transformations
+ private long _numTransformedColumns; // Number of columns after applying transformations
+
+ private String _headerLine = null;
+ private boolean _hasHeader;
+ private Pattern _delim = null;
+ private String _delimString = null;
+ private String[] _NAstrings = null;
+ private String[] _outputColumnNames = null;
+ private long _numInputCols = -1;
+
+ private String _tfMtdDir = null;
+ private String _specFile = null;
+ private String _offsetFile = null;
+ private String _tmpDir = null;
+ private String _outputPath = null;
+
+ protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
+ throws IOException {
+ // check non-existing file
+ if (!fs.exists(path))
+ if ( err )
+ throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
+ else
+ return false;
+
+ // check for empty file
+ if (MapReduceTool.isFileEmpty(fs, path.toString()))
+ if ( err )
+ throw new EOFException("Empty input file " + path.toString() + ".");
+ else
+ return false;
+
+ return true;
+ }
+
+ public static String getPartFileName(JobConf job) throws IOException {
+ FileSystem fs = FileSystem.get(job);
+ Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+ return thisPath.toString();
+ }
+
+ public static boolean isPartFileWithHeader(JobConf job) throws IOException {
+ FileSystem fs = FileSystem.get(job);
+
+ String thisfile=getPartFileName(job);
+ Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
+
+ if(thisfile.toString().equals(smallestFilePath.toString()))
+ return true;
+ else
+ return false;
+ }
+
+ public static JSONObject readSpec(FileSystem fs, String specFile) throws IOException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(specFile))));
+ JSONObject obj = JSONHelper.parse(br);
+ br.close();
+ return obj;
+ }
+
+ /**
+ * Prepare NA strings so that they can be sent to workers via JobConf.
+ * A "dummy" string is added at the end to handle the case of empty strings.
+ * @param na
+ * @return
+ */
+ public static String prepNAStrings(String na) {
+ return na + DataExpression.DELIM_NA_STRING_SEP + "dummy";
+ }
+
+ public static String[] parseNAStrings(String na)
+ {
+ if ( na == null )
+ return null;
+
+ String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
+ return tmp; //Arrays.copyOf(tmp, tmp.length-1);
+ }
+
+ public static String[] parseNAStrings(JobConf job)
+ {
+ return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
+ }
+
+ private void createAgents(JSONObject spec) throws IOException, JSONException {
+ _oa = new OmitAgent(spec);
+ _mia = new MVImputeAgent(spec);
+ _ra = new RecodeAgent(spec);
+ _ba = new BinAgent(spec);
+ _da = new DummycodeAgent(spec, _numInputCols);
+ }
+
+ public void setupAgents(OmitAgent oa, MVImputeAgent mia, RecodeAgent ra, BinAgent ba, DummycodeAgent da) {
+ _oa = oa;
+ _mia = mia;
+ _ra = ra;
+ _ba = ba;
+ _da = da;
+ }
+
+ private void parseColumnNames() {
+ _outputColumnNames = _delim.split(_headerLine, -1);
+ for(int i=0; i < _outputColumnNames.length; i++)
+ _outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
+ }
+
+ private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
+ {
+ _numRecordsInPartFile = 0;
+ _numValidRecords = 0;
+ _numTransformedRows = 0;
+ _numTransformedColumns = 0;
+
+ _headerLine = headerLine;
+ _hasHeader = hasHeader;
+ _delimString = delim;
+ _delim = Pattern.compile(Pattern.quote(delim));
+ _NAstrings = naStrings;
+ _numInputCols = numCols;
+ _offsetFile = offsetFile;
+ _tmpDir = tmpPath;
+ _outputPath = outputPath;
+
+ parseColumnNames();
+ createAgents(spec);
+ }
+
+ public TfUtils(JobConf job, boolean minimal)
+ throws IOException, JSONException
+ {
+ if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+ ConfigurationManager.setCachedJobConf(job);
+ }
+
+ _NAstrings = TfUtils.parseNAStrings(job);
+ _specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+
+ FileSystem fs = FileSystem.get(job);
+ JSONObject spec = TfUtils.readSpec(fs, _specFile);
+
+ _oa = new OmitAgent(spec);
+ }
+
+ // called from GenTFMtdMapper, ApplyTf (Hadoop)
+ public TfUtils(JobConf job)
+ throws IOException, JSONException
+ {
+ if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+ ConfigurationManager.setCachedJobConf(job);
+ }
+
+ boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
+ //Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
+ String[] naStrings = TfUtils.parseNAStrings(job);
+
+ long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) ); // #of columns in input data
+
+ String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+ String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
+ String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
+ String outputPath = FileOutputFormat.getOutputPath(job).toString();
+ FileSystem fs = FileSystem.get(job);
+ JSONObject spec = TfUtils.readSpec(fs, specFile);
+
+ init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
+ }
+
+ // called from GenTfMtdReducer
+ public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException
+ {
+ this(job);
+ _tfMtdDir = tfMtdDir;
+ }
+
+ // called from GenTFMtdReducer and ApplyTf (Spark)
+ public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
+ init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
+ _tfMtdDir = tfMtdDir;
+ }
+
+ public void incrValid() { _numValidRecords++; }
+ public long getValid() { return _numValidRecords; }
+ public long getTotal() { return _numRecordsInPartFile; }
+ public long getNumTransformedRows() { return _numTransformedRows; }
+ public long getNumTransformedColumns() { return _numTransformedColumns; }
+
+ public String getHeader() { return _headerLine; }
+ public boolean hasHeader() { return _hasHeader; }
+ public String getDelimString() { return _delimString; }
+ public Pattern getDelim() { return _delim; }
+ public String[] getNAStrings() { return _NAstrings; }
+ public long getNumCols() { return _numInputCols; }
+
+ public String getSpecFile() { return _specFile; }
+ public String getTfMtdDir() { return _tfMtdDir; }
+ public String getOffsetFile() { return _offsetFile; }
+ public String getTmpDir() { return _tmpDir; }
+ public String getOutputPath() { return _outputPath; }
+
+ public String getName(int colID) { return _outputColumnNames[colID-1]; }
+
+ public void setValid(long n) { _numValidRecords = n;}
+ public void incrTotal() { _numRecordsInPartFile++; }
+ public void setTotal(long n) { _numRecordsInPartFile = n;}
+
+ public OmitAgent getOmitAgent() { return _oa; }
+ public MVImputeAgent getMVImputeAgent(){ return _mia;}
+ public RecodeAgent getRecodeAgent() { return _ra; }
+ public BinAgent getBinAgent() { return _ba; }
+ public DummycodeAgent getDummycodeAgent() { return _da; }
+
+ /**
+ * Function that checks if the given string is one of NA strings.
+ *
+ * @param w
+ * @return
+ */
+ public boolean isNA(String w) {
+ if(_NAstrings == null)
+ return false;
+
+ for(String na : _NAstrings) {
+ if(w.equals(na))
+ return true;
+ }
+ return false;
+ }
+
+ public String[] getWords(Text line)
+ {
+ return getWords(line.toString());
+ }
+
+
+ public String[] getWords(String line)
+ {
+ return getDelim().split(line.trim(), -1);
+ }
+
+ /**
+ * Process a given row to construct transformation metadata.
+ *
+ * @param line
+ * @return
+ * @throws IOException
+ */
+ public String[] prepareTfMtd(String line) throws IOException {
+ String[] words = getWords(line);
+ if(!getOmitAgent().omit(words, this))
+ {
+ getMVImputeAgent().prepare(words, this);
+ getRecodeAgent().prepare(words, this);
+ getBinAgent().prepare(words, this);
+ incrValid();;
+ }
+ incrTotal();
+
+ return words;
+ }
+
+ public void loadTfMetadata() throws IOException
+ {
+ JobConf job = ConfigurationManager.getCachedJobConf();
+ loadTfMetadata(job, false);
+ }
+
+ public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
+ {
+ Path tfMtdDir = null;
+ FileSystem fs = null;
+
+ if(fromLocalFS) {
+ // metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
+ tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
+ fs = FileSystem.getLocal(job);
+ }
+ else {
+ fs = FileSystem.get(job);
+ tfMtdDir = new Path(getTfMtdDir());
+ }
+
+ // load transformation metadata
+ getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+ getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+ getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
+
+ // associate recode maps and bin definitions with dummycoding agent,
+ // as recoded and binned columns are typically dummycoded
+ getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
+ getDummycodeAgent().setNumBins(getBinAgent().getBinList(), getBinAgent().getNumBins());
+ getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+
+ }
+
+ /*public void loadTfMetadata () throws IOException
+ {
+ Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0];
+ FileSystem localFS = FileSystem.getLocal(_rJob);
+
+ loadTfMetadata(_rJob, localFS, tfMtdDir);
+
+ FileSystem fs;
+ fs = FileSystem.get(_rJob);
+ Path thisPath=new Path(_rJob.get("map.input.file")).makeQualified(fs);
+ String thisfile=thisPath.toString();
+
+ Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
+ if(thisfile.toString().equals(smallestFilePath.toString()))
+ _partFileWithHeader=true;
+ else
+ _partFileWithHeader = false;
+ }*/
+
+
+ /**
+  * Builds the dummycoded header line and writes the header metadata.
+  *
+  * The dummycode agent first derives the post-transformation header from the
+  * original header and delimiter, then generates its dummycode maps and column
+  * types under the temporary directory. Finally, both headers (before and after
+  * transformation) are written to the temporary directory.
+  *
+  * @return the dummycoded header line
+  * @throws IOException if the file system cannot be obtained or written to
+  */
+ public String processHeaderLine() throws IOException
+ {
+     FileSystem fileSystem = FileSystem.get(ConfigurationManager.getCachedJobConf());
+
+     String dummycodedHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
+     getDummycodeAgent().genDcdMapsAndColTypes(fileSystem, getTmpDir(), (int) getNumCols(), this);
+
+     // write header information (before and after transformation) to temporary path
+     // these files are copied into txMtdPath, once the ApplyTf job is complete.
+     DataTransform.generateHeaderFiles(fileSystem, getTmpDir(), getHeader(), dummycodedHeader);
+
+     return dummycodedHeader;
+ }
+
+ /**
+  * Asks the omit agent whether the given row should be dropped.
+  *
+  * @param words the fields of one input row
+  * @return {@code false} when no omit agent is configured; otherwise the
+  *         omit agent's decision for this row
+  */
+ public boolean omit(String[] words) {
+     return getOmitAgent() != null && getOmitAgent().omit(words, this);
+ }
+
+
+ /**
+  * Applies the transformation metadata to a row without the CP-specific
+  * recode-map optimization.
+  *
+  * @param words the fields of one input row
+  * @return the transformed row
+  */
+ public String[] apply(String[] words) {
+     final boolean optimizeMaps = false;
+     return apply(words, optimizeMaps);
+ }
+
+ /**
+  * Applies the transformation metadata to a single row.
+  *
+  * The agents run in a fixed order: missing-value imputation, recoding,
+  * binning and finally dummycoding. Each call also increments the counter
+  * of transformed rows.
+  *
+  * @param words        the fields of one input row
+  * @param optimizeMaps if {@code true}, the recode agent's {@code cp_apply}
+  *                     is used instead of {@code apply}
+  * @return the row after all agents have been applied
+  */
+ public String[] apply ( String[] words, boolean optimizeMaps )
+ {
+     String[] row = getMVImputeAgent().apply(words, this);
+
+     if ( optimizeMaps ) {
+         // specific case of transform() invoked from CP (to save boxing and unboxing)
+         row = getRecodeAgent().cp_apply(row, this);
+     }
+     else {
+         row = getRecodeAgent().apply(row, this);
+     }
+
+     row = getBinAgent().apply(row, this);
+     row = getDummycodeAgent().apply(row, this);
+
+     _numTransformedRows++;
+
+     return row;
+ }
+
+ /**
+  * Verifies that a transformed row contains no unhandled empty strings.
+  *
+  * The check is only active when na.strings are configured; otherwise the
+  * method returns without inspecting the row.
+  *
+  * @param words the fields of one transformed row
+  * @throws DMLRuntimeException if na.strings are configured and a field is
+  *         the empty string; the message names the column ID as mapped by
+  *         the dummycode agent
+  */
+ public void check(String []words) throws DMLRuntimeException
+ {
+     if ( getNAStrings() == null )
+         return;
+
+     final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
+     for (int col = 0; col < words.length; col++) {
+         // "".equals(..) is false for null entries, so nulls are skipped
+         if ( "".equals(words[col]) )
+             throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(col+1));
+     }
+ }
+
+ /**
+  * Convenience overload that formats a row using a freshly allocated
+  * {@link StringBuilder}.
+  *
+  * @param words the fields of one transformed row
+  * @return the delimited output line
+  * @throws DMLRuntimeException if an unhandled empty string is encountered
+  */
+ public String checkAndPrepOutputString(String []words) throws DMLRuntimeException
+ {
+     StringBuilder buffer = new StringBuilder();
+     return checkAndPrepOutputString(words, buffer);
+ }
+
+ /**
+  * Joins the fields of a transformed row into one delimited output line,
+  * validating empty strings on the way.
+  *
+  * Null fields are written as "0". The first field is read unconditionally,
+  * so the row must contain at least one element.
+  *
+  * @param words the fields of one transformed row
+  * @param sb    reusable buffer; it is cleared before use
+  * @return the delimited output line
+  * @throws DMLRuntimeException if na.strings are configured and a field is
+  *         the empty string
+  */
+ public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException
+ {
+     /*
+      * Check if empty strings ("") have to be handled.
+      *
+      * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
+      * When na.strings are provided, then "" is considered a missing value indicator, and the
+      * user is expected to provide an appropriate imputation method. Therefore, when na.strings
+      * are provided, "" encountered in any column (after all transformations are applied)
+      * denotes an erroneous condition.
+      */
+     final boolean rejectEmptyStrings = ( getNAStrings() != null );
+     final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
+
+     sb.setLength(0);
+
+     int col = 0;
+     do {
+         // every field but the first is preceded by the delimiter
+         if ( col > 0 )
+             sb.append(_delimString);
+
+         final String field = words[col];
+         if ( field == null ) {
+             sb.append("0");
+         }
+         else if ( rejectEmptyStrings && field.equals("") ) {
+             throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(col+1));
+         }
+         else {
+             sb.append(field);
+         }
+
+         col++;
+     } while ( col < words.length );
+
+     return sb.toString();
+ }
+
+ private Reader initOffsetsReader(JobConf job) throws IOException
+ {
+ Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
+ FileSystem fs = FileSystem.get(job);
+ Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
+ if ( files.length != 1 )
+ throw new IOException("Expecting a single file under counters file: " + path.toString());
+
+ Reader reader = new SequenceFile.Reader(fs, files[0], job);
+
+ return reader;
+ }
+
+ /**
+ * Function to generate custom file names (transform-part-.....) for
+ * mappers' output for ApplyTfCSV job. The idea is to find the index
+ * of (thisfile, fileoffset) in the list of all offsets from the
+ * counters/offsets file, which was generated from either GenTfMtdMR
+ * or AssignRowIDMR job.
+ *
+ */
+ public String getPartFileID(JobConf job, long offset) throws IOException
+ {
+ Reader reader = initOffsetsReader(job);
+
+ ByteWritable key=new ByteWritable();
+ OffsetCount value=new OffsetCount();
+ String thisFile = TfUtils.getPartFileName(job);
+
+ int id = 0;
+ while (reader.next(key, value)) {
+ if ( thisFile.equals(value.filename) && value.fileOffset == offset )
+ break;
+ id++;
+ }
+ reader.close();
+
+ String sid = Integer.toString(id);
+ char[] carr = new char[5-sid.length()];
+ Arrays.fill(carr, '0');
+ String ret = (new String(carr)).concat(sid);
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
index e818089..2c5e37f 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
@@ -1,93 +1,93 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-
-public abstract class TransformationAgent implements Serializable {
-
- private static final long serialVersionUID = -2995384194257356337L;
-
- public static enum TX_METHOD {
- IMPUTE ("impute"),
- RECODE ("recode"),
- BIN ("bin"),
- DUMMYCODE ("dummycode"),
- SCALE ("scale"),
- OMIT ("omit"),
- MVRCD ("mvrcd");
-
- private String _name;
-
- TX_METHOD(String name) { _name = name; }
-
- public String toString() {
- return _name;
- }
- }
-
- protected static String JSON_ATTRS = "attributes";
- protected static String JSON_MTHD = "methods";
- protected static String JSON_CONSTS = "constants";
- protected static String JSON_NBINS = "numbins";
-
- protected static final String MV_FILE_SUFFIX = ".impute";
- protected static final String RCD_MAP_FILE_SUFFIX = ".map";
- protected static final String NDISTINCT_FILE_SUFFIX = ".ndistinct";
- protected static final String MODE_FILE_SUFFIX = ".mode";
- protected static final String BIN_FILE_SUFFIX = ".bin";
- protected static final String SCALE_FILE_SUFFIX = ".scale";
- protected static final String DCD_FILE_NAME = "dummyCodeMaps.csv";
- protected static final String COLTYPES_FILE_NAME = "coltypes.csv";
-
- protected static final String TXMTD_SEP = ",";
- protected static final String DCD_NAME_SEP = "_";
-
- protected static final String OUT_HEADER = "column.names";
- protected static final String OUT_DCD_HEADER = "dummycoded.column.names";
-
- abstract public void print();
- abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
- abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
-
- abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
- abstract public String[] apply(String[] words, TfUtils agents);
-
- protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID }
- protected byte columnTypeToID(ColumnTypes type) throws IOException {
- switch(type)
- {
- case SCALE: return 1;
- case NOMINAL: return 2;
- case ORDINAL: return 3;
- case DUMMYCODED: return 1; // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically.
- default:
- throw new IOException("Invalid Column Type: " + type);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+
+/**
+ * Common base class of the transformation agents. It declares the hooks every
+ * agent implements (emitting, merging and loading transformation metadata, and
+ * applying the transformation to a row) and holds the JSON keys, file names
+ * and separators shared by the agents.
+ */
+public abstract class TransformationAgent implements Serializable {
+
+    private static final long serialVersionUID = -2995384194257356337L;
+
+    /** Transformation methods together with their textual names. */
+    public enum TX_METHOD {
+        IMPUTE ("impute"),
+        RECODE ("recode"),
+        BIN ("bin"),
+        DUMMYCODE ("dummycode"),
+        SCALE ("scale"),
+        OMIT ("omit"),
+        MVRCD ("mvrcd");
+
+        private final String _name;
+
+        TX_METHOD(String name) { _name = name; }
+
+        /** @return the textual name of this method */
+        @Override
+        public String toString() {
+            return _name;
+        }
+    }
+
+    // keys used in the JSON transformation specification
+    protected static String JSON_ATTRS = "attributes";
+    protected static String JSON_MTHD = "methods";
+    protected static String JSON_CONSTS = "constants";
+    protected static String JSON_NBINS = "numbins";
+
+    // suffixes and names of transformation metadata files
+    protected static final String MV_FILE_SUFFIX = ".impute";
+    protected static final String RCD_MAP_FILE_SUFFIX = ".map";
+    protected static final String NDISTINCT_FILE_SUFFIX = ".ndistinct";
+    protected static final String MODE_FILE_SUFFIX = ".mode";
+    protected static final String BIN_FILE_SUFFIX = ".bin";
+    protected static final String SCALE_FILE_SUFFIX = ".scale";
+    protected static final String DCD_FILE_NAME = "dummyCodeMaps.csv";
+    protected static final String COLTYPES_FILE_NAME = "coltypes.csv";
+
+    // separators
+    protected static final String TXMTD_SEP = ",";
+    protected static final String DCD_NAME_SEP = "_";
+
+    // names of the header files (before and after dummycoding)
+    protected static final String OUT_HEADER = "column.names";
+    protected static final String OUT_DCD_HEADER = "dummycoded.column.names";
+
+    public abstract void print();
+
+    public abstract void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
+
+    public abstract void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
+
+    public abstract void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
+
+    public abstract String[] apply(String[] words, TfUtils agents);
+
+    protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID }
+
+    /**
+     * Maps a column type to its numeric ID.
+     *
+     * @param type the column type
+     * @return 1 for SCALE and DUMMYCODED, 2 for NOMINAL, 3 for ORDINAL
+     * @throws IOException for any other column type
+     */
+    protected byte columnTypeToID(ColumnTypes type) throws IOException {
+        switch(type)
+        {
+            case SCALE:
+            case DUMMYCODED: // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically.
+                return 1;
+            case NOMINAL:
+                return 2;
+            case ORDINAL:
+                return 3;
+            default:
+                throw new IOException("Invalid Column Type: " + type);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/standalone/runStandaloneSystemML.bat
----------------------------------------------------------------------
diff --git a/src/main/standalone/runStandaloneSystemML.bat b/src/main/standalone/runStandaloneSystemML.bat
index aba2002..f837970 100644
--- a/src/main/standalone/runStandaloneSystemML.bat
+++ b/src/main/standalone/runStandaloneSystemML.bat
@@ -1,50 +1,50 @@
-::-------------------------------------------------------------
-::
-:: Licensed to the Apache Software Foundation (ASF) under one
-:: or more contributor license agreements. See the NOTICE file
-:: distributed with this work for additional information
-:: regarding copyright ownership. The ASF licenses this file
-:: to you under the Apache License, Version 2.0 (the
-:: "License"); you may not use this file except in compliance
-:: with the License. You may obtain a copy of the License at
-::
-:: http://www.apache.org/licenses/LICENSE-2.0
-::
-:: Unless required by applicable law or agreed to in writing,
-:: software distributed under the License is distributed on an
-:: "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-:: KIND, either express or implied. See the License for the
-:: specific language governing permissions and limitations
-:: under the License.
-::
-::-------------------------------------------------------------
-
-@ECHO OFF
-
-IF "%~1" == "" GOTO Err
-IF "%~1" == "-help" GOTO Msg
-IF "%~1" == "-h" GOTO Msg
-
-setLocal EnableDelayedExpansion
-
-SET HADOOP_HOME=%CD%/lib/hadoop
-
-set CLASSPATH=./lib/*
-echo !CLASSPATH!
-
-set LOG4JPROP=log4j.properties
-
-for /f "tokens=1,* delims= " %%a in ("%*") do set ALLBUTFIRST=%%b
-
-java -Xmx4g -Xms4g -Xmn400m -cp %CLASSPATH% -Dlog4j.configuration=file:%LOG4JPROP% org.apache.sysml.api.DMLScript -f %1 -exec singlenode -config=SystemML-config.xml %ALLBUTFIRST%
-GOTO End
-
-:Err
-ECHO "Wrong Usage. Please provide DML filename to be executed."
-GOTO Msg
-
-:Msg
-ECHO "Usage: runStandaloneSystemML.bat <dml-filename> [arguments] [-help]"
-ECHO "Script internally invokes 'java -Xmx4g -Xms4g -Xmn400m -jar jSystemML.jar -f <dml-filename> -exec singlenode -config=SystemML-config.xml [Optional-Arguments]'"
-
-:End
+::-------------------------------------------------------------
+::
+:: Licensed to the Apache Software Foundation (ASF) under one
+:: or more contributor license agreements. See the NOTICE file
+:: distributed with this work for additional information
+:: regarding copyright ownership. The ASF licenses this file
+:: to you under the Apache License, Version 2.0 (the
+:: "License"); you may not use this file except in compliance
+:: with the License. You may obtain a copy of the License at
+::
+:: http://www.apache.org/licenses/LICENSE-2.0
+::
+:: Unless required by applicable law or agreed to in writing,
+:: software distributed under the License is distributed on an
+:: "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+:: KIND, either express or implied. See the License for the
+:: specific language governing permissions and limitations
+:: under the License.
+::
+::-------------------------------------------------------------
+
+:: Runs a DML script through org.apache.sysml.api.DMLScript in singlenode mode.
+:: The first argument is the DML file; all remaining arguments are passed on.
+@ECHO OFF
+
+:: No argument: report wrong usage. -help / -h: print the usage text only.
+IF "%~1" == "" GOTO Err
+IF "%~1" == "-help" GOTO Msg
+IF "%~1" == "-h" GOTO Msg
+
+setLocal EnableDelayedExpansion
+
+SET HADOOP_HOME=%CD%/lib/hadoop
+
+:: All jars under .\lib are put on the classpath via the wildcard entry.
+set CLASSPATH=./lib/*
+echo !CLASSPATH!
+
+set LOG4JPROP=log4j.properties
+
+:: Split the full argument string at the first space: %%a is the first token,
+:: %%b is everything after it, which becomes ALLBUTFIRST.
+for /f "tokens=1,* delims= " %%a in ("%*") do set ALLBUTFIRST=%%b
+
+java -Xmx4g -Xms4g -Xmn400m -cp %CLASSPATH% -Dlog4j.configuration=file:%LOG4JPROP% org.apache.sysml.api.DMLScript -f %1 -exec singlenode -config=SystemML-config.xml %ALLBUTFIRST%
+GOTO End
+
+:Err
+ECHO "Wrong Usage. Please provide DML filename to be executed."
+GOTO Msg
+
+:: NOTE(review): the text below mentions '-jar jSystemML.jar', while the command
+:: above runs with '-cp %CLASSPATH%' and an explicit main class - confirm which is intended.
+:Msg
+ECHO "Usage: runStandaloneSystemML.bat <dml-filename> [arguments] [-help]"
+ECHO "Script internally invokes 'java -Xmx4g -Xms4g -Xmn400m -jar jSystemML.jar -f <dml-filename> -exec singlenode -config=SystemML-config.xml [Optional-Arguments]'"
+
+:End
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/test/scripts/applications/apply-transform/apply-transform.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/apply-transform/apply-transform.dml b/src/test/scripts/applications/apply-transform/apply-transform.dml
index de7fa02..fdd85c7 100644
--- a/src/test/scripts/applications/apply-transform/apply-transform.dml
+++ b/src/test/scripts/applications/apply-transform/apply-transform.dml
@@ -1,156 +1,156 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
-cmdLine_bin_defns = ifdef($bin_defns, " ")
-cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
-cmdLine_normalization_maps = ifdef($normalization_maps, " ")
-
-original_X = read($X)
-
-if(cmdLine_missing_value_maps != " "){
- missing_val_maps = read(cmdLine_missing_value_maps)
-
- last_data_col = ncol(original_X)-nrow(missing_val_maps)
- X = original_X[,1:last_data_col]
-}else
- X = original_X
-
-# col 1: col index of missing indicator col
-# 0 otherwise
-# col 2: global mean if imputation is needed
-# col 3: num_bins if binning is required
-# col 4: bin width if binning is required
-# col 5: min val if binning is required
-# col 6: begin col if dummy coding is required
-# col 7: end col if dummy coding is required
-# col 8: 1 if normalization is required 0 ow
-# col 9: mean for normalization
-# col 10: std for z-scoring for normalization
-# -1 indicates mean subtraction
-attrinfo = matrix(0, rows=ncol(X), cols=10)
-
-if(cmdLine_missing_value_maps != " "){
- missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
-
- parfor(i in 1:nrow(missing_val_maps), check=0){
- attr_index_mv = castAsScalar(missing_val_maps[i,1])
- attrinfo[attr_index_mv,1] = i
- attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
- }
-}
-
-if(cmdLine_bin_defns != " "){
- bin_defns = read(cmdLine_bin_defns)
- parfor(i in 1:nrow(bin_defns), check=0){
- attr_index_bin = castAsScalar(bin_defns[i,1])
- attrinfo[attr_index_bin,3] = bin_defns[i,4]
- attrinfo[attr_index_bin,4] = bin_defns[i,2]
- attrinfo[attr_index_bin,5] = bin_defns[i,3]
- }
-}
-
-if(cmdLine_dummy_code_maps != " "){
- dummy_code_maps = read(cmdLine_dummy_code_maps)
- parfor(i in 1:nrow(dummy_code_maps), check=0){
- attr_index_dc = castAsScalar(dummy_code_maps[i,1])
- attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
- attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
- }
-}else{
- attrinfo[,6] = seq(1, ncol(X), 1)
- attrinfo[,7] = seq(1, ncol(X), 1)
-}
-
-if(cmdLine_normalization_maps != " "){
- normalization_map = read(cmdLine_normalization_maps)
- parfor(i in 1:nrow(normalization_map), check=0){
- attr_index_normalization = castAsScalar(normalization_map[i,1])
- attrinfo[attr_index_normalization,8] = 1
- attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
- attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
- }
-}
-
-#write(attrinfo, "binning/attrinfo.mtx", format="csv")
-
-cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
-new_X = matrix(0, rows=nrow(X), cols=cols_in_transformed_X)
-log = matrix(0, rows=ncol(X), cols=2)
-parfor(i in 1:ncol(X), check=0){
- col = X[,i]
-
- mv_col_id = castAsScalar(attrinfo[i,1])
- global_mean = castAsScalar(attrinfo[i,2])
- num_bins = castAsScalar(attrinfo[i,3])
- bin_width = castAsScalar(attrinfo[i,4])
- min_val = castAsScalar(attrinfo[i,5])
- dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
- dummy_coding_end_col = castAsScalar(attrinfo[i,7])
- normalization_needed = castAsScalar(attrinfo[i,8])
- normalization_mean = castAsScalar(attrinfo[i,9])
- normalization_std = castAsScalar(attrinfo[i,10])
-
- if(mv_col_id > 0){
- # fill-in with global mean
- col = col + missing_indicator_mat[,mv_col_id] * global_mean
- }
-
- if(num_bins > 0){
- # only for equiwidth bins
-
- # note that max_val entries will get assigned num_bins+1
- col = round((col - min_val)/bin_width - 0.5) + 1
- less_than_lb = ppred(col, 1, "<")
- more_than_ub = ppred(col, num_bins, ">")
-
- col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
- }
-
- if(dummy_coding_beg_col == dummy_coding_end_col){
- if(normalization_needed == 1){
- if(normalization_std == -1) col = col - normalization_mean
- else col = (col - normalization_mean)/normalization_std
- }
-
- new_X[,dummy_coding_beg_col] = col
- }else{
- min_val = min(col)
- max_val = max(col)
- if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1){
- res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
- new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
- }else{
- log[i,1] = 1
- if(min_val < 1) log[i,2] = min_val
- else log[i,2] = max_val
- }
- }
-}
-
-write(new_X, $transformed_X, format="text")
-
-s = "Warning Messages"
-for(i in 1:nrow(log)){
- if(castAsScalar(log[i,1]) == 1)
- s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
-}
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Applies previously computed transformation maps (missing-value imputation,
+# binning, dummy coding, normalization) to the input matrix $X and writes the
+# result to $transformed_X. Columns whose values fall outside the expected
+# dummy-coding range are reported through the log written at the end.
+
+# Optional inputs: a single blank " " is the sentinel for "not supplied".
+cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
+cmdLine_bin_defns = ifdef($bin_defns, " ")
+cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
+cmdLine_normalization_maps = ifdef($normalization_maps, " ")
+
+original_X = read($X)
+
+# When missing-value maps are given, the last nrow(missing_val_maps) columns of
+# the input are split off from the data columns (they are used below as the
+# missing indicator matrix).
+if(cmdLine_missing_value_maps != " "){
+	missing_val_maps = read(cmdLine_missing_value_maps)
+
+	last_data_col = ncol(original_X)-nrow(missing_val_maps)
+	X = original_X[,1:last_data_col]
+}else
+	X = original_X
+
+# attrinfo holds one row per data column of X with the following layout:
+# col 1: col index of missing indicator col
+#		  0 otherwise
+# col 2: global mean if imputation is needed
+# col 3: num_bins if binning is required
+# col 4: bin width if binning is required
+# col 5: min val if binning is required
+# col 6: begin col if dummy coding is required
+# col 7: end col if dummy coding is required
+# col 8: 1 if normalization is required 0 ow
+# col 9: mean for normalization
+# col 10: std for z-scoring for normalization
+#		 -1 indicates mean subtraction
+attrinfo = matrix(0, rows=ncol(X), cols=10)
+
+if(cmdLine_missing_value_maps != " "){
+	missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
+
+	# row i of the map refers to indicator column i
+	parfor(i in 1:nrow(missing_val_maps), check=0){
+		attr_index_mv = castAsScalar(missing_val_maps[i,1])
+		attrinfo[attr_index_mv,1] = i
+		attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
+	}
+}
+
+if(cmdLine_bin_defns != " "){
+	bin_defns = read(cmdLine_bin_defns)
+	# bin_defns columns 4, 2, 3 are copied to attrinfo columns 3 (num_bins),
+	# 4 (bin width) and 5 (min val) respectively
+	parfor(i in 1:nrow(bin_defns), check=0){
+		attr_index_bin = castAsScalar(bin_defns[i,1])
+		attrinfo[attr_index_bin,3] = bin_defns[i,4]
+		attrinfo[attr_index_bin,4] = bin_defns[i,2]
+		attrinfo[attr_index_bin,5] = bin_defns[i,3]
+	}
+}
+
+if(cmdLine_dummy_code_maps != " "){
+	dummy_code_maps = read(cmdLine_dummy_code_maps)
+	parfor(i in 1:nrow(dummy_code_maps), check=0){
+		attr_index_dc = castAsScalar(dummy_code_maps[i,1])
+		attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
+		attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
+	}
+}else{
+	# without dummy coding every column maps onto itself (begin col == end col)
+	attrinfo[,6] = seq(1, ncol(X), 1)
+	attrinfo[,7] = seq(1, ncol(X), 1)
+}
+
+if(cmdLine_normalization_maps != " "){
+	normalization_map = read(cmdLine_normalization_maps)
+	parfor(i in 1:nrow(normalization_map), check=0){
+		attr_index_normalization = castAsScalar(normalization_map[i,1])
+		attrinfo[attr_index_normalization,8] = 1
+		attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
+		attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
+	}
+}
+
+#write(attrinfo, "binning/attrinfo.mtx", format="csv")
+
+# NOTE(review): the output width is taken from the *begin* col (col 6) of the
+# last attribute; if that attribute is dummy coded into several columns, the
+# end col (col 7) looks like the intended width - confirm.
+cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
+new_X = matrix(0, rows=nrow(X), cols=cols_in_transformed_X)
+# log: col 1 is set to 1 when an out-of-range value was seen, col 2 holds that value
+log = matrix(0, rows=ncol(X), cols=2)
+parfor(i in 1:ncol(X), check=0){
+	col = X[,i]
+	
+	mv_col_id = castAsScalar(attrinfo[i,1])
+	global_mean = castAsScalar(attrinfo[i,2])
+	num_bins = castAsScalar(attrinfo[i,3])
+	bin_width = castAsScalar(attrinfo[i,4])
+	min_val = castAsScalar(attrinfo[i,5])
+	dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
+	dummy_coding_end_col = castAsScalar(attrinfo[i,7])
+	normalization_needed = castAsScalar(attrinfo[i,8])
+	normalization_mean = castAsScalar(attrinfo[i,9])
+	normalization_std = castAsScalar(attrinfo[i,10])
+	
+	if(mv_col_id > 0){ 
+		# fill-in with global mean
+		# (adds the mean where the indicator is non-zero; presumably missing
+		# entries are stored as 0 in X - confirm)
+		col = col + missing_indicator_mat[,mv_col_id] * global_mean
+	}
+	
+	if(num_bins > 0){
+		# only for equiwidth bins
+		
+		# note that max_val entries will get assigned num_bins+1
+		col = round((col - min_val)/bin_width - 0.5) + 1
+		less_than_lb = ppred(col, 1, "<")
+		more_than_ub = ppred(col, num_bins, ">")
+		
+		# clamp: bin ids below 1 become 1, bin ids above num_bins become num_bins
+		col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
+	}
+
+	if(dummy_coding_beg_col == dummy_coding_end_col){
+		# single output column: optionally normalize, then copy
+		if(normalization_needed == 1){
+			if(normalization_std == -1) col = col - normalization_mean
+			else col = (col - normalization_mean)/normalization_std
+		}
+		
+		new_X[,dummy_coding_beg_col] = col
+	}else{
+		# min_val is reused here for the column minimum
+		min_val = min(col)
+		max_val = max(col)
+		if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1){
+			# contingency table of (row index, value) over the dummy-coded column range
+			res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
+			new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
+		}else{
+			# value outside the dummy-coded range: leave the output columns at 0 and record it
+			log[i,1] = 1
+			if(min_val < 1) log[i,2] = min_val
+			else log[i,2] = max_val
+		}
+	}
+}
+
+write(new_X, $transformed_X, format="text")
+
+# collect one warning per column flagged in the log
+s = "Warning Messages"
+for(i in 1:nrow(log)){
+	if(castAsScalar(log[i,1]) == 1)
+		s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
+}
write(s, $Log)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/test/scripts/applications/apply-transform/apply-transform.pydml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/apply-transform/apply-transform.pydml b/src/test/scripts/applications/apply-transform/apply-transform.pydml
index be04495..f6c40dd 100644
--- a/src/test/scripts/applications/apply-transform/apply-transform.pydml
+++ b/src/test/scripts/applications/apply-transform/apply-transform.pydml
@@ -1,146 +1,146 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
-cmdLine_bin_defns = ifdef($bin_defns, " ")
-cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
-cmdLine_normalization_maps = ifdef($normalization_maps, " ")
-
-original_X = load($X)
-
-if(cmdLine_missing_value_maps != " "):
- missing_val_maps = read(cmdLine_missing_value_maps)
-
- last_data_col = ncol(original_X)-nrow(missing_val_maps)
- X = original_X[,1:last_data_col]
-else:
- X = original_X
-
-# col 1: col index of missing indicator col
-# 0 otherwise
-# col 2: global mean if imputation is needed
-# col 3: num_bins if binning is required
-# col 4: bin width if binning is required
-# col 5: min val if binning is required
-# col 6: begin col if dummy coding is required
-# col 7: end col if dummy coding is required
-# col 8: 1 if normalization is required 0 ow
-# col 9: mean for normalization
-# col 10: std for z-scoring for normalization
-# -1 indicates mean subtraction
-attrinfo = full(0, rows=ncol(X), cols=10)
-
-if(cmdLine_missing_value_maps != " "):
- missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
-
- parfor(i in 1:nrow(missing_val_maps), check=0):
- attr_index_mv = castAsScalar(missing_val_maps[i,1])
- attrinfo[attr_index_mv,1] = i
- attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
-
-if(cmdLine_bin_defns != " "):
- bin_defns = read(cmdLine_bin_defns)
- parfor(i in 1:nrow(bin_defns), check=0):
- attr_index_bin = castAsScalar(bin_defns[i,1])
- attrinfo[attr_index_bin,3] = bin_defns[i,4]
- attrinfo[attr_index_bin,4] = bin_defns[i,2]
- attrinfo[attr_index_bin,5] = bin_defns[i,3]
-
-if(cmdLine_dummy_code_maps != " "):
- dummy_code_maps = read(cmdLine_dummy_code_maps)
- parfor(i in 1:nrow(dummy_code_maps), check=0):
- attr_index_dc = castAsScalar(dummy_code_maps[i,1])
- attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
- attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
-else:
- attrinfo[,6] = seq(1, ncol(X), 1)
- attrinfo[,7] = seq(1, ncol(X), 1)
-
-if(cmdLine_normalization_maps != " "):
- normalization_map = read(cmdLine_normalization_maps)
- parfor(i in 1:nrow(normalization_map), check=0):
- attr_index_normalization = castAsScalar(normalization_map[i,1])
- attrinfo[attr_index_normalization,8] = 1
- attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
- attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
-
-#write(attrinfo, "binning/attrinfo.mtx", format="csv")
-
-cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
-new_X = full(0, rows=nrow(X), cols=cols_in_transformed_X)
-log = full(0, rows=ncol(X), cols=2)
-parfor(i in 1:ncol(X), check=0):
- col = X[,i]
-
- mv_col_id = castAsScalar(attrinfo[i,1])
- global_mean = castAsScalar(attrinfo[i,2])
- num_bins = castAsScalar(attrinfo[i,3])
- bin_width = castAsScalar(attrinfo[i,4])
- min_val = castAsScalar(attrinfo[i,5])
- dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
- dummy_coding_end_col = castAsScalar(attrinfo[i,7])
- normalization_needed = castAsScalar(attrinfo[i,8])
- normalization_mean = castAsScalar(attrinfo[i,9])
- normalization_std = castAsScalar(attrinfo[i,10])
-
- if(mv_col_id > 0):
- # fill-in with global mean
- col = col + missing_indicator_mat[,mv_col_id] * global_mean
-
- if(num_bins > 0):
- # only for equiwidth bins
-
- # note that max_val entries will get assigned num_bins+1
- col = round((col - min_val)/bin_width - 0.5) + 1
- less_than_lb = ppred(col, 1, "<")
- more_than_ub = ppred(col, num_bins, ">")
-
- col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
-
- if(dummy_coding_beg_col == dummy_coding_end_col):
- if(normalization_needed == 1):
- if(normalization_std == -1):
- col = col - normalization_mean
- else:
- col = (col - normalization_mean)/normalization_std
-
- new_X[,dummy_coding_beg_col] = col
- else:
- min_val = min(col)
- max_val = max(col)
- if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1):
- res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
- new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
- else:
- log[i,1] = 1
- if(min_val < 1):
- log[i,2] = min_val
- else:
- log[i,2] = max_val
-
-save(new_X, $transformed_X, format="text")
-
-s = "Warning Messages"
-for(i in 1:nrow(log)):
- if(castAsScalar(log[i,1]) == 1):
- s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
-
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
+cmdLine_bin_defns = ifdef($bin_defns, " ")
+cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
+cmdLine_normalization_maps = ifdef($normalization_maps, " ")
+
+original_X = load($X)
+
+if(cmdLine_missing_value_maps != " "):
+ missing_val_maps = read(cmdLine_missing_value_maps)
+
+ last_data_col = ncol(original_X)-nrow(missing_val_maps)
+ X = original_X[,1:last_data_col]
+else:
+ X = original_X
+
+# col 1: col index of missing indicator col
+# 0 otherwise
+# col 2: global mean if imputation is needed
+# col 3: num_bins if binning is required
+# col 4: bin width if binning is required
+# col 5: min val if binning is required
+# col 6: begin col if dummy coding is required
+# col 7: end col if dummy coding is required
+# col 8: 1 if normalization is required 0 ow
+# col 9: mean for normalization
+# col 10: std for z-scoring for normalization
+# -1 indicates mean subtraction
+attrinfo = full(0, rows=ncol(X), cols=10)
+
+if(cmdLine_missing_value_maps != " "):
+ missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
+
+ parfor(i in 1:nrow(missing_val_maps), check=0):
+ attr_index_mv = castAsScalar(missing_val_maps[i,1])
+ attrinfo[attr_index_mv,1] = i
+ attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
+
+if(cmdLine_bin_defns != " "):
+ bin_defns = read(cmdLine_bin_defns)
+ parfor(i in 1:nrow(bin_defns), check=0):
+ attr_index_bin = castAsScalar(bin_defns[i,1])
+ attrinfo[attr_index_bin,3] = bin_defns[i,4]
+ attrinfo[attr_index_bin,4] = bin_defns[i,2]
+ attrinfo[attr_index_bin,5] = bin_defns[i,3]
+
+if(cmdLine_dummy_code_maps != " "):
+ dummy_code_maps = read(cmdLine_dummy_code_maps)
+ parfor(i in 1:nrow(dummy_code_maps), check=0):
+ attr_index_dc = castAsScalar(dummy_code_maps[i,1])
+ attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
+ attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
+else:
+ attrinfo[,6] = seq(1, ncol(X), 1)
+ attrinfo[,7] = seq(1, ncol(X), 1)
+
+if(cmdLine_normalization_maps != " "):
+ normalization_map = read(cmdLine_normalization_maps)
+ parfor(i in 1:nrow(normalization_map), check=0):
+ attr_index_normalization = castAsScalar(normalization_map[i,1])
+ attrinfo[attr_index_normalization,8] = 1
+ attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
+ attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
+
+#write(attrinfo, "binning/attrinfo.mtx", format="csv")
+
+cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
+new_X = full(0, rows=nrow(X), cols=cols_in_transformed_X)
+log = full(0, rows=ncol(X), cols=2)
+parfor(i in 1:ncol(X), check=0):
+ col = X[,i]
+
+ mv_col_id = castAsScalar(attrinfo[i,1])
+ global_mean = castAsScalar(attrinfo[i,2])
+ num_bins = castAsScalar(attrinfo[i,3])
+ bin_width = castAsScalar(attrinfo[i,4])
+ min_val = castAsScalar(attrinfo[i,5])
+ dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
+ dummy_coding_end_col = castAsScalar(attrinfo[i,7])
+ normalization_needed = castAsScalar(attrinfo[i,8])
+ normalization_mean = castAsScalar(attrinfo[i,9])
+ normalization_std = castAsScalar(attrinfo[i,10])
+
+ if(mv_col_id > 0):
+ # fill-in with global mean
+ col = col + missing_indicator_mat[,mv_col_id] * global_mean
+
+ if(num_bins > 0):
+ # only for equiwidth bins
+
+ # note that max_val entries will get assigned num_bins+1
+ col = round((col - min_val)/bin_width - 0.5) + 1
+ less_than_lb = ppred(col, 1, "<")
+ more_than_ub = ppred(col, num_bins, ">")
+
+ col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
+
+ if(dummy_coding_beg_col == dummy_coding_end_col):
+ if(normalization_needed == 1):
+ if(normalization_std == -1):
+ col = col - normalization_mean
+ else:
+ col = (col - normalization_mean)/normalization_std
+
+ new_X[,dummy_coding_beg_col] = col
+ else:
+ min_val = min(col)
+ max_val = max(col)
+ if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1):
+ res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
+ new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
+ else:
+ log[i,1] = 1
+ if(min_val < 1):
+ log[i,2] = min_val
+ else:
+ log[i,2] = max_val
+
+save(new_X, $transformed_X, format="text")
+
+s = "Warning Messages"
+for(i in 1:nrow(log)):
+ if(castAsScalar(log[i,1]) == 1):
+ s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
+
save(s, $Log)
\ No newline at end of file