You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/26 02:12:58 UTC
[34/55] [partial] incubator-systemml git commit: [SYSTEMML-482]
[SYSTEMML-480] Adding a Git attributes file to enforce Unix-style line
endings, and normalizing all of the line endings.
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
index 1a646cf..2e3fd75 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
@@ -1,124 +1,124 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> {
-
- private JobConf _rJob = null;
- TfUtils _agents = null;
-
- @Override
- public void configure(JobConf job) {
- _rJob = job;
-
- try {
- String outputDir = MRJobConfiguration.getOutputs(job)[0];
- _agents = new TfUtils(job, outputDir);
- }
- catch(IOException e) { throw new RuntimeException(e); }
- catch(JSONException e) { throw new RuntimeException(e); }
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void reduce(IntWritable key, Iterator<DistinctValue> values,
- OutputCollector<Text, LongWritable> output, Reporter reporter)
- throws IOException {
-
- FileSystem fs = FileSystem.get(_rJob);
-
- int colID = key.get();
-
- if(colID < 0)
- {
- // process mapper output for MV and Bin agents
- colID = colID*-1;
- _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
- }
- else if ( colID == _agents.getNumCols() + 1)
- {
- // process mapper output for OFFSET_FILE
- ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
- while(values.hasNext())
- list.add(new OffsetCount(values.next().getOffsetCount()));
-
- long numTfRows = generateOffsetsFile(list);
- reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows);
-
- }
- else
- {
- // process mapper output for Recode agent
- _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException
- {
- Collections.sort(list);
-
- @SuppressWarnings("deprecation")
- SequenceFile.Writer writer = new SequenceFile.Writer(
- FileSystem.get(_rJob), _rJob,
- new Path(_agents.getOffsetFile()+"/part-00000"),
- ByteWritable.class, OffsetCount.class);
-
- long lineOffset=0;
- for(OffsetCount oc: list)
- {
- long count=oc.count;
- oc.count=lineOffset;
- writer.append(new ByteWritable((byte)0), oc);
- lineOffset+=count;
- }
- writer.close();
- list.clear();
-
- return lineOffset;
- }
-
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+
+public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> {
+
+ private JobConf _rJob = null;
+ TfUtils _agents = null;
+
+ @Override
+ public void configure(JobConf job) {
+ _rJob = job;
+
+ try {
+ String outputDir = MRJobConfiguration.getOutputs(job)[0];
+ _agents = new TfUtils(job, outputDir);
+ }
+ catch(IOException e) { throw new RuntimeException(e); }
+ catch(JSONException e) { throw new RuntimeException(e); }
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void reduce(IntWritable key, Iterator<DistinctValue> values,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+
+ FileSystem fs = FileSystem.get(_rJob);
+
+ int colID = key.get();
+
+ if(colID < 0)
+ {
+ // process mapper output for MV and Bin agents
+ colID = colID*-1;
+ _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
+ }
+ else if ( colID == _agents.getNumCols() + 1)
+ {
+ // process mapper output for OFFSET_FILE
+ ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
+ while(values.hasNext())
+ list.add(new OffsetCount(values.next().getOffsetCount()));
+
+ long numTfRows = generateOffsetsFile(list);
+ reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows);
+
+ }
+ else
+ {
+ // process mapper output for Recode agent
+ _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException
+ {
+ Collections.sort(list);
+
+ @SuppressWarnings("deprecation")
+ SequenceFile.Writer writer = new SequenceFile.Writer(
+ FileSystem.get(_rJob), _rJob,
+ new Path(_agents.getOffsetFile()+"/part-00000"),
+ ByteWritable.class, OffsetCount.class);
+
+ long lineOffset=0;
+ for(OffsetCount oc: list)
+ {
+ long count=oc.count;
+ oc.count=lineOffset;
+ writer.append(new ByteWritable((byte)0), oc);
+ lineOffset+=count;
+ }
+ writer.close();
+ list.clear();
+
+ return lineOffset;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
index b1e79dd..09b9148 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
@@ -1,106 +1,106 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-/**
- * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format).
- *
- */
-
-public class GenTfMtdMR {
-
- public static final String DELIM = ",";
-
- public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
- JobConf job = new JobConf(GenTfMtdMR.class);
- job.setJobName("GenTfMTD");
-
- /* Setup MapReduce Job */
- job.setJarByClass(GenTfMtdMR.class);
-
- // set relevant classes
- job.setMapperClass(GTFMTDMapper.class);
- job.setReducerClass(GTFMTDReducer.class);
-
- // set input and output properties
- job.setInputFormat(TextInputFormat.class);
- job.setOutputFormat(NullOutputFormat.class);
-
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(DistinctValue.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
-
- job.setInt("dfs.replication", replication);
-
- FileInputFormat.addInputPath(job, new Path(inputPath));
- // delete outputPath, if exists already.
- Path outPath = new Path(txMtdPath);
- FileSystem fs = FileSystem.get(job);
- fs.delete(outPath, true);
- FileOutputFormat.setOutputPath(job, outPath);
-
- job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader()));
- job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim());
- if ( inputDataProperties.getNAStrings() != null)
- // Adding "dummy" string to handle the case of na_strings = ""
- job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
- job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs);
- job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile);
- job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
- job.set(MRJobConfiguration.TF_HEADER, headerLine);
-
- job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath);
-
- // offsets file to store part-file names and offsets for each input split
- job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile);
-
- //turn off adaptivemr
- job.setBoolean("adaptivemr.map.enable", false);
-
- // Run the job
- RunningJob runjob = JobClient.runJob(job);
-
- Counters c = runjob.getCounters();
- long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter();
-
- return tx_numRows;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+/**
+ * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format).
+ *
+ */
+
+public class GenTfMtdMR {
+
+ public static final String DELIM = ",";
+
+ public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
+ JobConf job = new JobConf(GenTfMtdMR.class);
+ job.setJobName("GenTfMTD");
+
+ /* Setup MapReduce Job */
+ job.setJarByClass(GenTfMtdMR.class);
+
+ // set relevant classes
+ job.setMapperClass(GTFMTDMapper.class);
+ job.setReducerClass(GTFMTDReducer.class);
+
+ // set input and output properties
+ job.setInputFormat(TextInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
+
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(DistinctValue.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+
+ job.setInt("dfs.replication", replication);
+
+ FileInputFormat.addInputPath(job, new Path(inputPath));
+ // delete outputPath, if exists already.
+ Path outPath = new Path(txMtdPath);
+ FileSystem fs = FileSystem.get(job);
+ fs.delete(outPath, true);
+ FileOutputFormat.setOutputPath(job, outPath);
+
+ job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader()));
+ job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim());
+ if ( inputDataProperties.getNAStrings() != null)
+ // Adding "dummy" string to handle the case of na_strings = ""
+ job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
+ job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs);
+ job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile);
+ job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
+ job.set(MRJobConfiguration.TF_HEADER, headerLine);
+
+ job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath);
+
+ // offsets file to store part-file names and offsets for each input split
+ job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile);
+
+ //turn off adaptivemr
+ job.setBoolean("adaptivemr.map.enable", false);
+
+ // Run the job
+ RunningJob runjob = JobClient.runJob(job);
+
+ Counters c = runjob.getCounters();
+ long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter();
+
+ return tx_numRows;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
index 6b811ef..e0644ff 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
@@ -1,235 +1,235 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-public class GenTfMtdSPARK {
-
- /**
- * Spark code to Generate Transform Metadata based on the given transformation
- * specification file (JSON format).
- *
- */
-
- public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD,
- String tfMtdPath, String specFile,
- String partOffsetsFile, CSVFileFormatProperties prop,
- long numCols, String headerLine
- ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
-
- // Construct transformation metadata (map-side)
- // Note: logic is similar to GTFMTDMapper
- JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput
- = inputRDD.mapPartitionsWithIndex(
- new GenTfMtdMap(prop.hasHeader(),
- prop.getDelim(),
- prop.getNAStrings(),
- specFile,
- numCols,
- headerLine),
- true );
-
- // Shuffle to group by DistinctValue
- JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey();
-
- // Construct transformation metadata (Reduce-side)
- // Note: logic is similar to GTFMTDReducer
- JavaRDD<Long> out
- = rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(),
- prop.getDelim(),
- prop.getNAStrings(),
- headerLine,
- tfMtdPath,
- partOffsetsFile,
- specFile,
- numCols) );
-
- // Compute the total number of transformed rows
- long numRows = out.reduce(new Function2<Long,Long,Long>() {
- private static final long serialVersionUID = 1263336168859959795L;
-
- @Override
- public Long call(Long v1, Long v2) throws Exception {
- return v1+v2;
- }
-
- });
-
- return numRows;
- }
-
- // ----------------------------------------------------------------------------------------------------------------------
-
- public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> {
-
- private static final long serialVersionUID = -5622745445470598215L;
-
- TfUtils _agents = null;
-
- GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException {
-
- // Setup Transformation Agents
- JobConf job = new JobConf();
- FileSystem fs = FileSystem.get(job);
- String[] nas = TfUtils.parseNAStrings(naStrings);
-
- JSONObject spec = TfUtils.readSpec(fs, specFile);
- _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null);
-
- }
-
- @Override
- public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID,
- Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-
- // Construct transformation metadata by looping through csvLines
- // Note: logic is similar to GTFMTDMapper
-
- boolean first = true;
- Tuple2<LongWritable, Text> rec = null;
- long _offsetInPartFile = -1;
-
- while(csvLines.hasNext()) {
- rec = csvLines.next();
-
- if (first) {
- first = false;
- _offsetInPartFile = rec._1().get();
-
- if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 )
- continue; // skip the header line
- }
-
- _agents.prepareTfMtd(rec._2().toString());
- }
-
- // Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated.
-
- ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>();
-
- _agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
- _agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
- _agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
-
- DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal()));
- Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv);
- outList.add(tuple);
-
- return outList.iterator();
- }
-
- }
-
- // ------------------------------------------------------------------------------------------------
-
- public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> {
-
- private static final long serialVersionUID = -2733233671193035242L;
- TfUtils _agents = null;
-
- GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException {
- String[] nas = TfUtils.parseNAStrings(naStrings);
- FileSystem fs = FileSystem.get(new JobConf());
-
- JSONObject spec = TfUtils.readSpec(fs, specFile);
- _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t)
- throws Exception {
-
- int colID = t._1();
- Iterator<DistinctValue> iterDV = t._2().iterator();
-
- JobConf job = new JobConf();
- FileSystem fs = FileSystem.get(job);
-
- ArrayList<Long> numRows = new ArrayList<Long>();
-
- if(colID < 0)
- {
- // process mapper output for MV and Bin agents
- colID = colID*-1;
- _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
- numRows.add(0L);
- }
- else if ( colID == _agents.getNumCols() + 1)
- {
- // process mapper output for OFFSET_FILE
- ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
- while(iterDV.hasNext())
- list.add(new OffsetCount(iterDV.next().getOffsetCount()));
- Collections.sort(list);
-
- @SuppressWarnings("deprecation")
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class);
-
- long lineOffset=0;
- for(OffsetCount oc: list)
- {
- long count=oc.count;
- oc.count=lineOffset;
- writer.append(new ByteWritable((byte)0), oc);
- lineOffset+=count;
- }
- writer.close();
- list.clear();
-
- numRows.add(lineOffset);
- }
- else
- {
- // process mapper output for Recode agent
- _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
- numRows.add(0L);
- }
-
- return numRows;
- }
-
- }
-
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+
+public class GenTfMtdSPARK {
+
+ /**
+ * Spark code to Generate Transform Metadata based on the given transformation
+ * specification file (JSON format).
+ *
+ */
+
+ public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD,
+ String tfMtdPath, String specFile,
+ String partOffsetsFile, CSVFileFormatProperties prop,
+ long numCols, String headerLine
+ ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
+
+ // Construct transformation metadata (map-side)
+ // Note: logic is similar to GTFMTDMapper
+ JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput
+ = inputRDD.mapPartitionsWithIndex(
+ new GenTfMtdMap(prop.hasHeader(),
+ prop.getDelim(),
+ prop.getNAStrings(),
+ specFile,
+ numCols,
+ headerLine),
+ true );
+
+ // Shuffle to group by DistinctValue
+ JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey();
+
+ // Construct transformation metadata (Reduce-side)
+ // Note: logic is similar to GTFMTDReducer
+ JavaRDD<Long> out
+ = rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(),
+ prop.getDelim(),
+ prop.getNAStrings(),
+ headerLine,
+ tfMtdPath,
+ partOffsetsFile,
+ specFile,
+ numCols) );
+
+ // Compute the total number of transformed rows
+ long numRows = out.reduce(new Function2<Long,Long,Long>() {
+ private static final long serialVersionUID = 1263336168859959795L;
+
+ @Override
+ public Long call(Long v1, Long v2) throws Exception {
+ return v1+v2;
+ }
+
+ });
+
+ return numRows;
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------------
+
+ public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> {
+
+ private static final long serialVersionUID = -5622745445470598215L;
+
+ TfUtils _agents = null;
+
+ GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException {
+
+ // Setup Transformation Agents
+ JobConf job = new JobConf();
+ FileSystem fs = FileSystem.get(job);
+ String[] nas = TfUtils.parseNAStrings(naStrings);
+
+ JSONObject spec = TfUtils.readSpec(fs, specFile);
+ _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null);
+
+ }
+
+ @Override
+ public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID,
+ Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
+
+ // Construct transformation metadata by looping through csvLines
+ // Note: logic is similar to GTFMTDMapper
+
+ boolean first = true;
+ Tuple2<LongWritable, Text> rec = null;
+ long _offsetInPartFile = -1;
+
+ while(csvLines.hasNext()) {
+ rec = csvLines.next();
+
+ if (first) {
+ first = false;
+ _offsetInPartFile = rec._1().get();
+
+ if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 )
+ continue; // skip the header line
+ }
+
+ _agents.prepareTfMtd(rec._2().toString());
+ }
+
+ // Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated.
+
+ ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>();
+
+ _agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+ _agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+ _agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+
+ DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal()));
+ Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv);
+ outList.add(tuple);
+
+ return outList.iterator();
+ }
+
+ }
+
+ // ------------------------------------------------------------------------------------------------
+
+ public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> {
+
+ private static final long serialVersionUID = -2733233671193035242L;
+ TfUtils _agents = null;
+
+ GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException {
+ String[] nas = TfUtils.parseNAStrings(naStrings);
+ FileSystem fs = FileSystem.get(new JobConf());
+
+ JSONObject spec = TfUtils.readSpec(fs, specFile);
+ _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t)
+ throws Exception {
+
+ int colID = t._1();
+ Iterator<DistinctValue> iterDV = t._2().iterator();
+
+ JobConf job = new JobConf();
+ FileSystem fs = FileSystem.get(job);
+
+ ArrayList<Long> numRows = new ArrayList<Long>();
+
+ if(colID < 0)
+ {
+ // process mapper output for MV and Bin agents
+ colID = colID*-1;
+ _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
+ numRows.add(0L);
+ }
+ else if ( colID == _agents.getNumCols() + 1)
+ {
+ // process mapper output for OFFSET_FILE
+ ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
+ while(iterDV.hasNext())
+ list.add(new OffsetCount(iterDV.next().getOffsetCount()));
+ Collections.sort(list);
+
+ @SuppressWarnings("deprecation")
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class);
+
+ long lineOffset=0;
+ for(OffsetCount oc: list)
+ {
+ long count=oc.count;
+ oc.count=lineOffset;
+ writer.append(new ByteWritable((byte)0), oc);
+ lineOffset+=count;
+ }
+ writer.close();
+ list.clear();
+
+ numRows.add(lineOffset);
+ }
+ else
+ {
+ // process mapper output for Recode agent
+ _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
+ numRows.add(0L);
+ }
+
+ return numRows;
+ }
+
+ }
+
+
+}