You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/06/08 19:24:09 UTC

[5/6] systemml git commit: [SYSTEMML-1300] Remove file-based transform from compiler/runtime

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
deleted file mode 100644
index 77c06ae..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper.IndexedBlockRow;
-import org.apache.sysml.runtime.matrix.mapred.MapperBase;
-
-@SuppressWarnings("deprecation")
-public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, Text, TaggedFirstSecondIndexes, CSVReblockMR.BlockRow>{
-	
-	boolean _partFileWithHeader = false;
-	TfUtils tfmapper = null;
-	Reporter _reporter = null;
-	
-	// variables relevant to CSV Reblock
-	private IndexedBlockRow idxRow = null;
-	private long rowOffset=0;
-	private HashMap<Long, Long> offsetMap=new HashMap<Long, Long>();
-	private boolean _first = true;
-	private long num=0;
-	
-	@Override
-	public void configure(JobConf job) {
-		super.configure(job);
-		try {
-			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-			tfmapper = new TfUtils(job);
-			tfmapper.loadTfMetadata(job, true);
-			
-			// Load relevant information for CSV Reblock
-			ByteWritable key=new ByteWritable();
-			OffsetCount value=new OffsetCount();
-			Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-			
-			Path path=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
-			FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-			String thisfile=path.makeQualified(fs).toString();
-
-			SequenceFile.Reader reader = null;
-			try {
-				reader = new SequenceFile.Reader(fs, p, job);
-				while (reader.next(key, value)) {
-					// "key" needn't be checked since the offset file has information about a single CSV input (the raw data file)
-					if(thisfile.equals(value.filename))
-						offsetMap.put(value.fileOffset, value.count);
-				}
-			}
-			finally {
-				IOUtilFunctions.closeSilently(reader);
-			}
-			
-			idxRow = new CSVReblockMapper.IndexedBlockRow();
-			int maxBclen=0;
-		
-			for(ArrayList<CSVReblockInstruction> insv: csv_reblock_instructions)
-				for(CSVReblockInstruction in: insv)
-				{	
-					if(maxBclen<in.bclen)
-						maxBclen=in.bclen;
-				}
-			
-			//always dense since common csv usecase
-			idxRow.getRow().data.reset(1, maxBclen, false);		
-
-		} catch (IOException e) { throw new RuntimeException(e); }
- 		 catch(JSONException e)  { throw new RuntimeException(e); }
-
-	}
-	
-	@Override
-	public void map(LongWritable rawKey, Text rawValue, OutputCollector<TaggedFirstSecondIndexes,CSVReblockMR.BlockRow> out, Reporter reporter) throws IOException  {
-		
-		if(_first) {
-			rowOffset=offsetMap.get(rawKey.get());
-			_reporter = reporter;
-			_first=false;
-		}
-		
-		// output the header line
-		if ( rawKey.get() == 0 && _partFileWithHeader ) 
-		{
-			tfmapper.processHeaderLine();
-			if ( tfmapper.hasHeader() )
-				return;
-		}
-		
-		// parse the input line and apply transformation
-		String[] words = tfmapper.getWords(rawValue);
-		
-		if(!tfmapper.omit(words))
-		{
-			words = tfmapper.apply(words);
-			try {
-				tfmapper.check(words);
-				
-				// Perform CSV Reblock
-				CSVReblockInstruction ins = csv_reblock_instructions.get(0).get(0);
-				idxRow = CSVReblockMapper.processRow(idxRow, words, rowOffset, num, ins.output, ins.brlen, ins.bclen, ins.fill, ins.fillValue, out);
-			}
-			catch(DMLRuntimeException e) {
-				throw new RuntimeException(e.getMessage() + ":" + rawValue.toString());
-			}
-			num++;
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-	}
-
-	@Override
-	protected void specialOperationsForActualMap(int index,
-			OutputCollector<Writable, Writable> out, Reporter reporter)
-			throws IOException {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
deleted file mode 100644
index e2885d8..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.JobReturn;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-@SuppressWarnings("deprecation")
-public class ApplyTfCSVMR {
-	
-	public static JobReturn runJob(String inputPath, String spec, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
-		JobConf job = new JobConf(ApplyTfCSVMR.class);
-		job.setJobName("ApplyTfCSV");
-
-		/* Setup MapReduce Job */
-		job.setJarByClass(ApplyTfCSVMR.class);
-		
-		// set relevant classes
-		job.setMapperClass(ApplyTfCSVMapper.class);
-		job.setNumReduceTasks(0);
-	
-		// Add transformation metadata file as well as partOffsetsFile to Distributed cache
-		DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job);
-		DistributedCache.createSymlink(job);
-		
-		Path cachefile=new Path(partOffsetsFile);
-		DistributedCache.addCacheFile(cachefile.toUri(), job);
-		DistributedCache.createSymlink(job);
-		
-		// set input and output properties
-		job.setInputFormat(TextInputFormat.class);
-		job.setOutputFormat(TextOutputFormat.class);
-		
-		job.setMapOutputKeyClass(NullWritable.class);
-		job.setMapOutputValueClass(Text.class);
-		
-		job.setOutputKeyClass(NullWritable.class);
-		job.setOutputValueClass(Text.class);
-		
-		job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
-		
-		FileInputFormat.addInputPath(job, new Path(inputPath));
-		// delete outputPath, if exists already.
-		Path outPath = new Path(outputPath);
-		FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job);
-		fs.delete(outPath, true);
-		FileOutputFormat.setOutputPath(job, outPath);
-
-		job.set(MRJobConfiguration.TF_HAS_HEADER, 	Boolean.toString(inputDataProperties.hasHeader()));
-		job.set(MRJobConfiguration.TF_DELIM, 		inputDataProperties.getDelim());
-		if ( inputDataProperties.getNAStrings() != null)
-			// Adding "dummy" string to handle the case of na_strings = ""
-			job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
-		job.set(MRJobConfiguration.TF_SPEC, spec);
-		job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath));
-		job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath);
-		job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
-		job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
-		job.set(MRJobConfiguration.TF_HEADER, headerLine);
-		job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
-		job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
-		
-		//turn off adaptivemr
-		job.setBoolean("adaptivemr.map.enable", false);
-
-		// Run the job
-		RunningJob runjob = JobClient.runJob(job);
-		
-		// Since transform CSV produces part files w/ prefix transform-part-*,
-		// delete all the "default" part-..... files
-		deletePartFiles(fs, outPath);
-		
-		MatrixCharacteristics mc = new MatrixCharacteristics();
-		return new JobReturn(new MatrixCharacteristics[]{mc}, runjob.isSuccessful());
-	}
-	
-	private static void deletePartFiles(FileSystem fs, Path path) throws FileNotFoundException, IOException
-	{
-		PathFilter filter=new PathFilter(){
-			public boolean accept(Path file) {
-				return file.getName().startsWith("part-");
-	        }
-		};
-		FileStatus[] list = fs.listStatus(path, filter);
-		for(FileStatus stat : list) {
-			fs.delete(stat.getPath(), false);
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
deleted file mode 100644
index 05b8a19..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-
-public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
-	
-	boolean _firstRecordInSplit = true;
-	boolean _partFileWithHeader = false;
-	
-	TfUtils tfmapper = null;
-	Reporter _reporter = null;
-	BufferedWriter br = null;
-	JobConf _rJob = null;
-	
-	@Override
-	public void configure(JobConf job) {
-		try {
-			_rJob = job;
-			_partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-			tfmapper = new TfUtils(job);
-			
-			tfmapper.loadTfMetadata(job, true);
-			
-		} catch (IOException e) { throw new RuntimeException(e); }
-		catch(JSONException e)  { throw new RuntimeException(e); }
-
-	}
-	
-	@Override
-	public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException  {
-		
-		if(_firstRecordInSplit)
-		{
-			_firstRecordInSplit = false;
-			_reporter = reporter;
-			
-			// generate custom output paths so that order of rows in the 
-			// output (across part files) matches w/ that from input data set
-			String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
-			Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
-			
-			// setup the writer for mapper's output
-			// the default part-..... files will be deleted later once the job finishes 
-			FileSystem fs = IOUtilFunctions.getFileSystem(mapOutputPath);
-			br = new BufferedWriter(new OutputStreamWriter(fs.create( mapOutputPath, true)));
-		}
-		
-		// output the header line
-		if ( rawKey.get() == 0 && _partFileWithHeader ) 
-		{
-			_reporter = reporter;
-			tfmapper.processHeaderLine();
-			if ( tfmapper.hasHeader() )
-				return;
-		}
-		
-		// parse the input line and apply transformation
-		String[] words = tfmapper.getWords(rawValue);
-		
-		if(!tfmapper.omit(words))
-		{
-			try {
-				words = tfmapper.apply(words);
-				String outStr = tfmapper.checkAndPrepOutputString(words);
-				//out.collect(NullWritable.get(), new Text(outStr));
-				br.write(outStr + "\n");
-			} 
-			catch(DMLRuntimeException e) {
-				throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
-			}
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		IOUtilFunctions.closeSilently(br);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
deleted file mode 100644
index b820449..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-
-public class ApplyTfCSVSPARK {
-	
-	/**
-	 * Apply transformation metadata and generate the result in CSV format, as a
-	 * JavaRDD of Strings.
-	 * 
-	 * @param sec spark execution context
-	 * @param inputRDD input rdd
-	 * @param tfMtdPath transform metadata path
-	 * @param spec transform specification as json string
-	 * @param tmpPath temporary file path
-	 * @param prop csv file format properties
-	 * @param numCols number of columns
-	 * @param headerLine header line
-	 * @return JavaPairRDD of long-strings
-	 * @throws IOException if IOException occurs
-	 * @throws ClassNotFoundException if ClassNotFoundException occurs
-	 * @throws InterruptedException if InterruptedException occurs
-	 * @throws IllegalArgumentException if IllegalArgumentException occurs
-	 * @throws JSONException if JSONException occurs
-	 */
-	public static JavaPairRDD<Long, String> runSparkJob(
-			SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
-			String tfMtdPath, String spec, String tmpPath, CSVFileFormatProperties prop, 
-			int numCols, String headerLine) 
-		throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException 
-	{
-		// Load transformation metadata and broadcast it
-		String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
-		JSONObject jspec = new JSONObject(spec);
-		TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, jspec, numCols, tfMtdPath, null, tmpPath);
-		
-		_tfmapper.loadTfMetadata();
-
-		Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
-		
-		/*
-		 * Construct transformation metadata (map-side) -- the logic is similar
-		 * to GTFMTDMapper
-		 * 
-		 * Note: The result of mapPartitionsWithIndex is cached so that the
-		 * transformed data is not redundantly computed multiple times
-		 */
-		JavaPairRDD<Long, String> applyRDD = inputRDD
-				.mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf),  true)
-				.mapToPair(
-						new PairFunction<String,Long,String>(){
-							private static final long serialVersionUID = 3868143093999082931L;
-							@Override
-							public Tuple2<Long, String> call(String t) throws Exception {
-								return new Tuple2<Long, String>(new Long(1), t);
-							}
-						}
-				).cache();
-
-		/*
-		 * An action to force execution of apply()
-		 * 
-		 * We need to trigger the execution of this RDD so as to ensure the
-		 * creation of a few metadata files (headers, dummycoded information,
-		 * etc.), which are referenced in the caller function.
-		 */
-		applyRDD.count();
-		
-		return applyRDD;
-	}
-
-	public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
-
-		private static final long serialVersionUID = 1496686437276906911L;
-
-		TfUtils _tfmapper = null;
-		
-		ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
-			_tfmapper = tf.getValue();
-		}
-		
-		@Override
-		public Iterator<String> call(Integer partitionID,
-				Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-			
-			boolean first = true;
-			Tuple2<LongWritable, Text> rec = null;
-			ArrayList<String> outLines = new ArrayList<String>();
-			
-			while(csvLines.hasNext()) {
-				rec = csvLines.next();
-				
-				if (first && partitionID == 0) {
-					first = false;
-					
-					_tfmapper.processHeaderLine();
-					
-					if (_tfmapper.hasHeader() ) {
-						continue; 
-					}
-				}
-				
-				// parse the input line and apply transformation
-			
-				String[] words = _tfmapper.getWords(rec._2());
-				
-				if(!_tfmapper.omit(words))
-				{
-					try {
-						words = _tfmapper.apply(words);
-						String outStr = _tfmapper.checkAndPrepOutputString(words);
-						outLines.add(outStr);
-					} 
-					catch(DMLRuntimeException e) {
-						throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
-					}
-				}
-			}
-			
-			return outLines.iterator();
-		}
-		
-	}
-
-	
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
deleted file mode 100644
index 8878ff0..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-import org.apache.sysml.lops.Lop;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
-import org.apache.sysml.runtime.transform.encode.Encoder;
-import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class BinAgent extends Encoder 
-{	
-	private static final long serialVersionUID = 1917445005206076078L;
-
-	public static final String MIN_PREFIX = "min";
-	public static final String MAX_PREFIX = "max";
-	public static final String NBINS_PREFIX = "nbins";
-
-	private int[] _numBins = null;
-	private double[] _min=null, _max=null;	// min and max among non-missing values
-	private double[] _binWidths = null;		// width of a bin for each attribute
-	
-	//frame transform-apply attributes
-	private double[][] _binMins = null;
-	private double[][] _binMaxs = null;
-	
-	public BinAgent(JSONObject parsedSpec, String[] colnames, int clen) 
-		throws JSONException, IOException 
-	{
-		this(parsedSpec, colnames, clen, false);
-	}
-
-	public BinAgent(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly) 
-		throws JSONException, IOException 
-	{
-		super( null, clen );		
-		if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
-			return;
-		
-		if( colsOnly ) {
-			List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames);
-			initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0])));
-		}
-		else 
-		{
-			JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN);		
-			JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
-			JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
-			initColList(attrs);
-			
-			_numBins = new int[attrs.size()];
-			for(int i=0; i < _numBins.length; i++)
-				_numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
-			
-			// initialize internal transformation metadata
-			_min = new double[_colList.length];
-			Arrays.fill(_min, Double.MAX_VALUE);
-			_max = new double[_colList.length];
-			Arrays.fill(_max, -Double.MAX_VALUE);
-			
-			_binWidths = new double[_colList.length];
-		}
-	}
-
-	public int[] getNumBins() { return _numBins; }
-	public double[] getMin()  { return _min; }
-	public double[] getBinWidths() { return _binWidths; }
-	
-	public void prepare(String[] words, TfUtils agents) {
-		if ( !isApplicable() )
-			return;
-		
-		for(int i=0; i <_colList.length; i++) {
-			int colID = _colList[i];
-			
-			String w = null;
-			double d = 0;
-				
-			// equi-width
-			w = UtilFunctions.unquote(words[colID-1].trim());
-			if(!TfUtils.isNA(agents.getNAStrings(),w)) {
-				d = UtilFunctions.parseToDouble(w);
-				if(d < _min[i])
-					_min[i] = d;
-				if(d > _max[i])
-					_max[i] = d;
-			}
-		}
-	}
-	
-	private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
-		String s =  MIN_PREFIX + Double.toString(_min[idx]);
-		return  new DistinctValue(s, -1L);
-	}
-	
-	private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
-		String s =  MAX_PREFIX + Double.toString(_max[idx]);
-		return  new DistinctValue(s, -1L);
-	}
-	
-	private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
-		String s =  NBINS_PREFIX + Double.toString(_numBins[idx]);
-		return  new DistinctValue(s, -1L);
-	}
-	
-	/**
-	 * Method to output transformation metadata from the mappers. 
-	 * This information is collected and merged by the reducers.
-	 */
-	@Override
-	public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-		if( !isApplicable() )
-			return;
-		
-		try { 
-			for(int i=0; i < _colList.length; i++) {
-				int colID = _colList[i];
-				IntWritable iw = new IntWritable(-colID);
-				
-				out.collect(iw,  prepMinOutput(i));
-				out.collect(iw,  prepMaxOutput(i));
-				out.collect(iw,  prepNBinsOutput(i));
-			}
-		} catch(Exception e) {
-			throw new IOException(e);
-		}
-	}
-	
-	public ArrayList<Pair<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
-		if ( !isApplicable() )
-			return list;
-		
-		try { 
-			for(int i=0; i < _colList.length; i++) {
-				int colID = _colList[i];
-				Integer iw = -colID;
-				
-				list.add( new Pair<Integer,DistinctValue>(iw, prepMinOutput(i)) );
-				list.add( new Pair<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
-				list.add( new Pair<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
-			}
-		} catch(Exception e) {
-			throw new IOException(e);
-		}
-		return list;
-	}
-
-	private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException 
-	{
-		Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
-		BufferedWriter br = null;
-		try {
-			br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-			br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n");
-		}
-		finally {
-			IOUtilFunctions.closeSilently(br);
-		}
-	}
-
-	/** 
-	 * Method to merge map output transformation metadata.
-	 */
-	@Override
-	public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-		double min = Double.MAX_VALUE;
-		double max = -Double.MAX_VALUE;
-		int nbins = 0;
-		
-		DistinctValue val = new DistinctValue();
-		String w = null;
-		double d;
-		while(values.hasNext()) {
-			val.reset();
-			val = values.next();
-			w = val.getWord();
-			
-			if(w.startsWith(MIN_PREFIX)) {
-				d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
-				if ( d < min )
-					min = d;
-			}
-			else if(w.startsWith(MAX_PREFIX)) {
-				d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
-				if ( d > max )
-					max = d;
-			}
-			else if (w.startsWith(NBINS_PREFIX)) {
-				nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
-			}
-			else
-				throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
-		}
-		
-		// write merged metadata
-		double binwidth = (max-min)/nbins;
-		writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
-	}
-	
-	
-	public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
-		if( !isApplicable() )
-			return;
-		
-		MVImputeAgent mvagent = agents.getMVImputeAgent();
-		for(int i=0; i < _colList.length; i++) {
-			int colID = _colList[i];
-			
-			// If the column is imputed with a constant, then adjust min and max based the value of the constant.
-			if ( mvagent.isApplicable(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) 
-			{
-				double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
-				if ( cst < _min[i])
-					_min[i] = cst;
-				if ( cst > _max[i])
-					_max[i] = cst;
-			}
-			
-			double binwidth = (_max[i] - _min[i])/_numBins[i];
-			writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
-		}
-	}
-	
-	// ------------------------------------------------------------------------------------------------
-
-	/**
-	 * Method to load transform metadata for all attributes
-	 */
-	@Override
-	public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-		if( !isApplicable() )
-			return;
-		
-		if(fs.isDirectory(txMtdDir)) {
-			for(int i=0; i<_colList.length;i++) {
-				int colID = _colList[i];
-				
-				Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
-				TfUtils.checkValidInputFile(fs, path, true); 
-					
-				BufferedReader br = null;
-				try {
-					br = new BufferedReader(new InputStreamReader(fs.open(path)));
-					// format: colID,min,max,nbins
-					String[] fields = br.readLine().split(TfUtils.TXMTD_SEP);
-					double min = UtilFunctions.parseToDouble(fields[1]);
-					//double max = UtilFunctions.parseToDouble(fields[2]);
-					double binwidth = UtilFunctions.parseToDouble(fields[3]);
-					int nbins = UtilFunctions.parseToInt(fields[4]);
-					
-					_numBins[i] = nbins;
-					_min[i] = min;
-					_binWidths[i] = binwidth; // (max-min)/nbins;
-				}
-				finally {
-					IOUtilFunctions.closeSilently(br);
-				}
-			}
-		}
-		else {
-			throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
-		}
-	}
-	
-
-	@Override
-	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-		build(in);
-		return apply(in, out);
-	}
-
-	@Override
-	public void build(FrameBlock in) {
-		// TODO Auto-generated method stub
-	}
-	
-	/**
-	 * Method to apply transformations.
-	 */
-	@Override
-	public String[] apply(String[] words) {
-		if( !isApplicable() )
-			return words;
-	
-		for(int i=0; i < _colList.length; i++) {
-			int colID = _colList[i];
-			try {
-				double val = UtilFunctions.parseToDouble(words[colID-1]);
-				int binid = 1;
-				double tmp = _min[i] + _binWidths[i];
-				while(val > tmp && binid < _numBins[i]) {
-					tmp += _binWidths[i];
-					binid++;
-				}
-				words[colID-1] = Integer.toString(binid);
-			} 
-			catch(NumberFormatException e) {
-				throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
-			}
-		}
-		
-		return words;
-	}
-
-	@Override
-	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
-		for(int j=0; j<_colList.length; j++) {
-			int colID = _colList[j];
-			for( int i=0; i<in.getNumRows(); i++ ) {
-				double inVal = UtilFunctions.objectToDouble(
-						in.getSchema()[colID-1], in.get(i, colID-1));
-				int ix = Arrays.binarySearch(_binMaxs[j], inVal);
-				int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1;		
-				out.quickSetValue(i, colID-1, binID);
-			}	
-		}
-		return out;
-	}
-
-	@Override
-	public FrameBlock getMetaData(FrameBlock meta) {
-		return meta;
-	}
-	
-	@Override
-	public void initMetaData(FrameBlock meta) {
-		_binMins = new double[_colList.length][];
-		_binMaxs = new double[_colList.length][];
-		for( int j=0; j<_colList.length; j++ ) {
-			int colID = _colList[j]; //1-based
-			int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
-			_binMins[j] = new double[nbins];
-			_binMaxs[j] = new double[nbins];
-			for( int i=0; i<nbins; i++ ) {
-				String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX);
-				_binMins[j][i] = Double.parseDouble(tmp[0]);
-				_binMaxs[j][i] = Double.parseDouble(tmp[1]);
-			}
-		}
-	}
-}