Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/26 02:12:58 UTC

[34/55] [partial] incubator-systemml git commit: [SYSTEMML-482] [SYSTEMML-480] Adding a Git attributes file to enforce Unix-style line endings, and normalizing all of the line endings.
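
For context, the Git attributes file referenced in the commit subject would declare text files as LF-normalized so that checkouts and commits use Unix-style line endings regardless of platform. The actual .gitattributes added by this commit is not included in this partial diff, so the snippet below is only a minimal, illustrative sketch of such a file; the patterns are assumptions, not the committed content:

    # Minimal sketch (assumed, not the committed file): normalize all files
    # matched as text to LF (Unix-style) line endings on commit and checkout.
    * text eol=lf

    # Hypothetical exceptions: binary artifacts must never be normalized.
    *.jar binary
    *.png binary

With such a file in place, a one-time renormalization of the working tree (e.g. "git add --renamed" does not exist; the usual sequence is "rm .git/index && git reset" on older Git, or "git add --renormalize ." on modern Git) produces exactly the kind of whole-file, content-identical diffs seen below.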

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
index 1a646cf..2e3fd75 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
@@ -1,124 +1,124 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> {
-	
-	private JobConf _rJob = null;
-	TfUtils _agents = null;
-	
-	@Override
-	public void configure(JobConf job) {
-		_rJob = job;
-		
-		try {
-			String outputDir = MRJobConfiguration.getOutputs(job)[0];
-			_agents = new TfUtils(job, outputDir);
-		} 
-		catch(IOException e)  { throw new RuntimeException(e); }
-		catch(JSONException e)  { throw new RuntimeException(e); }
-	}
-
-	@Override
-	public void close() throws IOException {
-	}
-	
-	@Override
-	public void reduce(IntWritable key, Iterator<DistinctValue> values,
-			OutputCollector<Text, LongWritable> output, Reporter reporter)
-			throws IOException {
-		
-		FileSystem fs = FileSystem.get(_rJob);
-		
-		int colID = key.get();
-		
-		if(colID < 0) 
-		{
-			// process mapper output for MV and Bin agents
-			colID = colID*-1;
-			_agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
-		}
-		else if ( colID == _agents.getNumCols() + 1)
-		{
-			// process mapper output for OFFSET_FILE
-			ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
-			while(values.hasNext())
-				list.add(new OffsetCount(values.next().getOffsetCount()));
-			
-			long numTfRows = generateOffsetsFile(list);
-			reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows);
-
-		}
-		else 
-		{
-			// process mapper output for Recode agent
-			_agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
-		}
-		
-	}
-	
-	@SuppressWarnings("unchecked")
-	private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException 
-	{
-		Collections.sort(list);
-		
-		@SuppressWarnings("deprecation")
-		SequenceFile.Writer writer = new SequenceFile.Writer(
-				FileSystem.get(_rJob), _rJob, 
-				new Path(_agents.getOffsetFile()+"/part-00000"), 
-				ByteWritable.class, OffsetCount.class);
-		
-		long lineOffset=0;
-		for(OffsetCount oc: list)
-		{
-			long count=oc.count;
-			oc.count=lineOffset;
-			writer.append(new ByteWritable((byte)0), oc);
-			lineOffset+=count;
-		}
-		writer.close();
-		list.clear();
-		
-		return lineOffset;
-	}
-	
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+
+public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> {
+	
+	private JobConf _rJob = null;
+	TfUtils _agents = null;
+	
+	@Override
+	public void configure(JobConf job) {
+		_rJob = job;
+		
+		try {
+			String outputDir = MRJobConfiguration.getOutputs(job)[0];
+			_agents = new TfUtils(job, outputDir);
+		} 
+		catch(IOException e)  { throw new RuntimeException(e); }
+		catch(JSONException e)  { throw new RuntimeException(e); }
+	}
+
+	@Override
+	public void close() throws IOException {
+	}
+	
+	@Override
+	public void reduce(IntWritable key, Iterator<DistinctValue> values,
+			OutputCollector<Text, LongWritable> output, Reporter reporter)
+			throws IOException {
+		
+		FileSystem fs = FileSystem.get(_rJob);
+		
+		int colID = key.get();
+		
+		if(colID < 0) 
+		{
+			// process mapper output for MV and Bin agents
+			colID = colID*-1;
+			_agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
+		}
+		else if ( colID == _agents.getNumCols() + 1)
+		{
+			// process mapper output for OFFSET_FILE
+			ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
+			while(values.hasNext())
+				list.add(new OffsetCount(values.next().getOffsetCount()));
+			
+			long numTfRows = generateOffsetsFile(list);
+			reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows);
+
+		}
+		else 
+		{
+			// process mapper output for Recode agent
+			_agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
+		}
+		
+	}
+	
+	@SuppressWarnings("unchecked")
+	private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException 
+	{
+		Collections.sort(list);
+		
+		@SuppressWarnings("deprecation")
+		SequenceFile.Writer writer = new SequenceFile.Writer(
+				FileSystem.get(_rJob), _rJob, 
+				new Path(_agents.getOffsetFile()+"/part-00000"), 
+				ByteWritable.class, OffsetCount.class);
+		
+		long lineOffset=0;
+		for(OffsetCount oc: list)
+		{
+			long count=oc.count;
+			oc.count=lineOffset;
+			writer.append(new ByteWritable((byte)0), oc);
+			lineOffset+=count;
+		}
+		writer.close();
+		list.clear();
+		
+		return lineOffset;
+	}
+	
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
index b1e79dd..09b9148 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
@@ -1,106 +1,106 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-/**
- * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format).
- *
- */
-
-public class GenTfMtdMR {
-
-	public static final String DELIM = ",";
-
-	public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
-		JobConf job = new JobConf(GenTfMtdMR.class);
-		job.setJobName("GenTfMTD");
-		
-		/* Setup MapReduce Job */
-		job.setJarByClass(GenTfMtdMR.class);
-		
-		// set relevant classes
-		job.setMapperClass(GTFMTDMapper.class);
-		job.setReducerClass(GTFMTDReducer.class);
-	
-		// set input and output properties
-		job.setInputFormat(TextInputFormat.class);
-		job.setOutputFormat(NullOutputFormat.class);
-		
-		job.setMapOutputKeyClass(IntWritable.class);
-		job.setMapOutputValueClass(DistinctValue.class);
-		
-		job.setOutputKeyClass(Text.class);
-		job.setOutputValueClass(LongWritable.class);
-		
-		job.setInt("dfs.replication", replication);
-
-		FileInputFormat.addInputPath(job, new Path(inputPath));
-		// delete outputPath, if exists already.
-		Path outPath = new Path(txMtdPath);
-		FileSystem fs = FileSystem.get(job);
-		fs.delete(outPath, true);
-		FileOutputFormat.setOutputPath(job, outPath);
-
-		job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader()));
-		job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim());
-		if ( inputDataProperties.getNAStrings() != null)
-			// Adding "dummy" string to handle the case of na_strings = ""
-			job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
-		job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs);
-		job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile);
-		job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
-		job.set(MRJobConfiguration.TF_HEADER, headerLine);
-		
-		job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath);
-		
-		// offsets file to store part-file names and offsets for each input split
-		job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile);
-		
-		//turn off adaptivemr
-		job.setBoolean("adaptivemr.map.enable", false);
-		
-		// Run the job
-		RunningJob runjob = JobClient.runJob(job);
-		
-		Counters c = runjob.getCounters();
-		long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter();
-
-		return tx_numRows;
-	}
-	
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+/**
+ * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format).
+ *
+ */
+
+public class GenTfMtdMR {
+
+	public static final String DELIM = ",";
+
+	public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
+		JobConf job = new JobConf(GenTfMtdMR.class);
+		job.setJobName("GenTfMTD");
+		
+		/* Setup MapReduce Job */
+		job.setJarByClass(GenTfMtdMR.class);
+		
+		// set relevant classes
+		job.setMapperClass(GTFMTDMapper.class);
+		job.setReducerClass(GTFMTDReducer.class);
+	
+		// set input and output properties
+		job.setInputFormat(TextInputFormat.class);
+		job.setOutputFormat(NullOutputFormat.class);
+		
+		job.setMapOutputKeyClass(IntWritable.class);
+		job.setMapOutputValueClass(DistinctValue.class);
+		
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(LongWritable.class);
+		
+		job.setInt("dfs.replication", replication);
+
+		FileInputFormat.addInputPath(job, new Path(inputPath));
+		// delete outputPath, if exists already.
+		Path outPath = new Path(txMtdPath);
+		FileSystem fs = FileSystem.get(job);
+		fs.delete(outPath, true);
+		FileOutputFormat.setOutputPath(job, outPath);
+
+		job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader()));
+		job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim());
+		if ( inputDataProperties.getNAStrings() != null)
+			// Adding "dummy" string to handle the case of na_strings = ""
+			job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
+		job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs);
+		job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile);
+		job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
+		job.set(MRJobConfiguration.TF_HEADER, headerLine);
+		
+		job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath);
+		
+		// offsets file to store part-file names and offsets for each input split
+		job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile);
+		
+		//turn off adaptivemr
+		job.setBoolean("adaptivemr.map.enable", false);
+		
+		// Run the job
+		RunningJob runjob = JobClient.runJob(job);
+		
+		Counters c = runjob.getCounters();
+		long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter();
+
+		return tx_numRows;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
index 6b811ef..e0644ff 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
@@ -1,235 +1,235 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-public class GenTfMtdSPARK {
-
-	/**
-	 * Spark code to Generate Transform Metadata based on the given transformation
-	 * specification file (JSON format).
-	 * 
-	 */
-
-	public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
-									String tfMtdPath, String specFile, 
-									String partOffsetsFile, CSVFileFormatProperties prop, 
-									long numCols, String headerLine
-								) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
-		
-		// Construct transformation metadata (map-side)
-		// Note: logic is similar to GTFMTDMapper
-		JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput 
-			= inputRDD.mapPartitionsWithIndex(
-					new GenTfMtdMap(prop.hasHeader(), 
-									prop.getDelim(), 
-									prop.getNAStrings(), 
-									specFile, 
-									numCols, 
-									headerLine), 
-					true );
-		
-		// Shuffle to group by DistinctValue
-		JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey();
-		
-		// Construct transformation metadata (Reduce-side)
-		// Note: logic is similar to GTFMTDReducer
-		JavaRDD<Long> out 
-			= rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(), 
-												prop.getDelim(), 
-												prop.getNAStrings(), 
-												headerLine, 
-												tfMtdPath, 
-												partOffsetsFile, 
-												specFile, 
-												numCols)  );
-		
-		// Compute the total number of transformed rows
-		long numRows = out.reduce(new Function2<Long,Long,Long>() {
-			private static final long serialVersionUID = 1263336168859959795L;
-
-			@Override
-			public Long call(Long v1, Long v2) throws Exception {
-				return v1+v2;
-			}
-			
-		});
-		
-		return numRows;
-	}
-	
-	// ----------------------------------------------------------------------------------------------------------------------
-	
-	public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> {
-
-		private static final long serialVersionUID = -5622745445470598215L;
-		
-		TfUtils _agents = null;
-		
-		GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException {
-			
-			// Setup Transformation Agents
-			JobConf job = new JobConf();
-			FileSystem fs = FileSystem.get(job);
-			String[] nas = TfUtils.parseNAStrings(naStrings);
-			
-			JSONObject spec = TfUtils.readSpec(fs, specFile);
-			_agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null);
-
-		}
-		
-		@Override
-		public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID,
-				Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-			
-			// Construct transformation metadata by looping through csvLines
-			// Note: logic is similar to GTFMTDMapper
-			
-			boolean first = true;
-			Tuple2<LongWritable, Text> rec = null;
-			long _offsetInPartFile = -1;
-			
-			while(csvLines.hasNext()) {
-				rec = csvLines.next();
-				
-				if (first) {
-					first = false;
-					_offsetInPartFile = rec._1().get();
-					
-					if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 )
-						continue; // skip the header line
-				}
-				
-				_agents.prepareTfMtd(rec._2().toString());
-			}
-			
-			// Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated. 
-			
-			ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>();
-			
-			_agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
-			_agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
-			_agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
-			
-			DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal()));
-			Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); 
-			outList.add(tuple);
-
-			return outList.iterator();
-		}
-		
-	}
-	
-	// ------------------------------------------------------------------------------------------------
-	
-	public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> {
-		
-		private static final long serialVersionUID = -2733233671193035242L;
-		TfUtils _agents = null;
-		
-		GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException {
-			String[] nas = TfUtils.parseNAStrings(naStrings); 
-			FileSystem fs = FileSystem.get(new JobConf());
-
-			JSONObject spec = TfUtils.readSpec(fs, specFile);
-			_agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null);
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t)
-				throws Exception {
-			
-			int colID = t._1();
-			Iterator<DistinctValue> iterDV = t._2().iterator();
-
-			JobConf job = new JobConf();
-			FileSystem fs = FileSystem.get(job);
-			
-			ArrayList<Long> numRows = new ArrayList<Long>();
-			
-			if(colID < 0) 
-			{
-				// process mapper output for MV and Bin agents
-				colID = colID*-1;
-				_agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
-				numRows.add(0L);
-			}
-			else if ( colID == _agents.getNumCols() + 1)
-			{
-				// process mapper output for OFFSET_FILE
-				ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
-				while(iterDV.hasNext())
-					list.add(new OffsetCount(iterDV.next().getOffsetCount()));
-				Collections.sort(list);
-				
-				@SuppressWarnings("deprecation")
-				SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class);
-				
-				long lineOffset=0;
-				for(OffsetCount oc: list)
-				{
-					long count=oc.count;
-					oc.count=lineOffset;
-					writer.append(new ByteWritable((byte)0), oc);
-					lineOffset+=count;
-				}
-				writer.close();
-				list.clear();
-				
-				numRows.add(lineOffset);
-			}
-			else 
-			{
-				// process mapper output for Recode agent
-				_agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
-				numRows.add(0L);
-			}
-			
-			return numRows;
-		}
-
-	}
-
-	
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+
+public class GenTfMtdSPARK {
+
+	/**
+	 * Spark code to Generate Transform Metadata based on the given transformation
+	 * specification file (JSON format).
+	 * 
+	 */
+
+	public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
+									String tfMtdPath, String specFile, 
+									String partOffsetsFile, CSVFileFormatProperties prop, 
+									long numCols, String headerLine
+								) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
+		
+		// Construct transformation metadata (map-side)
+		// Note: logic is similar to GTFMTDMapper
+		JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput 
+			= inputRDD.mapPartitionsWithIndex(
+					new GenTfMtdMap(prop.hasHeader(), 
+									prop.getDelim(), 
+									prop.getNAStrings(), 
+									specFile, 
+									numCols, 
+									headerLine), 
+					true );
+		
+		// Shuffle to group by DistinctValue
+		JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey();
+		
+		// Construct transformation metadata (Reduce-side)
+		// Note: logic is similar to GTFMTDReducer
+		JavaRDD<Long> out 
+			= rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(), 
+												prop.getDelim(), 
+												prop.getNAStrings(), 
+												headerLine, 
+												tfMtdPath, 
+												partOffsetsFile, 
+												specFile, 
+												numCols)  );
+		
+		// Compute the total number of transformed rows
+		long numRows = out.reduce(new Function2<Long,Long,Long>() {
+			private static final long serialVersionUID = 1263336168859959795L;
+
+			@Override
+			public Long call(Long v1, Long v2) throws Exception {
+				return v1+v2;
+			}
+			
+		});
+		
+		return numRows;
+	}
+	
+	// ----------------------------------------------------------------------------------------------------------------------
+	
+	public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> {
+
+		private static final long serialVersionUID = -5622745445470598215L;
+		
+		TfUtils _agents = null;
+		
+		GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException {
+			
+			// Setup Transformation Agents
+			JobConf job = new JobConf();
+			FileSystem fs = FileSystem.get(job);
+			String[] nas = TfUtils.parseNAStrings(naStrings);
+			
+			JSONObject spec = TfUtils.readSpec(fs, specFile);
+			_agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null);
+
+		}
+		
+		@Override
+		public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID,
+				Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
+			
+			// Construct transformation metadata by looping through csvLines
+			// Note: logic is similar to GTFMTDMapper
+			
+			boolean first = true;
+			Tuple2<LongWritable, Text> rec = null;
+			long _offsetInPartFile = -1;
+			
+			while(csvLines.hasNext()) {
+				rec = csvLines.next();
+				
+				if (first) {
+					first = false;
+					_offsetInPartFile = rec._1().get();
+					
+					if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 )
+						continue; // skip the header line
+				}
+				
+				_agents.prepareTfMtd(rec._2().toString());
+			}
+			
+			// Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated. 
+			
+			ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>();
+			
+			_agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+			_agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+			_agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+			
+			DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal()));
+			Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); 
+			outList.add(tuple);
+
+			return outList.iterator();
+		}
+		
+	}
+	
+	// ------------------------------------------------------------------------------------------------
+	
+	public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> {
+		
+		private static final long serialVersionUID = -2733233671193035242L;
+		TfUtils _agents = null;
+		
+		GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException {
+			String[] nas = TfUtils.parseNAStrings(naStrings); 
+			FileSystem fs = FileSystem.get(new JobConf());
+
+			JSONObject spec = TfUtils.readSpec(fs, specFile);
+			_agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t)
+				throws Exception {
+			
+			int colID = t._1();
+			Iterator<DistinctValue> iterDV = t._2().iterator();
+
+			JobConf job = new JobConf();
+			FileSystem fs = FileSystem.get(job);
+			
+			ArrayList<Long> numRows = new ArrayList<Long>();
+			
+			if(colID < 0) 
+			{
+				// process mapper output for MV and Bin agents
+				colID = colID*-1;
+				_agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
+				numRows.add(0L);
+			}
+			else if ( colID == _agents.getNumCols() + 1)
+			{
+				// process mapper output for OFFSET_FILE
+				ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
+				while(iterDV.hasNext())
+					list.add(new OffsetCount(iterDV.next().getOffsetCount()));
+				Collections.sort(list);
+				
+				@SuppressWarnings("deprecation")
+				SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class);
+				
+				long lineOffset=0;
+				for(OffsetCount oc: list)
+				{
+					long count=oc.count;
+					oc.count=lineOffset;
+					writer.append(new ByteWritable((byte)0), oc);
+					lineOffset+=count;
+				}
+				writer.close();
+				list.clear();
+				
+				numRows.add(lineOffset);
+			}
+			else 
+			{
+				// process mapper output for Recode agent
+				_agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
+				numRows.add(0L);
+			}
+			
+			return numRows;
+		}
+
+	}
+
+	
+}