You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/06/08 19:24:09 UTC
[5/6] systemml git commit: [SYSTEMML-1300] Remove file-based transform from compiler/runtime
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
deleted file mode 100644
index 77c06ae..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper.IndexedBlockRow;
-import org.apache.sysml.runtime.matrix.mapred.MapperBase;
-
-@SuppressWarnings("deprecation")
-public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, Text, TaggedFirstSecondIndexes, CSVReblockMR.BlockRow>{
-
- boolean _partFileWithHeader = false;
- TfUtils tfmapper = null;
- Reporter _reporter = null;
-
- // variables relevant to CSV Reblock
- private IndexedBlockRow idxRow = null;
- private long rowOffset=0;
- private HashMap<Long, Long> offsetMap=new HashMap<Long, Long>();
- private boolean _first = true;
- private long num=0;
-
- @Override
- public void configure(JobConf job) {
- super.configure(job);
- try {
- _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
- tfmapper = new TfUtils(job);
- tfmapper.loadTfMetadata(job, true);
-
- // Load relevant information for CSV Reblock
- ByteWritable key=new ByteWritable();
- OffsetCount value=new OffsetCount();
- Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-
- Path path=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
- FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
- String thisfile=path.makeQualified(fs).toString();
-
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(fs, p, job);
- while (reader.next(key, value)) {
- // "key" needn't be checked since the offset file has information about a single CSV input (the raw data file)
- if(thisfile.equals(value.filename))
- offsetMap.put(value.fileOffset, value.count);
- }
- }
- finally {
- IOUtilFunctions.closeSilently(reader);
- }
-
- idxRow = new CSVReblockMapper.IndexedBlockRow();
- int maxBclen=0;
-
- for(ArrayList<CSVReblockInstruction> insv: csv_reblock_instructions)
- for(CSVReblockInstruction in: insv)
- {
- if(maxBclen<in.bclen)
- maxBclen=in.bclen;
- }
-
- //always dense since common csv usecase
- idxRow.getRow().data.reset(1, maxBclen, false);
-
- } catch (IOException e) { throw new RuntimeException(e); }
- catch(JSONException e) { throw new RuntimeException(e); }
-
- }
-
- @Override
- public void map(LongWritable rawKey, Text rawValue, OutputCollector<TaggedFirstSecondIndexes,CSVReblockMR.BlockRow> out, Reporter reporter) throws IOException {
-
- if(_first) {
- rowOffset=offsetMap.get(rawKey.get());
- _reporter = reporter;
- _first=false;
- }
-
- // output the header line
- if ( rawKey.get() == 0 && _partFileWithHeader )
- {
- tfmapper.processHeaderLine();
- if ( tfmapper.hasHeader() )
- return;
- }
-
- // parse the input line and apply transformation
- String[] words = tfmapper.getWords(rawValue);
-
- if(!tfmapper.omit(words))
- {
- words = tfmapper.apply(words);
- try {
- tfmapper.check(words);
-
- // Perform CSV Reblock
- CSVReblockInstruction ins = csv_reblock_instructions.get(0).get(0);
- idxRow = CSVReblockMapper.processRow(idxRow, words, rowOffset, num, ins.output, ins.brlen, ins.bclen, ins.fill, ins.fillValue, out);
- }
- catch(DMLRuntimeException e) {
- throw new RuntimeException(e.getMessage() + ":" + rawValue.toString());
- }
- num++;
- }
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- protected void specialOperationsForActualMap(int index,
- OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
deleted file mode 100644
index e2885d8..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.JobReturn;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-@SuppressWarnings("deprecation")
-public class ApplyTfCSVMR {
-
- public static JobReturn runJob(String inputPath, String spec, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
- JobConf job = new JobConf(ApplyTfCSVMR.class);
- job.setJobName("ApplyTfCSV");
-
- /* Setup MapReduce Job */
- job.setJarByClass(ApplyTfCSVMR.class);
-
- // set relevant classes
- job.setMapperClass(ApplyTfCSVMapper.class);
- job.setNumReduceTasks(0);
-
- // Add transformation metadata file as well as partOffsetsFile to Distributed cache
- DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job);
- DistributedCache.createSymlink(job);
-
- Path cachefile=new Path(partOffsetsFile);
- DistributedCache.addCacheFile(cachefile.toUri(), job);
- DistributedCache.createSymlink(job);
-
- // set input and output properties
- job.setInputFormat(TextInputFormat.class);
- job.setOutputFormat(TextOutputFormat.class);
-
- job.setMapOutputKeyClass(NullWritable.class);
- job.setMapOutputValueClass(Text.class);
-
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
-
- job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
-
- FileInputFormat.addInputPath(job, new Path(inputPath));
- // delete outputPath, if exists already.
- Path outPath = new Path(outputPath);
- FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job);
- fs.delete(outPath, true);
- FileOutputFormat.setOutputPath(job, outPath);
-
- job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader()));
- job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim());
- if ( inputDataProperties.getNAStrings() != null)
- // Adding "dummy" string to handle the case of na_strings = ""
- job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
- job.set(MRJobConfiguration.TF_SPEC, spec);
- job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath));
- job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath);
- job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
- job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
- job.set(MRJobConfiguration.TF_HEADER, headerLine);
- job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
- job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
-
- //turn off adaptivemr
- job.setBoolean("adaptivemr.map.enable", false);
-
- // Run the job
- RunningJob runjob = JobClient.runJob(job);
-
- // Since transform CSV produces part files w/ prefix transform-part-*,
- // delete all the "default" part-..... files
- deletePartFiles(fs, outPath);
-
- MatrixCharacteristics mc = new MatrixCharacteristics();
- return new JobReturn(new MatrixCharacteristics[]{mc}, runjob.isSuccessful());
- }
-
- private static void deletePartFiles(FileSystem fs, Path path) throws FileNotFoundException, IOException
- {
- PathFilter filter=new PathFilter(){
- public boolean accept(Path file) {
- return file.getName().startsWith("part-");
- }
- };
- FileStatus[] list = fs.listStatus(path, filter);
- for(FileStatus stat : list) {
- fs.delete(stat.getPath(), false);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
deleted file mode 100644
index 05b8a19..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-
-public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
-
- boolean _firstRecordInSplit = true;
- boolean _partFileWithHeader = false;
-
- TfUtils tfmapper = null;
- Reporter _reporter = null;
- BufferedWriter br = null;
- JobConf _rJob = null;
-
- @Override
- public void configure(JobConf job) {
- try {
- _rJob = job;
- _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
- tfmapper = new TfUtils(job);
-
- tfmapper.loadTfMetadata(job, true);
-
- } catch (IOException e) { throw new RuntimeException(e); }
- catch(JSONException e) { throw new RuntimeException(e); }
-
- }
-
- @Override
- public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException {
-
- if(_firstRecordInSplit)
- {
- _firstRecordInSplit = false;
- _reporter = reporter;
-
- // generate custom output paths so that order of rows in the
- // output (across part files) matches w/ that from input data set
- String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
- Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
-
- // setup the writer for mapper's output
- // the default part-..... files will be deleted later once the job finishes
- FileSystem fs = IOUtilFunctions.getFileSystem(mapOutputPath);
- br = new BufferedWriter(new OutputStreamWriter(fs.create( mapOutputPath, true)));
- }
-
- // output the header line
- if ( rawKey.get() == 0 && _partFileWithHeader )
- {
- _reporter = reporter;
- tfmapper.processHeaderLine();
- if ( tfmapper.hasHeader() )
- return;
- }
-
- // parse the input line and apply transformation
- String[] words = tfmapper.getWords(rawValue);
-
- if(!tfmapper.omit(words))
- {
- try {
- words = tfmapper.apply(words);
- String outStr = tfmapper.checkAndPrepOutputString(words);
- //out.collect(NullWritable.get(), new Text(outStr));
- br.write(outStr + "\n");
- }
- catch(DMLRuntimeException e) {
- throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- IOUtilFunctions.closeSilently(br);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
deleted file mode 100644
index b820449..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-
-public class ApplyTfCSVSPARK {
-
- /**
- * Apply transformation metadata and generate the result in CSV format, as a
- * JavaRDD of Strings.
- *
- * @param sec spark execution context
- * @param inputRDD input rdd
- * @param tfMtdPath transform metadata path
- * @param spec transform specification as json string
- * @param tmpPath temporary file path
- * @param prop csv file format properties
- * @param numCols number of columns
- * @param headerLine header line
- * @return JavaPairRDD of long-strings
- * @throws IOException if IOException occurs
- * @throws ClassNotFoundException if ClassNotFoundException occurs
- * @throws InterruptedException if InterruptedException occurs
- * @throws IllegalArgumentException if IllegalArgumentException occurs
- * @throws JSONException if JSONException occurs
- */
- public static JavaPairRDD<Long, String> runSparkJob(
- SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD,
- String tfMtdPath, String spec, String tmpPath, CSVFileFormatProperties prop,
- int numCols, String headerLine)
- throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException
- {
- // Load transformation metadata and broadcast it
- String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
- JSONObject jspec = new JSONObject(spec);
- TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, jspec, numCols, tfMtdPath, null, tmpPath);
-
- _tfmapper.loadTfMetadata();
-
- Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
-
- /*
- * Construct transformation metadata (map-side) -- the logic is similar
- * to GTFMTDMapper
- *
- * Note: The result of mapPartitionsWithIndex is cached so that the
- * transformed data is not redundantly computed multiple times
- */
- JavaPairRDD<Long, String> applyRDD = inputRDD
- .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf), true)
- .mapToPair(
- new PairFunction<String,Long,String>(){
- private static final long serialVersionUID = 3868143093999082931L;
- @Override
- public Tuple2<Long, String> call(String t) throws Exception {
- return new Tuple2<Long, String>(new Long(1), t);
- }
- }
- ).cache();
-
- /*
- * An action to force execution of apply()
- *
- * We need to trigger the execution of this RDD so as to ensure the
- * creation of a few metadata files (headers, dummycoded information,
- * etc.), which are referenced in the caller function.
- */
- applyRDD.count();
-
- return applyRDD;
- }
-
- public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
-
- private static final long serialVersionUID = 1496686437276906911L;
-
- TfUtils _tfmapper = null;
-
- ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
- _tfmapper = tf.getValue();
- }
-
- @Override
- public Iterator<String> call(Integer partitionID,
- Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-
- boolean first = true;
- Tuple2<LongWritable, Text> rec = null;
- ArrayList<String> outLines = new ArrayList<String>();
-
- while(csvLines.hasNext()) {
- rec = csvLines.next();
-
- if (first && partitionID == 0) {
- first = false;
-
- _tfmapper.processHeaderLine();
-
- if (_tfmapper.hasHeader() ) {
- continue;
- }
- }
-
- // parse the input line and apply transformation
-
- String[] words = _tfmapper.getWords(rec._2());
-
- if(!_tfmapper.omit(words))
- {
- try {
- words = _tfmapper.apply(words);
- String outStr = _tfmapper.checkAndPrepOutputString(words);
- outLines.add(outStr);
- }
- catch(DMLRuntimeException e) {
- throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
- }
- }
- }
-
- return outLines.iterator();
- }
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
deleted file mode 100644
index 8878ff0..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-import org.apache.sysml.lops.Lop;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
-import org.apache.sysml.runtime.transform.encode.Encoder;
-import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class BinAgent extends Encoder
-{
- private static final long serialVersionUID = 1917445005206076078L;
-
- public static final String MIN_PREFIX = "min";
- public static final String MAX_PREFIX = "max";
- public static final String NBINS_PREFIX = "nbins";
-
- private int[] _numBins = null;
- private double[] _min=null, _max=null; // min and max among non-missing values
- private double[] _binWidths = null; // width of a bin for each attribute
-
- //frame transform-apply attributes
- private double[][] _binMins = null;
- private double[][] _binMaxs = null;
-
- public BinAgent(JSONObject parsedSpec, String[] colnames, int clen)
- throws JSONException, IOException
- {
- this(parsedSpec, colnames, clen, false);
- }
-
- public BinAgent(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly)
- throws JSONException, IOException
- {
- super( null, clen );
- if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
- return;
-
- if( colsOnly ) {
- List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames);
- initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0])));
- }
- else
- {
- JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN);
- JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
- JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
- initColList(attrs);
-
- _numBins = new int[attrs.size()];
- for(int i=0; i < _numBins.length; i++)
- _numBins[i] = UtilFunctions.toInt(nbins.get(i));
-
- // initialize internal transformation metadata
- _min = new double[_colList.length];
- Arrays.fill(_min, Double.MAX_VALUE);
- _max = new double[_colList.length];
- Arrays.fill(_max, -Double.MAX_VALUE);
-
- _binWidths = new double[_colList.length];
- }
- }
-
- public int[] getNumBins() { return _numBins; }
- public double[] getMin() { return _min; }
- public double[] getBinWidths() { return _binWidths; }
-
- public void prepare(String[] words, TfUtils agents) {
- if ( !isApplicable() )
- return;
-
- for(int i=0; i <_colList.length; i++) {
- int colID = _colList[i];
-
- String w = null;
- double d = 0;
-
- // equi-width
- w = UtilFunctions.unquote(words[colID-1].trim());
- if(!TfUtils.isNA(agents.getNAStrings(),w)) {
- d = UtilFunctions.parseToDouble(w);
- if(d < _min[i])
- _min[i] = d;
- if(d > _max[i])
- _max[i] = d;
- }
- }
- }
-
- private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
- String s = MIN_PREFIX + Double.toString(_min[idx]);
- return new DistinctValue(s, -1L);
- }
-
- private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
- String s = MAX_PREFIX + Double.toString(_max[idx]);
- return new DistinctValue(s, -1L);
- }
-
- private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
- String s = NBINS_PREFIX + Double.toString(_numBins[idx]);
- return new DistinctValue(s, -1L);
- }
-
- /**
- * Method to output transformation metadata from the mappers.
- * This information is collected and merged by the reducers.
- */
- @Override
- public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
- if( !isApplicable() )
- return;
-
- try {
- for(int i=0; i < _colList.length; i++) {
- int colID = _colList[i];
- IntWritable iw = new IntWritable(-colID);
-
- out.collect(iw, prepMinOutput(i));
- out.collect(iw, prepMaxOutput(i));
- out.collect(iw, prepNBinsOutput(i));
- }
- } catch(Exception e) {
- throw new IOException(e);
- }
- }
-
- public ArrayList<Pair<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
- if ( !isApplicable() )
- return list;
-
- try {
- for(int i=0; i < _colList.length; i++) {
- int colID = _colList[i];
- Integer iw = -colID;
-
- list.add( new Pair<Integer,DistinctValue>(iw, prepMinOutput(i)) );
- list.add( new Pair<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
- list.add( new Pair<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
- }
- } catch(Exception e) {
- throw new IOException(e);
- }
- return list;
- }
-
- private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException
- {
- Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
- BufferedWriter br = null;
- try {
- br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
- br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n");
- }
- finally {
- IOUtilFunctions.closeSilently(br);
- }
- }
-
- /**
- * Method to merge map output transformation metadata.
- */
- @Override
- public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
- double min = Double.MAX_VALUE;
- double max = -Double.MAX_VALUE;
- int nbins = 0;
-
- DistinctValue val = new DistinctValue();
- String w = null;
- double d;
- while(values.hasNext()) {
- val.reset();
- val = values.next();
- w = val.getWord();
-
- if(w.startsWith(MIN_PREFIX)) {
- d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
- if ( d < min )
- min = d;
- }
- else if(w.startsWith(MAX_PREFIX)) {
- d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
- if ( d > max )
- max = d;
- }
- else if (w.startsWith(NBINS_PREFIX)) {
- nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
- }
- else
- throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
- }
-
- // write merged metadata
- double binwidth = (max-min)/nbins;
- writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
- }
-
-
- public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
- if( !isApplicable() )
- return;
-
- MVImputeAgent mvagent = agents.getMVImputeAgent();
- for(int i=0; i < _colList.length; i++) {
- int colID = _colList[i];
-
- // If the column is imputed with a constant, then adjust min and max based the value of the constant.
- if ( mvagent.isApplicable(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT )
- {
- double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
- if ( cst < _min[i])
- _min[i] = cst;
- if ( cst > _max[i])
- _max[i] = cst;
- }
-
- double binwidth = (_max[i] - _min[i])/_numBins[i];
- writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
- }
- }
-
- // ------------------------------------------------------------------------------------------------
-
- /**
- * Method to load transform metadata for all attributes
- */
- @Override
- public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
- if( !isApplicable() )
- return;
-
- if(fs.isDirectory(txMtdDir)) {
- for(int i=0; i<_colList.length;i++) {
- int colID = _colList[i];
-
- Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
- TfUtils.checkValidInputFile(fs, path, true);
-
- BufferedReader br = null;
- try {
- br = new BufferedReader(new InputStreamReader(fs.open(path)));
- // format: colID,min,max,nbins
- String[] fields = br.readLine().split(TfUtils.TXMTD_SEP);
- double min = UtilFunctions.parseToDouble(fields[1]);
- //double max = UtilFunctions.parseToDouble(fields[2]);
- double binwidth = UtilFunctions.parseToDouble(fields[3]);
- int nbins = UtilFunctions.parseToInt(fields[4]);
-
- _numBins[i] = nbins;
- _min[i] = min;
- _binWidths[i] = binwidth; // (max-min)/nbins;
- }
- finally {
- IOUtilFunctions.closeSilently(br);
- }
- }
- }
- else {
- throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
- }
- }
-
-
- @Override
- public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
- build(in);
- return apply(in, out);
- }
-
- @Override
- public void build(FrameBlock in) {
- // TODO Auto-generated method stub
- }
-
- /**
- * Method to apply transformations.
- */
- @Override
- public String[] apply(String[] words) {
- if( !isApplicable() )
- return words;
-
- for(int i=0; i < _colList.length; i++) {
- int colID = _colList[i];
- try {
- double val = UtilFunctions.parseToDouble(words[colID-1]);
- int binid = 1;
- double tmp = _min[i] + _binWidths[i];
- while(val > tmp && binid < _numBins[i]) {
- tmp += _binWidths[i];
- binid++;
- }
- words[colID-1] = Integer.toString(binid);
- }
- catch(NumberFormatException e) {
- throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
- }
- }
-
- return words;
- }
-
- @Override
- public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
- for(int j=0; j<_colList.length; j++) {
- int colID = _colList[j];
- for( int i=0; i<in.getNumRows(); i++ ) {
- double inVal = UtilFunctions.objectToDouble(
- in.getSchema()[colID-1], in.get(i, colID-1));
- int ix = Arrays.binarySearch(_binMaxs[j], inVal);
- int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1;
- out.quickSetValue(i, colID-1, binID);
- }
- }
- return out;
- }
-
- @Override
- public FrameBlock getMetaData(FrameBlock meta) {
- return meta;
- }
-
- @Override
- public void initMetaData(FrameBlock meta) {
- _binMins = new double[_colList.length][];
- _binMaxs = new double[_colList.length][];
- for( int j=0; j<_colList.length; j++ ) {
- int colID = _colList[j]; //1-based
- int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
- _binMins[j] = new double[nbins];
- _binMaxs[j] = new double[nbins];
- for( int i=0; i<nbins; i++ ) {
- String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX);
- _binMins[j][i] = Double.parseDouble(tmp[0]);
- _binMaxs[j][i] = Double.parseDouble(tmp[1]);
- }
- }
- }
-}