Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/22 17:34:11 UTC
[35/51] [partial] incubator-systemml git commit: [SYSTEMML-482]
[SYSTEMML-480] Adding a Git attributes file to enforce Unix-style line
endings, and normalizing all of the line endings.
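
The attributes file itself does not appear in this [partial] part of the commit. As a
hypothetical sketch only (not the actual file contents), a Git attributes file that
enforces Unix-style (LF) line endings typically looks like:

    # Illustrative only -- the real file added by this commit is not shown here.
    * text=auto
    *.java text eol=lf
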
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
index 0c0d399..7fb1ccc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
@@ -1,112 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-
-public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
-
- boolean _firstRecordInSplit = true;
- boolean _partFileWithHeader = false;
-
- TfUtils tfmapper = null;
- Reporter _reporter = null;
- BufferedWriter br = null;
- JobConf _rJob = null;
-
- @Override
- public void configure(JobConf job) {
- try {
- _rJob = job;
- _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
- tfmapper = new TfUtils(job);
-
- tfmapper.loadTfMetadata(job, true);
-
- } catch (IOException e) { throw new RuntimeException(e); }
- catch(JSONException e) { throw new RuntimeException(e); }
-
- }
-
- @Override
- public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException {
-
- if(_firstRecordInSplit)
- {
- _firstRecordInSplit = false;
- _reporter = reporter;
-
- // generate custom output paths so that order of rows in the
- // output (across part files) matches w/ that from input data set
- String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
- Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
-
- // setup the writer for mapper's output
- // the default part-..... files will be deleted later once the job finishes
- br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true)));
- }
-
- // output the header line
- if ( rawKey.get() == 0 && _partFileWithHeader )
- {
- _reporter = reporter;
- tfmapper.processHeaderLine();
- if ( tfmapper.hasHeader() )
- return;
- }
-
- // parse the input line and apply transformation
- String[] words = tfmapper.getWords(rawValue);
-
- if(!tfmapper.omit(words))
- {
- try {
- words = tfmapper.apply(words);
- String outStr = tfmapper.checkAndPrepOutputString(words);
- //out.collect(NullWritable.get(), new Text(outStr));
- br.write(outStr + "\n");
- }
- catch(DMLRuntimeException e) {
- throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- if ( br != null )
- br.close();
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+
+public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
+
+ boolean _firstRecordInSplit = true;
+ boolean _partFileWithHeader = false;
+
+ TfUtils tfmapper = null;
+ Reporter _reporter = null;
+ BufferedWriter br = null;
+ JobConf _rJob = null;
+
+ @Override
+ public void configure(JobConf job) {
+ try {
+ _rJob = job;
+ _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
+ tfmapper = new TfUtils(job);
+
+ tfmapper.loadTfMetadata(job, true);
+
+ } catch (IOException e) { throw new RuntimeException(e); }
+ catch(JSONException e) { throw new RuntimeException(e); }
+
+ }
+
+ @Override
+ public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException {
+
+ if(_firstRecordInSplit)
+ {
+ _firstRecordInSplit = false;
+ _reporter = reporter;
+
+ // generate custom output paths so that order of rows in the
+ // output (across part files) matches w/ that from input data set
+ String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
+ Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
+
+ // setup the writer for mapper's output
+ // the default part-..... files will be deleted later once the job finishes
+ br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true)));
+ }
+
+ // output the header line
+ if ( rawKey.get() == 0 && _partFileWithHeader )
+ {
+ _reporter = reporter;
+ tfmapper.processHeaderLine();
+ if ( tfmapper.hasHeader() )
+ return;
+ }
+
+ // parse the input line and apply transformation
+ String[] words = tfmapper.getWords(rawValue);
+
+ if(!tfmapper.omit(words))
+ {
+ try {
+ words = tfmapper.apply(words);
+ String outStr = tfmapper.checkAndPrepOutputString(words);
+ //out.collect(NullWritable.get(), new Text(outStr));
+ br.write(outStr + "\n");
+ }
+ catch(DMLRuntimeException e) {
+ throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if ( br != null )
+ br.close();
+ }
+
+}
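
ApplyTfCSVMapper plugs into Hadoop's old mapred API as a map-only job: it writes the
transformed rows itself to per-split "transform-part-*" files so that row order is
preserved, and the framework's default part files are discarded afterwards. A minimal,
hypothetical driver sketch under that assumption (the real SystemML driver, not shown
here, also populates the transform-metadata configuration that TfUtils reads in
configure()):

    // Hypothetical wiring, assuming the usual org.apache.hadoop.mapred imports.
    JobConf job = new JobConf(ApplyTfCSVMapper.class);
    job.setMapperClass(ApplyTfCSVMapper.class);
    job.setNumReduceTasks(0);                   // map-only job
    job.setInputFormat(TextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path("input.csv"));
    FileOutputFormat.setOutputPath(job, new Path("tmp-output"));
    JobClient.runJob(job);                      // throws IOException
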
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
index 693d687..061f2e3 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
@@ -1,160 +1,160 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-
-public class ApplyTfCSVSPARK {
-
- /**
- * Apply transformation metadata and generate the result in CSV format, as a
- * JavaRDD of Strings.
- */
-
- public static JavaPairRDD<Long, String> runSparkJob(
- SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD,
- String tfMtdPath, String specFile,
- String tmpPath, CSVFileFormatProperties prop,
- int numCols, String headerLine
- ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
-
- // Load transformation metadata and broadcast it
- JobConf job = new JobConf();
- FileSystem fs = FileSystem.get(job);
-
- String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
- JSONObject spec = TfUtils.readSpec(fs, specFile);
- TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath);
-
- _tfmapper.loadTfMetadata();
-
- Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
-
- /*
- * Construct transformation metadata (map-side) -- the logic is similar
- * to GTFMTDMapper
- *
- * Note: The result of mapPartitionsWithIndex is cached so that the
- * transformed data is not redundantly computed multiple times
- */
- JavaPairRDD<Long, String> applyRDD = inputRDD
- .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf), true)
- .mapToPair(
- new PairFunction<String,Long,String>(){
- private static final long serialVersionUID = 3868143093999082931L;
- @Override
- public Tuple2<Long, String> call(String t) throws Exception {
- return new Tuple2<Long, String>(new Long(1), t);
- }
- }
- ).cache();
-
- /*
- * An action to force execution of apply()
- *
- * We need to trigger the execution of this RDD so as to ensure the
- * creation of a few metadata files (headers, dummycoded information,
- * etc.), which are referenced in the caller function.
- */
- applyRDD.count();
-
- return applyRDD;
- }
-
- public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
-
- private static final long serialVersionUID = 1496686437276906911L;
-
- TfUtils _tfmapper = null;
-
- ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
- _tfmapper = tf.getValue();
- }
-
- ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
- _tfmapper = tf.getValue();
- }
-
- @Override
- public Iterator<String> call(Integer partitionID,
- Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-
- boolean first = true;
- Tuple2<LongWritable, Text> rec = null;
- ArrayList<String> outLines = new ArrayList<String>();
-
- while(csvLines.hasNext()) {
- rec = csvLines.next();
-
- if (first && partitionID == 0) {
- first = false;
-
- _tfmapper.processHeaderLine();
-
- if (_tfmapper.hasHeader() ) {
- //outLines.add(dcdHeader); // if the header needs to be preserved in the output file
- continue;
- }
- }
-
- // parse the input line and apply transformation
-
- String[] words = _tfmapper.getWords(rec._2());
-
- if(!_tfmapper.omit(words))
- {
- try {
- words = _tfmapper.apply(words);
- String outStr = _tfmapper.checkAndPrepOutputString(words);
- outLines.add(outStr);
- }
- catch(DMLRuntimeException e) {
- throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
- }
- }
- }
-
- return outLines.iterator();
- }
-
- }
-
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+
+
+public class ApplyTfCSVSPARK {
+
+ /**
+ * Apply transformation metadata and generate the result in CSV format, as a
+ * JavaRDD of Strings.
+ */
+
+ public static JavaPairRDD<Long, String> runSparkJob(
+ SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD,
+ String tfMtdPath, String specFile,
+ String tmpPath, CSVFileFormatProperties prop,
+ int numCols, String headerLine
+ ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
+
+ // Load transformation metadata and broadcast it
+ JobConf job = new JobConf();
+ FileSystem fs = FileSystem.get(job);
+
+ String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
+ JSONObject spec = TfUtils.readSpec(fs, specFile);
+ TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath);
+
+ _tfmapper.loadTfMetadata();
+
+ Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
+
+ /*
+ * Construct transformation metadata (map-side) -- the logic is similar
+ * to GTFMTDMapper
+ *
+ * Note: The result of mapPartitionsWithIndex is cached so that the
+ * transformed data is not redundantly computed multiple times
+ */
+ JavaPairRDD<Long, String> applyRDD = inputRDD
+ .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf), true)
+ .mapToPair(
+ new PairFunction<String,Long,String>(){
+ private static final long serialVersionUID = 3868143093999082931L;
+ @Override
+ public Tuple2<Long, String> call(String t) throws Exception {
+ return new Tuple2<Long, String>(new Long(1), t);
+ }
+ }
+ ).cache();
+
+ /*
+ * An action to force execution of apply()
+ *
+ * We need to trigger the execution of this RDD so as to ensure the
+ * creation of a few metadata files (headers, dummycoded information,
+ * etc.), which are referenced in the caller function.
+ */
+ applyRDD.count();
+
+ return applyRDD;
+ }
+
+ public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
+
+ private static final long serialVersionUID = 1496686437276906911L;
+
+ TfUtils _tfmapper = null;
+
+ ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
+ _tfmapper = tf.getValue();
+ }
+
+ ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
+ _tfmapper = tf.getValue();
+ }
+
+ @Override
+ public Iterator<String> call(Integer partitionID,
+ Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
+
+ boolean first = true;
+ Tuple2<LongWritable, Text> rec = null;
+ ArrayList<String> outLines = new ArrayList<String>();
+
+ while(csvLines.hasNext()) {
+ rec = csvLines.next();
+
+ if (first && partitionID == 0) {
+ first = false;
+
+ _tfmapper.processHeaderLine();
+
+ if (_tfmapper.hasHeader() ) {
+ //outLines.add(dcdHeader); // if the header needs to be preserved in the output file
+ continue;
+ }
+ }
+
+ // parse the input line and apply transformation
+
+ String[] words = _tfmapper.getWords(rec._2());
+
+ if(!_tfmapper.omit(words))
+ {
+ try {
+ words = _tfmapper.apply(words);
+ String outStr = _tfmapper.checkAndPrepOutputString(words);
+ outLines.add(outStr);
+ }
+ catch(DMLRuntimeException e) {
+ throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
+ }
+ }
+ }
+
+ return outLines.iterator();
+ }
+
+ }
+
+
+}
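
runSparkJob above returns a JavaPairRDD<Long, String> in which every key is the
constant 1 and each value is one transformed CSV line; the count() action merely
forces materialization of the side-effect metadata files. A hedged usage sketch,
assuming sec, inputRDD, and the remaining arguments are already in scope and
outputPath is a hypothetical destination:

    // Sketch only; runSparkJob declares IOException, JSONException, etc.
    JavaPairRDD<Long, String> out = ApplyTfCSVSPARK.runSparkJob(
        sec, inputRDD, tfMtdPath, specFile, tmpPath, prop, numCols, headerLine);
    out.values().saveAsTextFile(outputPath);   // persist transformed CSV lines
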
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index f08c9ff..b61c781 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -1,355 +1,355 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class BinAgent extends TransformationAgent {
-
- private static final long serialVersionUID = 1917445005206076078L;
-
- public static final String MIN_PREFIX = "min";
- public static final String MAX_PREFIX = "max";
- public static final String NBINS_PREFIX = "nbins";
-
- private int[] _binList = null;
- //private byte[] _binMethodList = null; // Not used, since only equi-width is supported for now.
- private int[] _numBins = null;
-
- private double[] _min=null, _max=null; // min and max among non-missing values
-
- private double[] _binWidths = null; // width of a bin for each attribute
-
- BinAgent() { }
-
- BinAgent(JSONObject parsedSpec) throws JSONException {
-
- if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) )
- return;
-
- JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString());
-
- JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
- //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD);
- JSONArray nbins = (JSONArray) obj.get(JSON_NBINS);
-
- assert(attrs.size() == nbins.size());
-
- _binList = new int[attrs.size()];
- _numBins = new int[attrs.size()];
- for(int i=0; i < _binList.length; i++) {
- _binList[i] = UtilFunctions.toInt(attrs.get(i));
- _numBins[i] = UtilFunctions.toInt(nbins.get(i));
- }
-
- // initialize internal transformation metadata
- _min = new double[_binList.length];
- Arrays.fill(_min, Double.MAX_VALUE);
- _max = new double[_binList.length];
- Arrays.fill(_max, -Double.MAX_VALUE);
-
- _binWidths = new double[_binList.length];
- }
-
- public void prepare(String[] words, TfUtils agents) {
- if ( _binList == null )
- return;
-
- for(int i=0; i <_binList.length; i++) {
- int colID = _binList[i];
-
- String w = null;
- double d = 0;
-
- // equi-width
- w = UtilFunctions.unquote(words[colID-1].trim());
- if(!agents.isNA(w)) {
- d = UtilFunctions.parseToDouble(w);
- if(d < _min[i])
- _min[i] = d;
- if(d > _max[i])
- _max[i] = d;
- }
- }
- }
-
- private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
- String s = MIN_PREFIX + Double.toString(_min[idx]);
- return new DistinctValue(s, -1L);
- }
-
- private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
- String s = MAX_PREFIX + Double.toString(_max[idx]);
- return new DistinctValue(s, -1L);
- }
-
- private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
- String s = NBINS_PREFIX + Double.toString(_numBins[idx]);
- return new DistinctValue(s, -1L);
- }
-
- /**
- * Method to output transformation metadata from the mappers.
- * This information is collected and merged by the reducers.
- *
- * @param out
- * @throws IOException
- */
- @Override
- public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
- if ( _binList == null )
- return;
-
- try {
- for(int i=0; i < _binList.length; i++) {
- int colID = _binList[i];
- IntWritable iw = new IntWritable(-colID);
-
- out.collect(iw, prepMinOutput(i));
- out.collect(iw, prepMaxOutput(i));
- out.collect(iw, prepNBinsOutput(i));
- }
- } catch(Exception e) {
- throw new IOException(e);
- }
- }
-
- public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
- if ( _binList == null )
- return list;
-
- try {
- for(int i=0; i < _binList.length; i++) {
- int colID = _binList[i];
- Integer iw = -colID;
-
- list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) );
- list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
- list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
- }
- } catch(Exception e) {
- throw new IOException(e);
- }
- return list;
- }
-
- private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException
- {
- Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX);
- BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
- br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n");
- br.close();
- }
-
- /**
- * Method to merge map output transformation metadata.
- *
- * @param values
- * @return
- * @throws IOException
- */
- @Override
- public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
- double min = Double.MAX_VALUE;
- double max = -Double.MAX_VALUE;
- int nbins = 0;
-
- DistinctValue val = new DistinctValue();
- String w = null;
- double d;
- while(values.hasNext()) {
- val.reset();
- val = values.next();
- w = val.getWord();
-
- if(w.startsWith(MIN_PREFIX)) {
- d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
- if ( d < min )
- min = d;
- }
- else if(w.startsWith(MAX_PREFIX)) {
- d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
- if ( d > max )
- max = d;
- }
- else if (w.startsWith(NBINS_PREFIX)) {
- nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
- }
- else
- throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
- }
-
- // write merged metadata
- double binwidth = (max-min)/nbins;
- writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
- }
-
-
- public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
- if(_binList == null)
- return;
-
- MVImputeAgent mvagent = agents.getMVImputeAgent();
- for(int i=0; i < _binList.length; i++) {
- int colID = _binList[i];
-
- // If the column is imputed with a constant, then adjust min and max based on the value of the constant.
- if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT )
- {
- double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
- if ( cst < _min[i])
- _min[i] = cst;
- if ( cst > _max[i])
- _max[i] = cst;
- }
-
- double binwidth = (_max[i] - _min[i])/_numBins[i];
- writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
- }
- }
-
- // ------------------------------------------------------------------------------------------------
-
- public int[] getBinList() { return _binList; }
- public int[] getNumBins() { return _numBins; }
- public double[] getMin() { return _min; }
- public double[] getBinWidths() { return _binWidths; }
-
- /**
- * Method to load transform metadata for all attributes
- *
- * @param job
- * @throws IOException
- */
- @Override
- public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
- if ( _binList == null )
- return;
-
- if(fs.isDirectory(txMtdDir)) {
- for(int i=0; i<_binList.length;i++) {
- int colID = _binList[i];
-
- Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX);
- TfUtils.checkValidInputFile(fs, path, true);
-
- BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
- // format: colID,min,max,nbins
- String[] fields = br.readLine().split(TXMTD_SEP);
- double min = UtilFunctions.parseToDouble(fields[1]);
- //double max = UtilFunctions.parseToDouble(fields[2]);
- double binwidth = UtilFunctions.parseToDouble(fields[3]);
- int nbins = UtilFunctions.parseToInt(fields[4]);
-
- _numBins[i] = nbins;
- _min[i] = min;
- _binWidths[i] = binwidth; // (max-min)/nbins;
-
- br.close();
- }
- }
- else {
- fs.close();
- throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
- }
- }
-
- /**
- * Method to apply transformations.
- *
- * @param words
- * @return
- */
- @Override
- public String[] apply(String[] words, TfUtils agents) {
- if ( _binList == null )
- return words;
-
- for(int i=0; i < _binList.length; i++) {
- int colID = _binList[i];
-
- try {
- double val = UtilFunctions.parseToDouble(words[colID-1]);
- int binid = 1;
- double tmp = _min[i] + _binWidths[i];
- while(val > tmp && binid < _numBins[i]) {
- tmp += _binWidths[i];
- binid++;
- }
- words[colID-1] = Integer.toString(binid);
- } catch(NumberFormatException e)
- {
- throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
- }
- }
-
- return words;
- }
-
- /**
- * Check if the given column ID is subjected to this transformation.
- *
- */
- public int isBinned(int colID)
- {
- if(_binList == null)
- return -1;
-
- int idx = Arrays.binarySearch(_binList, colID);
- return ( idx >= 0 ? idx : -1);
- }
-
-
- @Override
- public void print() {
- System.out.print("Binning List (Equi-width): \n ");
- for(int i : _binList) {
- System.out.print(i + " ");
- }
- System.out.print("\n ");
- for(int b : _numBins) {
- System.out.print(b + " ");
- }
- System.out.println();
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class BinAgent extends TransformationAgent {
+
+ private static final long serialVersionUID = 1917445005206076078L;
+
+ public static final String MIN_PREFIX = "min";
+ public static final String MAX_PREFIX = "max";
+ public static final String NBINS_PREFIX = "nbins";
+
+ private int[] _binList = null;
+ //private byte[] _binMethodList = null; // Not used, since only equi-width is supported for now.
+ private int[] _numBins = null;
+
+ private double[] _min=null, _max=null; // min and max among non-missing values
+
+ private double[] _binWidths = null; // width of a bin for each attribute
+
+ BinAgent() { }
+
+ BinAgent(JSONObject parsedSpec) throws JSONException {
+
+ if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) )
+ return;
+
+ JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString());
+
+ JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+ //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD);
+ JSONArray nbins = (JSONArray) obj.get(JSON_NBINS);
+
+ assert(attrs.size() == nbins.size());
+
+ _binList = new int[attrs.size()];
+ _numBins = new int[attrs.size()];
+ for(int i=0; i < _binList.length; i++) {
+ _binList[i] = UtilFunctions.toInt(attrs.get(i));
+ _numBins[i] = UtilFunctions.toInt(nbins.get(i));
+ }
+
+ // initialize internal transformation metadata
+ _min = new double[_binList.length];
+ Arrays.fill(_min, Double.MAX_VALUE);
+ _max = new double[_binList.length];
+ Arrays.fill(_max, -Double.MAX_VALUE);
+
+ _binWidths = new double[_binList.length];
+ }
+
+ public void prepare(String[] words, TfUtils agents) {
+ if ( _binList == null )
+ return;
+
+ for(int i=0; i <_binList.length; i++) {
+ int colID = _binList[i];
+
+ String w = null;
+ double d = 0;
+
+ // equi-width
+ w = UtilFunctions.unquote(words[colID-1].trim());
+ if(!agents.isNA(w)) {
+ d = UtilFunctions.parseToDouble(w);
+ if(d < _min[i])
+ _min[i] = d;
+ if(d > _max[i])
+ _max[i] = d;
+ }
+ }
+ }
+
+ private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
+ String s = MIN_PREFIX + Double.toString(_min[idx]);
+ return new DistinctValue(s, -1L);
+ }
+
+ private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
+ String s = MAX_PREFIX + Double.toString(_max[idx]);
+ return new DistinctValue(s, -1L);
+ }
+
+ private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
+ String s = NBINS_PREFIX + Double.toString(_numBins[idx]);
+ return new DistinctValue(s, -1L);
+ }
+
+ /**
+ * Method to output transformation metadata from the mappers.
+ * This information is collected and merged by the reducers.
+ *
+ * @param out
+ * @throws IOException
+ */
+ @Override
+ public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
+ if ( _binList == null )
+ return;
+
+ try {
+ for(int i=0; i < _binList.length; i++) {
+ int colID = _binList[i];
+ IntWritable iw = new IntWritable(-colID);
+
+ out.collect(iw, prepMinOutput(i));
+ out.collect(iw, prepMaxOutput(i));
+ out.collect(iw, prepNBinsOutput(i));
+ }
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
+ if ( _binList == null )
+ return list;
+
+ try {
+ for(int i=0; i < _binList.length; i++) {
+ int colID = _binList[i];
+ Integer iw = -colID;
+
+ list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) );
+ list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
+ list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
+ }
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+ return list;
+ }
+
+ private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException
+ {
+ Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX);
+ BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+ br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n");
+ br.close();
+ }
+
+ /**
+ * Method to merge map output transformation metadata.
+ *
+ * @param values
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
+ double min = Double.MAX_VALUE;
+ double max = -Double.MAX_VALUE;
+ int nbins = 0;
+
+ DistinctValue val = new DistinctValue();
+ String w = null;
+ double d;
+ while(values.hasNext()) {
+ val.reset();
+ val = values.next();
+ w = val.getWord();
+
+ if(w.startsWith(MIN_PREFIX)) {
+ d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
+ if ( d < min )
+ min = d;
+ }
+ else if(w.startsWith(MAX_PREFIX)) {
+ d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
+ if ( d > max )
+ max = d;
+ }
+ else if (w.startsWith(NBINS_PREFIX)) {
+ nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
+ }
+ else
+ throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
+ }
+
+ // write merged metadata
+ double binwidth = (max-min)/nbins;
+ writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
+ }
+
+
+ public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
+ if(_binList == null)
+ return;
+
+ MVImputeAgent mvagent = agents.getMVImputeAgent();
+ for(int i=0; i < _binList.length; i++) {
+ int colID = _binList[i];
+
+ // If the column is imputed with a constant, then adjust min and max based on the value of the constant.
+ if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT )
+ {
+ double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
+ if ( cst < _min[i])
+ _min[i] = cst;
+ if ( cst > _max[i])
+ _max[i] = cst;
+ }
+
+ double binwidth = (_max[i] - _min[i])/_numBins[i];
+ writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
+ }
+ }
+
+ // ------------------------------------------------------------------------------------------------
+
+ public int[] getBinList() { return _binList; }
+ public int[] getNumBins() { return _numBins; }
+ public double[] getMin() { return _min; }
+ public double[] getBinWidths() { return _binWidths; }
+
+ /**
+ * Method to load transform metadata for all attributes
+ *
+ * @param job
+ * @throws IOException
+ */
+ @Override
+ public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
+ if ( _binList == null )
+ return;
+
+ if(fs.isDirectory(txMtdDir)) {
+ for(int i=0; i<_binList.length;i++) {
+ int colID = _binList[i];
+
+ Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX);
+ TfUtils.checkValidInputFile(fs, path, true);
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
+ // format: colID,min,max,nbins
+ String[] fields = br.readLine().split(TXMTD_SEP);
+ double min = UtilFunctions.parseToDouble(fields[1]);
+ //double max = UtilFunctions.parseToDouble(fields[2]);
+ double binwidth = UtilFunctions.parseToDouble(fields[3]);
+ int nbins = UtilFunctions.parseToInt(fields[4]);
+
+ _numBins[i] = nbins;
+ _min[i] = min;
+ _binWidths[i] = binwidth; // (max-min)/nbins;
+
+ br.close();
+ }
+ }
+ else {
+ fs.close();
+ throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
+ }
+ }
+
+ /**
+ * Method to apply transformations.
+ *
+ * @param words
+ * @return
+ */
+ @Override
+ public String[] apply(String[] words, TfUtils agents) {
+ if ( _binList == null )
+ return words;
+
+ for(int i=0; i < _binList.length; i++) {
+ int colID = _binList[i];
+
+ try {
+ double val = UtilFunctions.parseToDouble(words[colID-1]);
+ int binid = 1;
+ double tmp = _min[i] + _binWidths[i];
+ while(val > tmp && binid < _numBins[i]) {
+ tmp += _binWidths[i];
+ binid++;
+ }
+ words[colID-1] = Integer.toString(binid);
+ } catch(NumberFormatException e)
+ {
+ throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
+ }
+ }
+
+ return words;
+ }
+
+ /**
+ * Check if the given column ID is subjected to this transformation.
+ *
+ */
+ public int isBinned(int colID)
+ {
+ if(_binList == null)
+ return -1;
+
+ int idx = Arrays.binarySearch(_binList, colID);
+ return ( idx >= 0 ? idx : -1);
+ }
+
+
+ @Override
+ public void print() {
+ System.out.print("Binning List (Equi-width): \n ");
+ for(int i : _binList) {
+ System.out.print(i + " ");
+ }
+ System.out.print("\n ");
+ for(int b : _numBins) {
+ System.out.print(b + " ");
+ }
+ System.out.println();
+ }
+
+}
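
The equi-width bin lookup in apply() walks the bin boundaries one at a time. As a
sketch (not part of the class), the loop is equivalent to the closed form below,
where boundary values fall into the lower bin to match the loop's "val > tmp" test:

    // Hypothetical helper, equivalent to the bin-search loop in apply():
    // the result is clamped to [1, nbins]; values <= min map to bin 1.
    static int binId(double val, double min, double width, int nbins) {
        int bin = (int) Math.ceil((val - min) / width);
        return Math.max(1, Math.min(bin, nbins));
    }
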
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
index d44a904..2e52657 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
@@ -1,108 +1,108 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DistinctValue implements Writable, Serializable {
-
- private static final long serialVersionUID = -8236705946336974836L;
-
- private static final byte [] EMPTY_BYTES = new byte[0];
-
- // word (distinct value)
- private byte[] _bytes;
- private int _length;
- // count
- private long _count;
-
- public DistinctValue() {
- _bytes = EMPTY_BYTES;
- _length = 0;
- _count = -1;
- }
-
- public DistinctValue(String w, long count) throws CharacterCodingException {
- ByteBuffer bb = Text.encode(w, true);
- _bytes = bb.array();
- _length = bb.limit();
- _count = count;
- }
-
- public DistinctValue(OffsetCount oc) throws CharacterCodingException
- {
- this(oc.filename + "," + oc.fileOffset, oc.count);
- }
-
- public void reset() {
- _bytes = EMPTY_BYTES;
- _length = 0;
- _count = -1;
- }
-
- public String getWord() { return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); }
- public long getCount() { return _count; }
-
- @Override
- public void write(DataOutput out) throws IOException {
- // write word
- WritableUtils.writeVInt(out, _length);
- out.write(_bytes, 0, _length);
- // write count
- out.writeLong(_count);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // read word
- int newLength = WritableUtils.readVInt(in);
- _bytes = new byte[newLength];
- in.readFully(_bytes, 0, newLength);
- _length = newLength;
- if (_length != _bytes.length)
- System.out.println("ERROR in DistinctValue.readFields()");
- // read count
- _count = in.readLong();
- }
-
- public OffsetCount getOffsetCount() {
- OffsetCount oc = new OffsetCount();
- String[] parts = getWord().split(",");
- oc.filename = parts[0];
- oc.fileOffset = UtilFunctions.parseToLong(parts[1]);
- oc.count = getCount();
-
- return oc;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class DistinctValue implements Writable, Serializable {
+
+ private static final long serialVersionUID = -8236705946336974836L;
+
+ private static final byte [] EMPTY_BYTES = new byte[0];
+
+ // word (distinct value)
+ private byte[] _bytes;
+ private int _length;
+ // count
+ private long _count;
+
+ public DistinctValue() {
+ _bytes = EMPTY_BYTES;
+ _length = 0;
+ _count = -1;
+ }
+
+ public DistinctValue(String w, long count) throws CharacterCodingException {
+ ByteBuffer bb = Text.encode(w, true);
+ _bytes = bb.array();
+ _length = bb.limit();
+ _count = count;
+ }
+
+ public DistinctValue(OffsetCount oc) throws CharacterCodingException
+ {
+ this(oc.filename + "," + oc.fileOffset, oc.count);
+ }
+
+ public void reset() {
+ _bytes = EMPTY_BYTES;
+ _length = 0;
+ _count = -1;
+ }
+
+ public String getWord() { return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); }
+ public long getCount() { return _count; }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // write word
+ WritableUtils.writeVInt(out, _length);
+ out.write(_bytes, 0, _length);
+ // write count
+ out.writeLong(_count);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // read word
+ int newLength = WritableUtils.readVInt(in);
+ _bytes = new byte[newLength];
+ in.readFully(_bytes, 0, newLength);
+ _length = newLength;
+ if (_length != _bytes.length)
+ System.out.println("ERROR in DistinctValue.readFields()");
+ // read count
+ _count = in.readLong();
+ }
+
+ public OffsetCount getOffsetCount() {
+ OffsetCount oc = new OffsetCount();
+ String[] parts = getWord().split(",");
+ oc.filename = parts[0];
+ oc.fileOffset = UtilFunctions.parseToLong(parts[1]);
+ oc.count = getCount();
+
+ return oc;
+ }
+
+}
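
DistinctValue follows the standard Hadoop Writable contract (length-prefixed word
bytes followed by the count), so instances round-trip through any DataOutput/DataInput
pair. A small sketch of that contract using plain Java streams, assuming only the
class above and java.io imports:

    // Sketch; runs inside a method that declares IOException
    // (the String constructor can also throw CharacterCodingException).
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DistinctValue dv = new DistinctValue("apple", 3L);
    dv.write(new DataOutputStream(bos));

    DistinctValue copy = new DistinctValue();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));
    // copy.getWord() == "apple", copy.getCount() == 3
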
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
index a1c76ba..079ad58 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
@@ -1,426 +1,426 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import com.google.common.base.Functions;
-import com.google.common.collect.Ordering;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DummycodeAgent extends TransformationAgent {
-
- private static final long serialVersionUID = 5832130477659116489L;
-
- private int[] _dcdList = null;
- private long numCols = 0;
-
- private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
- private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
- private int[] _binList = null;
- private int[] _numBins = null;
-
- private int[] _domainSizes = null; // length = #of dummycoded columns
- private int[] _dcdColumnMap = null; // to help in translating between original and dummycoded column IDs
- private long _dummycodedLength = 0; // #of columns after dummycoded
-
- DummycodeAgent(int[] list) {
- _dcdList = list;
- }
-
- DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException {
- numCols = ncol;
-
- if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
- return;
-
- JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.DUMMYCODE.toString());
- JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-
- _dcdList = new int[attrs.size()];
- for(int i=0; i < _dcdList.length; i++)
- _dcdList[i] = UtilFunctions.toInt(attrs.get(i));
- }
-
- public int[] dcdList() {
- return _dcdList;
- }
-
- /**
- * Method to output transformation metadata from the mappers.
- * This information is collected and merged by the reducers.
- *
- * @param out
- * @throws IOException
- *
- */
- @Override
- public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
- // There is no metadata required for dummycode.
- // Required information is output from RecodeAgent.
- return;
- }
-
- @Override
- public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values,
- String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
- // Nothing to do here
- }
-
- public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> maps) {
- _finalMaps = maps;
- }
-
- public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> maps) {
- _finalMapsCP = maps;
- }
-
- public void setNumBins(int[] binList, int[] numbins) {
- _binList = binList;
- _numBins = numbins;
- }
-
- /**
- * Method to generate dummyCodedMaps.csv, with the range of column IDs for each variable in the original data.
- *
- * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end]
- * 1/0 indicates if ColID is dummycoded or not
- * [st,end] is the range of dummycoded column numbers for the given ColID
- *
- * It also generates coltypes.csv, with the type (scale, nominal, etc.) of columns in the output.
- * Recoded columns are of type nominal, binner columns are of type ordinal, dummycoded columns are of type
- * dummycoded, and the remaining are of type scale.
- *
- * @param fs
- * @param txMtdDir
- * @param numCols
- * @param ra
- * @param ba
- * @return Number of columns in the transformed data
- * @throws IOException
- */
- public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException {
-
- // initialize all column types in the transformed data to SCALE
- ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength];
- for(int i=0; i < _dummycodedLength; i++)
- ctypes[i] = ColumnTypes.SCALE;
-
- _dcdColumnMap = new int[numCols];
-
- Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME);
- BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-
- int sum=1;
- int idx = 0;
- for(int colID=1; colID <= numCols; colID++)
- {
- if ( _dcdList != null && idx < _dcdList.length && _dcdList[idx] == colID )
- {
- br.write(colID + "," + "1" + "," + sum + "," + (sum+_domainSizes[idx]-1) + "\n");
- _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1;
-
- for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++)
- ctypes[i-1] = ColumnTypes.DUMMYCODED;
-
- sum += _domainSizes[idx];
- idx++;
- }
- else
- {
- br.write(colID + "," + "0" + "," + sum + "," + sum + "\n");
- _dcdColumnMap[colID-1] = sum-1;
-
- if ( agents.getBinAgent().isBinned(colID) != -1 )
- ctypes[sum-1] = ColumnTypes.ORDINAL; // binned variable results in an ordinal column
-
- if ( agents.getRecodeAgent().isRecoded(colID) != -1 )
- ctypes[sum-1] = ColumnTypes.NOMINAL;
-
- sum += 1;
- }
- }
- br.close();
-
- // Write coltypes.csv
- pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME);
- br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-
- br.write(columnTypeToID(ctypes[0]) + "");
- for(int i = 1; i < _dummycodedLength; i++)
- br.write( "," + columnTypeToID(ctypes[i]));
- br.close();
-
- return sum-1;
- }
-
- /**
- * Given a dummycoded column id, find the corresponding original column ID.
- *
- * @param colID
- * @return
- */
- public int mapDcdColumnID(int colID)
- {
- for(int i=0; i < _dcdColumnMap.length; i++)
- {
- int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1);
- int end = _dcdColumnMap[i]+1;
- //System.out.println((i+1) + ": " + "[" + st + "," + end + "]");
-
- if ( colID >= st && colID <= end)
- return i+1;
- }
- return -1;
- }
-
- public String constructDummycodedHeader(String header, Pattern delim) {
-
- if(_dcdList == null && _binList == null )
- // none of the columns are dummycoded, simply return the given header
- return header;
-
- String[] names = delim.split(header, -1);
- List<String> newNames = null;
-
- StringBuilder sb = new StringBuilder();
-
- // Dummycoding can be performed on either on a recoded column or on a binned column
-
- // process recoded columns
- if(_finalMapsCP != null && _dcdList != null)
- {
- for(int i=0; i <_dcdList.length; i++)
- {
- int colID = _dcdList[i];
- HashMap<String,Long> map = _finalMapsCP.get(colID);
- String colName = UtilFunctions.unquote(names[colID-1]);
-
- if ( map != null )
- {
- // order map entries by their recodeID
- Ordering<String> valueComparator = Ordering.natural().onResultOf(Functions.forMap(map));
- newNames = valueComparator.sortedCopy(map.keySet());
-
- // construct concatenated string of map entries
- sb.setLength(0);
- for(int idx=0; idx < newNames.size(); idx++)
- {
- if(idx==0)
- sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
- else
- sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
- }
- names[colID-1] = sb.toString(); // replace original column name with dcd name
- }
- }
- }
- else if(_finalMaps != null && _dcdList != null) {
- for(int i=0; i <_dcdList.length; i++) {
- int colID = _dcdList[i];
- HashMap<String,String> map = _finalMaps.get(colID);
- String colName = UtilFunctions.unquote(names[colID-1]);
-
- if ( map != null )
- {
- // order map entries by their recodeID (represented as Strings .. "1", "2", etc.)
- Ordering<String> orderByID = new Ordering<String>()
- {
- public int compare(String s1, String s2) {
- return (Integer.parseInt(s1) - Integer.parseInt(s2));
- }
- };
-
- newNames = orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet());
- // construct concatenated string of map entries
- sb.setLength(0);
- for(int idx=0; idx < newNames.size(); idx++)
- {
- if(idx==0)
- sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
- else
- sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
- }
- names[colID-1] = sb.toString(); // replace original column name with dcd name
- }
- }
- }
-
- // process binned columns
- if (_binList != null)
- for(int i=0; i < _binList.length; i++)
- {
- int colID = _binList[i];
-
- // only columns that are both binned and dummycoded need to be considered here
- if(isDummyCoded(colID) == -1)
- continue;
-
- int numBins = _numBins[i];
- String colName = UtilFunctions.unquote(names[colID-1]);
-
- sb.setLength(0);
- for(int idx=0; idx < numBins; idx++)
- if(idx==0)
- sb.append( colName + DCD_NAME_SEP + "Bin" + (idx+1) );
- else
- sb.append( delim + colName + DCD_NAME_SEP + "Bin" + (idx+1) );
- names[colID-1] = sb.toString(); // replace original column name with dcd name
- }
-
- // Construct the full header
- sb.setLength(0);
- for(int colID=0; colID < names.length; colID++)
- {
- if (colID == 0)
- sb.append(names[colID]);
- else
- sb.append(delim + names[colID]);
- }
- //System.out.println("DummycodedHeader: " + sb.toString());
-
- return sb.toString();
- }
-
- @Override
- public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
- if ( _dcdList == null )
- {
- _dummycodedLength = numCols;
- return;
- }
-
- // sort the to-be-dummycoded column IDs in ascending order; this is the order in which the new dummycoded record is constructed in the apply() function.
- Arrays.sort(_dcdList);
- _domainSizes = new int[_dcdList.length];
-
- _dummycodedLength = numCols;
-
- //HashMap<String, String> map = null;
- for(int i=0; i<_dcdList.length; i++) {
- int colID = _dcdList[i];
-
- // Find the domain size for colID using _finalMaps or _finalMapsCP
- int domainSize = 0;
- if(_finalMaps != null) {
- if(_finalMaps.get(colID) != null)
- domainSize = _finalMaps.get(colID).size();
- }
- else {
- if(_finalMapsCP.get(colID) != null)
- domainSize = _finalMapsCP.get(colID).size();
- }
-
- if ( domainSize != 0 ) {
- // dummycoded column
- _domainSizes[i] = domainSize;
- }
- else {
- // binned column
- if ( _binList != null )
- for(int j=0; j<_binList.length; j++) {
- if (colID == _binList[j]) {
- _domainSizes[i] = _numBins[j];
- break;
- }
- }
- }
- _dummycodedLength += _domainSizes[i]-1;
- //System.out.println("colID=" + colID + ", domainsize=" + _domainSizes[i] + ", dcdLength=" + _dummycodedLength);
- }
- }
-
- /**
- * Method to apply the dummycoding transformation to a single row.
- *
- * @param words array of column values for the current row
- * @return the transformed row, with each dummycoded column expanded into its indicator columns
- */
- @Override
- public String[] apply(String[] words, TfUtils agents) {
-
- if ( _dcdList == null )
- return words;
-
- String[] nwords = new String[(int)_dummycodedLength];
-
- int rcdVal = 0;
-
- for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) {
- if(idx < _dcdList.length && colID==_dcdList[idx]) {
- // dummycoded columns
- try {
- rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
- nwords[ ncolID-1+rcdVal-1 ] = "1";
- ncolID += _domainSizes[idx];
- idx++;
- } catch (Exception e) {
- System.out.println("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
- throw new RuntimeException(e);
- }
- }
- else {
- nwords[ncolID-1] = words[colID-1];
- ncolID++;
- }
- }
-
- return nwords;
- }
-
- /**
- * Check whether the given column ID is subject to this transformation.
- * @return index of colID in the dummycode list, or -1 if it is not dummycoded
- */
- public int isDummyCoded(int colID)
- {
- if(_dcdList == null)
- return -1;
-
- int idx = Arrays.binarySearch(_dcdList, colID);
- return ( idx >= 0 ? idx : -1);
- }
-
- @Override
- public void print() {
- System.out.print("Dummycoding List: \n ");
- for(int i : _dcdList) {
- System.out.print(i + " ");
- }
- System.out.println();
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Ordering;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class DummycodeAgent extends TransformationAgent {
+
+ private static final long serialVersionUID = 5832130477659116489L;
+
+ private int[] _dcdList = null;
+ private long numCols = 0;
+
+ private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+ private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
+ private int[] _binList = null;
+ private int[] _numBins = null;
+
+ private int[] _domainSizes = null; // length = #of dummycoded columns
+ private int[] _dcdColumnMap = null; // to help in translating between original and dummycoded column IDs
+ private long _dummycodedLength = 0; // #of columns after dummycoding
+
+ DummycodeAgent(int[] list) {
+ _dcdList = list;
+ }
+
+ DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException {
+ numCols = ncol;
+
+ if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
+ return;
+
+ JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.DUMMYCODE.toString());
+ JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+
+ _dcdList = new int[attrs.size()];
+ for(int i=0; i < _dcdList.length; i++)
+ _dcdList[i] = UtilFunctions.toInt(attrs.get(i));
+ }
+
+ public int[] dcdList() {
+ return _dcdList;
+ }
+
+ /**
+ * Method to output transformation metadata from the mappers.
+ * This information is collected and merged by the reducers.
+ *
+ * @param out output collector for (column ID, distinct value) pairs
+ * @param taskID ID of the current map task
+ * @throws IOException
+ */
+ @Override
+ public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
+ // There is no metadata required for dummycoding.
+ // The required information is produced by RecodeAgent.
+ return;
+ }
+
+ @Override
+ public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values,
+ String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
+ // Nothing to do here
+ }
+
+ public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> maps) {
+ _finalMaps = maps;
+ }
+
+ public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> maps) {
+ _finalMapsCP = maps;
+ }
+
+ public void setNumBins(int[] binList, int[] numbins) {
+ _binList = binList;
+ _numBins = numbins;
+ }
+
+ /**
+ * Method to generate dummyCodedMaps.csv, with the range of column IDs for each variable in the original data.
+ *
+ * Each line in the dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end]
+ * 1/0 indicates whether ColID is dummycoded or not
+ * [st,end] is the range of dummycoded column numbers for the given ColID
+ *
+ * It also generates coltypes.csv, with the type (scale, nominal, etc.) of columns in the output.
+ * Recoded columns are of type nominal, binned columns are of type ordinal, dummycoded columns are
+ * of type dummycoded, and the remaining columns are of type scale.
+ *
+ * @param fs file system handle
+ * @param txMtdDir transformation metadata directory
+ * @param numCols number of columns in the original data
+ * @param agents transformation agents, used to look up bin and recode information
+ *
+ * @return number of columns in the transformed data
+ * @throws IOException
+ */
+ public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException {
+
+ // initialize all column types in the transformed data to SCALE
+ ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength];
+ for(int i=0; i < _dummycodedLength; i++)
+ ctypes[i] = ColumnTypes.SCALE;
+
+ _dcdColumnMap = new int[numCols];
+
+ Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME);
+ BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+
+ int sum=1;
+ int idx = 0;
+ for(int colID=1; colID <= numCols; colID++)
+ {
+ if ( _dcdList != null && idx < _dcdList.length && _dcdList[idx] == colID )
+ {
+ br.write(colID + "," + "1" + "," + sum + "," + (sum+_domainSizes[idx]-1) + "\n");
+ _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1;
+
+ for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++)
+ ctypes[i-1] = ColumnTypes.DUMMYCODED;
+
+ sum += _domainSizes[idx];
+ idx++;
+ }
+ else
+ {
+ br.write(colID + "," + "0" + "," + sum + "," + sum + "\n");
+ _dcdColumnMap[colID-1] = sum-1;
+
+ if ( agents.getBinAgent().isBinned(colID) != -1 )
+ ctypes[sum-1] = ColumnTypes.ORDINAL; // binned variable results in an ordinal column
+
+ if ( agents.getRecodeAgent().isRecoded(colID) != -1 )
+ ctypes[sum-1] = ColumnTypes.NOMINAL;
+
+ sum += 1;
+ }
+ }
+ br.close();
+
+ // Write coltypes.csv
+ pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME);
+ br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+
+ br.write(columnTypeToID(ctypes[0]) + "");
+ for(int i = 1; i < _dummycodedLength; i++)
+ br.write( "," + columnTypeToID(ctypes[i]));
+ br.close();
+
+ return sum-1;
+ }
+
+ /**
+ * Given a dummycoded column ID, find the corresponding original column ID.
+ *
+ * @param colID dummycoded column ID
+ * @return the corresponding original column ID, or -1 if no mapping exists
+ */
+ public int mapDcdColumnID(int colID)
+ {
+ for(int i=0; i < _dcdColumnMap.length; i++)
+ {
+ int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1);
+ int end = _dcdColumnMap[i]+1;
+ //System.out.println((i+1) + ": " + "[" + st + "," + end + "]");
+
+ if ( colID >= st && colID <= end)
+ return i+1;
+ }
+ return -1;
+ }
+
+ public String constructDummycodedHeader(String header, Pattern delim) {
+
+ if(_dcdList == null && _binList == null )
+ // none of the columns are dummycoded; simply return the given header
+ return header;
+
+ String[] names = delim.split(header, -1);
+ List<String> newNames = null;
+
+ StringBuilder sb = new StringBuilder();
+
+ // Dummycoding can be performed either on a recoded column or on a binned column
+
+ // process recoded columns
+ if(_finalMapsCP != null && _dcdList != null)
+ {
+ for(int i=0; i <_dcdList.length; i++)
+ {
+ int colID = _dcdList[i];
+ HashMap<String,Long> map = _finalMapsCP.get(colID);
+ String colName = UtilFunctions.unquote(names[colID-1]);
+
+ if ( map != null )
+ {
+ // order map entries by their recodeID
+ Ordering<String> valueComparator = Ordering.natural().onResultOf(Functions.forMap(map));
+ newNames = valueComparator.sortedCopy(map.keySet());
+
+ // construct concatenated string of map entries
+ sb.setLength(0);
+ for(int idx=0; idx < newNames.size(); idx++)
+ {
+ if(idx==0)
+ sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
+ else
+ sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
+ }
+ names[colID-1] = sb.toString(); // replace original column name with dcd name
+ }
+ }
+ }
+ else if(_finalMaps != null && _dcdList != null) {
+ for(int i=0; i <_dcdList.length; i++) {
+ int colID = _dcdList[i];
+ HashMap<String,String> map = _finalMaps.get(colID);
+ String colName = UtilFunctions.unquote(names[colID-1]);
+
+ if ( map != null )
+ {
+ // order map entries by their recodeID (represented as Strings: "1", "2", etc.)
+ Ordering<String> orderByID = new Ordering<String>()
+ {
+ public int compare(String s1, String s2) {
+ return (Integer.parseInt(s1) - Integer.parseInt(s2));
+ }
+ };
+
+ newNames = orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet());
+ // construct concatenated string of map entries
+ sb.setLength(0);
+ for(int idx=0; idx < newNames.size(); idx++)
+ {
+ if(idx==0)
+ sb.append( colName + DCD_NAME_SEP + newNames.get(idx));
+ else
+ sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx));
+ }
+ names[colID-1] = sb.toString(); // replace original column name with dcd name
+ }
+ }
+ }
+
+ // process binned columns
+ if (_binList != null)
+ for(int i=0; i < _binList.length; i++)
+ {
+ int colID = _binList[i];
+
+ // only columns that are both binned and dummycoded need to be considered here
+ if(isDummyCoded(colID) == -1)
+ continue;
+
+ int numBins = _numBins[i];
+ String colName = UtilFunctions.unquote(names[colID-1]);
+
+ sb.setLength(0);
+ for(int idx=0; idx < numBins; idx++)
+ if(idx==0)
+ sb.append( colName + DCD_NAME_SEP + "Bin" + (idx+1) );
+ else
+ sb.append( delim + colName + DCD_NAME_SEP + "Bin" + (idx+1) );
+ names[colID-1] = sb.toString(); // replace original column name with dcd name
+ }
+
+ // Construct the full header
+ sb.setLength(0);
+ for(int colID=0; colID < names.length; colID++)
+ {
+ if (colID == 0)
+ sb.append(names[colID]);
+ else
+ sb.append(delim + names[colID]);
+ }
+ //System.out.println("DummycodedHeader: " + sb.toString());
+
+ return sb.toString();
+ }
+
+ @Override
+ public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
+ if ( _dcdList == null )
+ {
+ _dummycodedLength = numCols;
+ return;
+ }
+
+ // sort the to-be-dummycoded column IDs in ascending order; this is the order in which the new dummycoded record is constructed in the apply() function.
+ Arrays.sort(_dcdList);
+ _domainSizes = new int[_dcdList.length];
+
+ _dummycodedLength = numCols;
+
+ //HashMap<String, String> map = null;
+ for(int i=0; i<_dcdList.length; i++) {
+ int colID = _dcdList[i];
+
+ // Find the domain size for colID using _finalMaps or _finalMapsCP
+ int domainSize = 0;
+ if(_finalMaps != null) {
+ if(_finalMaps.get(colID) != null)
+ domainSize = _finalMaps.get(colID).size();
+ }
+ else {
+ if(_finalMapsCP.get(colID) != null)
+ domainSize = _finalMapsCP.get(colID).size();
+ }
+
+ if ( domainSize != 0 ) {
+ // dummycoded column
+ _domainSizes[i] = domainSize;
+ }
+ else {
+ // binned column
+ if ( _binList != null )
+ for(int j=0; j<_binList.length; j++) {
+ if (colID == _binList[j]) {
+ _domainSizes[i] = _numBins[j];
+ break;
+ }
+ }
+ }
+ _dummycodedLength += _domainSizes[i]-1;
+ //System.out.println("colID=" + colID + ", domainsize=" + _domainSizes[i] + ", dcdLength=" + _dummycodedLength);
+ }
+ }
+
+ /**
+ * Method to apply the dummycoding transformation to a single row.
+ *
+ * @param words array of column values for the current row
+ * @return the transformed row, with each dummycoded column expanded into its indicator columns
+ */
+ @Override
+ public String[] apply(String[] words, TfUtils agents) {
+
+ if ( _dcdList == null )
+ return words;
+
+ String[] nwords = new String[(int)_dummycodedLength];
+
+ int rcdVal = 0;
+
+ for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) {
+ if(idx < _dcdList.length && colID==_dcdList[idx]) {
+ // dummycoded columns
+ try {
+ rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
+ nwords[ ncolID-1+rcdVal-1 ] = "1";
+ ncolID += _domainSizes[idx];
+ idx++;
+ } catch (Exception e) {
+ System.out.println("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
+ throw new RuntimeException(e);
+ }
+ }
+ else {
+ nwords[ncolID-1] = words[colID-1];
+ ncolID++;
+ }
+ }
+
+ return nwords;
+ }
+
+ /**
+ * Check whether the given column ID is subject to this transformation.
+ * @return index of colID in the dummycode list, or -1 if it is not dummycoded
+ */
+ public int isDummyCoded(int colID)
+ {
+ if(_dcdList == null)
+ return -1;
+
+ int idx = Arrays.binarySearch(_dcdList, colID);
+ return ( idx >= 0 ? idx : -1);
+ }
+
+ @Override
+ public void print() {
+ System.out.print("Dummycoding List: \n ");
+ for(int i : _dcdList) {
+ System.out.print(i + " ");
+ }
+ System.out.println();
+ }
+
+}
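
Note: the following standalone sketch traces what DummycodeAgent.genDcdMapsAndColTypes()
and apply() compute for a toy schema of three columns in which only column 2 is
dummycoded, with a domain size of 3. It is illustrative only; the class name
DummycodeSketch and the method expand() are not SystemML APIs.

    // With numCols=3, _dcdList={2} and _domainSizes={3}, genDcdMapsAndColTypes()
    // writes the following dummyCodedMaps.csv and returns sum-1 = 5 output columns:
    //   1,0,1,1   (column 1 passes through as output column 1)
    //   2,1,2,4   (column 2 expands into output columns 2..4)
    //   3,0,5,5   (column 3 passes through as output column 5)
    public class DummycodeSketch {
        // Mirrors the indicator assignment nwords[ncolID-1+rcdVal-1]="1" in apply();
        // apply() leaves the remaining positions null, whereas here they are filled
        // with "0" to make the expansion visible.
        static String[] expand(int rcdVal, int domainSize) {
            String[] out = new String[domainSize];
            java.util.Arrays.fill(out, "0");
            out[rcdVal - 1] = "1";
            return out;
        }
        public static void main(String[] args) {
            // recoded value 2 in a 3-value domain -> [0, 1, 0]
            System.out.println(java.util.Arrays.toString(expand(2, 3)));
        }
    }
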
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
index e254403..4e3ece5 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
@@ -1,107 +1,107 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-
-
-public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{
-
- private OutputCollector<IntWritable, DistinctValue> _collector = null;
- private int _mapTaskID = -1;
-
- TfUtils _agents = null;
-
- private boolean _partFileWithHeader = false;
- private boolean _firstRecordInSplit = true;
- private String _partFileName = null;
- private long _offsetInPartFile = -1;
-
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Configure the information used in the mapper, and set up the transformation agents.
- */
- @Override
- public void configure(JobConf job) {
- String[] parts = job.get("mapred.task.id").split("_");
- if ( parts[0].equalsIgnoreCase("task")) {
- _mapTaskID = Integer.parseInt(parts[parts.length-1]);
- }
- else if ( parts[0].equalsIgnoreCase("attempt")) {
- _mapTaskID = Integer.parseInt(parts[parts.length-2]);
- }
- else {
- throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id"));
- }
-
- try {
- _partFileName = TfUtils.getPartFileName(job);
- _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
- _agents = new TfUtils(job);
- } catch(IOException e) { throw new RuntimeException(e); }
- catch(JSONException e) { throw new RuntimeException(e); }
-
- }
-
-
- public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException {
-
- if(_firstRecordInSplit)
- {
- _firstRecordInSplit = false;
- _collector = out;
- _offsetInPartFile = rawKey.get();
- }
-
- // ignore header
- if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader)
- return;
-
- _agents.prepareTfMtd(rawValue.toString());
- }
-
- @Override
- public void close() throws IOException {
- _agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
- _agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
- _agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
-
- // Output part-file offsets to create OFFSETS_FILE, which is used in CSV reblocking.
- // An OffsetCount is encoded as a DistinctValue by concatenating the part-file name and the offset within that part file.
- _collector.collect(new IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid())));
-
- // reset global variables, required when the JVM is reused.
- _firstRecordInSplit = true;
- _offsetInPartFile = -1;
- _partFileWithHeader = false;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+
+
+public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{
+
+ private OutputCollector<IntWritable, DistinctValue> _collector = null;
+ private int _mapTaskID = -1;
+
+ TfUtils _agents = null;
+
+ private boolean _partFileWithHeader = false;
+ private boolean _firstRecordInSplit = true;
+ private String _partFileName = null;
+ private long _offsetInPartFile = -1;
+
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+ * Configure the information used in the mapper, and set up the transformation agents.
+ */
+ @Override
+ public void configure(JobConf job) {
+ String[] parts = job.get("mapred.task.id").split("_");
+ if ( parts[0].equalsIgnoreCase("task")) {
+ _mapTaskID = Integer.parseInt(parts[parts.length-1]);
+ }
+ else if ( parts[0].equalsIgnoreCase("attempt")) {
+ _mapTaskID = Integer.parseInt(parts[parts.length-2]);
+ }
+ else {
+ throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id"));
+ }
+
+ try {
+ _partFileName = TfUtils.getPartFileName(job);
+ _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
+ _agents = new TfUtils(job);
+ } catch(IOException e) { throw new RuntimeException(e); }
+ catch(JSONException e) { throw new RuntimeException(e); }
+
+ }
+
+
+ public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException {
+
+ if(_firstRecordInSplit)
+ {
+ _firstRecordInSplit = false;
+ _collector = out;
+ _offsetInPartFile = rawKey.get();
+ }
+
+ // ignore header
+ if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader)
+ return;
+
+ _agents.prepareTfMtd(rawValue.toString());
+ }
+
+ @Override
+ public void close() throws IOException {
+ _agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
+ _agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
+ _agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
+
+ // Output part-file offsets to create OFFSETS_FILE, which is used in CSV reblocking.
+ // An OffsetCount is encoded as a DistinctValue by concatenating the part-file name and the offset within that part file.
+ _collector.collect(new IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid())));
+
+ // reset global variables, required when the JVM is reused.
+ _firstRecordInSplit = true;
+ _offsetInPartFile = -1;
+ _partFileWithHeader = false;
+ }
+
+}
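
Note: the following standalone sketch traces the task-ID parsing in
GTFMTDMapper.configure() on representative Hadoop task/attempt ID strings. It is
illustrative only; TaskIdSketch and parseMapTaskID() are not SystemML APIs, and
the sample ID strings are assumptions about the usual Hadoop formats.

    public class TaskIdSketch {
        // Same branching as configure(): "task_*" IDs carry the task number last,
        // "attempt_*" IDs carry it second to last (before the attempt number).
        static int parseMapTaskID(String taskId) {
            String[] parts = taskId.split("_");
            if (parts[0].equalsIgnoreCase("task"))
                return Integer.parseInt(parts[parts.length - 1]);
            else if (parts[0].equalsIgnoreCase("attempt"))
                return Integer.parseInt(parts[parts.length - 2]);
            throw new RuntimeException("Unrecognized format for taskID: " + taskId);
        }
        public static void main(String[] args) {
            System.out.println(parseMapTaskID("task_201601221734_0001_m_000002"));      // prints 2
            System.out.println(parseMapTaskID("attempt_201601221734_0001_m_000002_0")); // prints 2
        }
    }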