You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:47 UTC
[68/73] [abbrv] prefix all projects in addons and quickstarts with
flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
new file mode 100644
index 0000000..deae026
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+/**
+ * Adapts a Hadoop {@code mapred} {@link org.apache.hadoop.mapred.OutputFormat} to Flink's
+ * {@link OutputFormat} interface. Each {@link Tuple2} record is written with {@code f0} as the
+ * Hadoop key and {@code f1} as the Hadoop value.
+ *
+ * @param <K> the Hadoop key type
+ * @param <V> the Hadoop value type
+ */
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Prefix of the synthetic Hadoop task attempt id; the padded task id follows it. */
+	private static final String TASK_ATTEMPT_ID_PREFIX = "attempt__0000_r_";
+
+	/** Suffix (attempt number) of the synthetic Hadoop task attempt id. */
+	private static final String TASK_ATTEMPT_ID_SUFFIX = "_0";
+
+	/** Number of digits the task id is zero-padded to inside the task attempt id. */
+	private static final int TASK_ID_WIDTH = 6;
+
+	private JobConf jobConf;
+	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
+	private transient RecordWriter<K,V> recordWriter;
+	private transient FileOutputCommitter fileOutputCommitter;
+	private transient TaskAttemptContext context;
+	private transient JobContext jobContext;
+
+	/**
+	 * Creates a wrapper around the given Hadoop output format.
+	 *
+	 * @param mapredOutputFormat the Hadoop output format that performs the actual writing
+	 * @param job the Hadoop job configuration; it is passed to {@code HadoopUtils.mergeHadoopConf}
+	 *            and then kept by reference (not copied)
+	 */
+	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat, JobConf job) {
+		super();
+		this.mapredOutputFormat = mapredOutputFormat;
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+	}
+
+	public void setJobConf(JobConf job) {
+		this.jobConf = job;
+	}
+
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+
+	public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
+		return mapredOutputFormat;
+	}
+
+	public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat) {
+		this.mapredOutputFormat = mapredOutputFormat;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  OutputFormat
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+
+	/**
+	 * Prepares the Hadoop {@link RecordWriter} for one parallel instance.
+	 *
+	 * <p>A synthetic task attempt id is built from {@code taskNumber + 1} and registered in the job
+	 * configuration. The output committer's job setup is then run, and finally a record writer is
+	 * obtained from the wrapped Hadoop output format.
+	 *
+	 * @param taskNumber the number of the parallel instance; {@code taskNumber + 1} is used as the
+	 *                   Hadoop task id and as the record writer's name
+	 * @param numTasks the number of parallel tasks (not used here)
+	 * @throws IOException if {@code taskNumber + 1} has more than six digits, or if Hadoop fails to
+	 *                     set up the job or create the record writer
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		TaskAttemptID taskAttemptID = TaskAttemptID.forName(buildTaskAttemptIdString(taskNumber + 1));
+
+		try {
+			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+		// for hadoop 2.2
+		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+
+		this.fileOutputCommitter = new FileOutputCommitter();
+
+		try {
+			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		this.fileOutputCommitter.setupJob(jobContext);
+
+		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+	}
+
+	/**
+	 * Builds the string form of a Hadoop task attempt id, e.g. {@code attempt__0000_r_000001_0}
+	 * for {@code taskId == 1}. The task id is left-padded with zeros to six digits.
+	 *
+	 * @param taskId the task id to embed
+	 * @return the task attempt id string
+	 * @throws IOException if {@code taskId} has more than six digits
+	 */
+	private static String buildTaskAttemptIdString(int taskId) throws IOException {
+		String digits = Integer.toString(taskId);
+		if (digits.length() > TASK_ID_WIDTH) {
+			throw new IOException("Task id too large.");
+		}
+		// Pad by hand: this handles ids that already have six digits (a width-0 "%0s" format
+		// would be rejected by java.util.Formatter), and the digits stay locale-independent.
+		StringBuilder id = new StringBuilder(TASK_ATTEMPT_ID_PREFIX);
+		for (int i = digits.length(); i < TASK_ID_WIDTH; i++) {
+			id.append('0');
+		}
+		return id.append(digits).append(TASK_ATTEMPT_ID_SUFFIX).toString();
+	}
+
+	/**
+	 * Writes one record, passing {@code f0} as the key and {@code f1} as the value to the Hadoop
+	 * record writer.
+	 */
+	@Override
+	public void writeRecord(Tuple2<K, V> record) throws IOException {
+		this.recordWriter.write(record.f0, record.f1);
+	}
+
+	/**
+	 * Closes the Hadoop record writer and commits the output. The task is committed if the
+	 * committer reports that it needs a commit, and then the job is committed.
+	 *
+	 * <p>NOTE(review): every parallel instance calls {@code commitJob} here. With more than one
+	 * instance this may interfere with instances that are still writing, for example if the
+	 * committer cleans up the shared temporary directory. Confirm this, and consider committing
+	 * the job once globally instead.
+	 *
+	 * @throws IOException if closing the writer or committing fails
+	 */
+	@Override
+	public void close() throws IOException {
+		this.recordWriter.close(new HadoopDummyReporter());
+
+		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+			this.fileOutputCommitter.commitTask(this.context);
+		}
+		this.fileOutputCommitter.commitJob(this.jobContext);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+
+	/** Serializes the Hadoop output format's class name followed by the job configuration. */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(mapredOutputFormat.getClass().getName());
+		jobConf.write(out);
+	}
+
+	/**
+	 * Restores the job configuration and re-instantiates the Hadoop output format by class name,
+	 * using the thread's context class loader, then hands the configuration to it.
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopOutputFormatName = in.readUTF();
+		if(jobConf == null) {
+			jobConf = new JobConf();
+		}
+		jobConf.readFields(in);
+		try {
+			this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+		}
+		ReflectionUtils.setConf(mapredOutputFormat, jobConf);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
new file mode 100644
index 0000000..4e8ffa9
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.example;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
+import org.apache.flink.util.Collector;
+
+
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ *
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
+ * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+
+	/**
+	 * Program entry point. Expects two arguments, the input path and the result path. With fewer
+	 * arguments a usage message is printed to stderr and nothing is executed.
+	 */
+	public static void main(String[] args) throws Exception {
+		if (args.length >= 2) {
+			runJob(args[0], args[1]);
+		} else {
+			System.err.println("Usage: WordCount <input path> <result path>");
+		}
+	}
+
+	/**
+	 * Builds and executes the word count program: read lines through a Hadoop input format,
+	 * count the words, and write the counts through a Hadoop output format.
+	 */
+	private static void runJob(String inputPath, String outputPath) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(1);
+
+		// Source: Hadoop TextInputFormat; the input path is registered on the wrapper's JobConf.
+		HadoopInputFormat<LongWritable, Text> inputFormat =
+				new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
+		TextInputFormat.addInputPath(inputFormat.getJobConf(), new Path(inputPath));
+
+		// lines -> (word, 1) -> summed per word -> (Text, IntWritable) for the Hadoop sink
+		DataSet<Tuple2<Text, IntWritable>> counts = env.createInput(inputFormat)
+				.flatMap(new Tokenizer())
+				.groupBy(0)
+				.aggregate(Aggregations.SUM, 1)
+				.map(new HadoopDatatypeMapper());
+
+		// Sink: Hadoop TextOutputFormat, with key and value separated by a single space.
+		HadoopOutputFormat<Text, IntWritable> outputFormat =
+				new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), new JobConf());
+		JobConf outputConf = outputFormat.getJobConf();
+		outputConf.set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(outputConf, new Path(outputPath));
+
+		counts.output(outputFormat);
+		env.execute("Word Count");
+	}
+
+	/**
+	 * Lower-cases each input line, splits it on runs of non-word characters, and emits a
+	 * {@code (word, 1)} pair for every non-empty token.
+	 */
+	public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
+			String normalized = value.f1.toString().toLowerCase();
+			for (String word : normalized.split("\\W+")) {
+				if (!word.isEmpty()) {
+					out.collect(new Tuple2<String, Integer>(word, 1));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Turns a {@code (String, Integer)} pair into the equivalent {@code (Text, IntWritable)} pair.
+	 */
+	public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+
+		@Override
+		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
+			Text word = new Text(value.f0);
+			IntWritable count = new IntWritable(value.f1);
+			return new Tuple2<Text, IntWritable>(word, count);
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
new file mode 100644
index 0000000..415f897
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+import java.util.List;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.compiler.contextcheck.Validatable;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats.
+ *
+ * Example usage:
+ * <pre>
+ * HadoopDataSink out = new HadoopDataSink(new org.apache.hadoop.mapred.TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat",reducer, Text.class,IntWritable.class);
+ * org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
+ * </pre>
+ *
+ * Note that it is possible to provide a custom data type converter.
+ *
+ * The HadoopDataSink provides a default converter: {@link org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter}
+ **/
+public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable {
+
+	private static final String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>";
+
+	/** The Hadoop job configuration; its output key and value classes are set in the constructor. */
+	private JobConf jobConf;
+
+	/** Creates a named sink for a single input, using the given type converter. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, name, ImmutableList.<Operator<Record>>of(input), conv, keyClass, valueClass);
+	}
+
+	/** Creates a named sink for a single input, using a {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with the default name for a single input, using a {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with the default name and a fresh {@link JobConf} for a single input. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/**
+	 * Creates a sink that writes through the given Hadoop output format.
+	 *
+	 * @param hadoopFormat the Hadoop output format (must not be null)
+	 * @param jobConf the Hadoop job configuration (must not be null); its output key and value
+	 *                classes are set to {@code keyClass} and {@code valueClass}
+	 * @param name the name of the sink
+	 * @param input the operators whose records are written
+	 * @param conv the type converter handed to the wrapped {@code HadoopRecordOutputFormat}
+	 * @param keyClass the Hadoop output key class
+	 * @param valueClass the Hadoop output value class
+	 * @throws NullPointerException if {@code hadoopFormat} or {@code jobConf} is null
+	 */
+	@SuppressWarnings("deprecation")
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
+		// The null checks are evaluated as arguments of super(...), so they run before the wrapped
+		// HadoopRecordOutputFormat is constructed with these values. Checks placed after super()
+		// could not guard that construction.
+		super(new HadoopRecordOutputFormat<K,V>(
+				Preconditions.checkNotNull(hadoopFormat, "The Hadoop OutputFormat must not be null."),
+				Preconditions.checkNotNull(jobConf, "The JobConf must not be null."),
+				conv), input, name);
+		this.name = name;
+		this.jobConf = jobConf;
+		jobConf.setOutputKeyClass(keyClass);
+		jobConf.setOutputValueClass(valueClass);
+	}
+
+	/** Creates a named sink for several inputs, using a {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with the default name for several inputs, using a {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with the default name and a fresh {@link JobConf} for several inputs. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Returns the Hadoop job configuration used by this sink (the same instance, not a copy). */
+	public JobConf getJobConf() {
+		return this.jobConf;
+	}
+
+	/**
+	 * Validates the sink configuration.
+	 *
+	 * @throws NullPointerException if {@code FileOutputFormat.getOutputPath(jobConf)} returns null
+	 */
+	@Override
+	public void check() {
+		// see for more details https://github.com/stratosphere/stratosphere/pull/531
+		Preconditions.checkNotNull(FileOutputFormat.getOutputPath(jobConf), "The HadoopDataSink currently expects a correct outputPath.");
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
new file mode 100644
index 0000000..d55fe87
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultHadoopTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.base.Preconditions;
+
+
+
+/**
+ * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
+ *
+ * Example usage:
+ * <pre>
+ * HadoopDataSource source = new HadoopDataSource(new org.apache.hadoop.mapred.TextInputFormat(), new JobConf(), "Input Lines");
+ * org.apache.hadoop.mapred.TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
+ * </pre>
+ *
+ * Note that it is possible to provide a custom data type converter.
+ *
+ * The HadoopDataSource provides two different standard converters:
+ * * WritableWrapperConverter: Converts Hadoop Types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper
+ * * DefaultHadoopTypeConverter: Converts the standard hadoop types (LongWritable, Text) to Flink's {@link org.apache.flink.types.Value} types.
+ *
+ */
+public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFormat<K,V>> {
+
+	private static final String DEFAULT_NAME = "<Unnamed Hadoop Data Source>";
+
+	/** The Hadoop job configuration shared with the wrapped input format. */
+	private JobConf jobConf;
+
+	/**
+	 * Creates a data source that reads through the given Hadoop input format.
+	 *
+	 * @param hadoopFormat Implementation of a Hadoop input format (must not be null)
+	 * @param jobConf JobConf object (Hadoop) (must not be null)
+	 * @param name Name of the DataSource
+	 * @param conv Definition of a custom type converter {@link DefaultHadoopTypeConverter} (must not be null).
+	 * @throws NullPointerException if {@code hadoopFormat}, {@code jobConf} or {@code conv} is null
+	 */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
+		// The null checks are evaluated as arguments of super(...). HadoopRecordInputFormat's
+		// constructor dereferences the format and the JobConf, so checks placed after super()
+		// would never be reached for those arguments.
+		super(new HadoopRecordInputFormat<K,V>(
+				Preconditions.checkNotNull(hadoopFormat, "The Hadoop InputFormat must not be null."),
+				Preconditions.checkNotNull(jobConf, "The JobConf must not be null."),
+				Preconditions.checkNotNull(conv, "The type converter must not be null.")), name);
+		this.name = name;
+		this.jobConf = jobConf;
+	}
+
+	/** Creates a named data source that uses a {@link DefaultHadoopTypeConverter}. */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name) {
+		this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter<K,V>() );
+	}
+
+	/** Creates a data source with the default name that uses a {@link DefaultHadoopTypeConverter}. */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf) {
+		this(hadoopFormat, jobConf, DEFAULT_NAME);
+	}
+
+	/** Creates a data source with the default name and a fresh {@link JobConf}. */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat) {
+		this(hadoopFormat, new JobConf(), DEFAULT_NAME);
+	}
+
+	/** Returns the Hadoop job configuration of this source (the same instance, not a copy). */
+	public JobConf getJobConf() {
+		return this.jobConf;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
new file mode 100644
index 0000000..dcf1952
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, HadoopInputSplit> {
+
+ private static final long serialVersionUID = 1L;
+
+ public org.apache.hadoop.mapred.InputFormat<K, V> hadoopInputFormat;
+ public HadoopTypeConverter<K,V> converter;
+ private String hadoopInputFormatName;
+ public JobConf jobConf;
+ public transient K key;
+ public transient V value;
+ public RecordReader<K, V> recordReader;
+ private boolean fetched = false;
+ private boolean hasNext;
+
+ public HadoopRecordInputFormat() {
+ super();
+ }
+
+ public HadoopRecordInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat, JobConf job, HadoopTypeConverter<K,V> conv) {
+ super();
+ this.hadoopInputFormat = hadoopInputFormat;
+ this.hadoopInputFormatName = hadoopInputFormat.getClass().getName();
+ this.converter = conv;
+ HadoopUtils.mergeHadoopConf(job);
+ this.jobConf = job;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+ return null;
+ }
+
+ @Override
+ public HadoopInputSplit[] createInputSplits(int minNumSplits)
+ throws IOException {
+ org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
+ HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
+ for(int i=0;i<splitArray.length;i++){
+ hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
+ }
+ return hiSplit;
+ }
+
+ @Override
+ public Class<? extends HadoopInputSplit> getInputSplitType() {
+ return HadoopInputSplit.class;
+ }
+
+ @Override
+ public void open(HadoopInputSplit split) throws IOException {
+ this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+ key = this.recordReader.createKey();
+ value = this.recordReader.createValue();
+ this.fetched = false;
+ }
+
+ private void fetchNext() throws IOException {
+ hasNext = this.recordReader.next(key, value);
+ fetched = true;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if(!fetched) {
+ fetchNext();
+ }
+ return !hasNext;
+ }
+
+ @Override
+ public Record nextRecord(Record record) throws IOException {
+ if(!fetched) {
+ fetchNext();
+ }
+ if(!hasNext) {
+ return null;
+ }
+ converter.convert(record, key, value);
+ fetched = false;
+ return record;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.recordReader.close();
+ }
+
+ /**
+ * Custom serialization methods.
+ * @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
+ */
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeUTF(hadoopInputFormatName);
+ jobConf.write(out);
+ out.writeObject(converter);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ hadoopInputFormatName = in.readUTF();
+ if(jobConf == null) {
+ jobConf = new JobConf();
+ }
+ jobConf.readFields(in);
+ try {
+ this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(this.hadoopInputFormatName).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+ }
+ ReflectionUtils.setConf(hadoopInputFormat, jobConf);
+ converter = (HadoopTypeConverter<K,V>) in.readObject();
+ }
+
+ public void setJobConf(JobConf job) {
+ this.jobConf = job;
+ }
+
+
+ public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
+ return hadoopInputFormat;
+ }
+
+ public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat) {
+ this.hadoopInputFormat = hadoopInputFormat;
+ }
+
+ public JobConf getJobConf() {
+ return jobConf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
new file mode 100644
index 0000000..337b543
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public class HadoopRecordOutputFormat<K,V> implements OutputFormat<Record> {
+
+ private static final long serialVersionUID = 1L;
+
+ public JobConf jobConf;
+
+ public org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat;
+
+ private String hadoopOutputFormatName;
+
+ public RecordWriter<K,V> recordWriter;
+
+ public FlinkTypeConverter<K,V> converter;
+
+ public HadoopFileOutputCommitter fileOutputCommitterWrapper;
+
+ public HadoopRecordOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopFormat, JobConf job, FlinkTypeConverter<K,V> conv) {
+ super();
+ this.hadoopOutputFormat = hadoopFormat;
+ this.hadoopOutputFormatName = hadoopFormat.getClass().getName();
+ this.converter = conv;
+ this.fileOutputCommitterWrapper = new HadoopFileOutputCommitter();
+ HadoopUtils.mergeHadoopConf(job);
+ this.jobConf = job;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ /**
+ * create the temporary output file for hadoop RecordWriter.
+ * @param taskNumber The number of the parallel instance.
+ * @param numTasks The number of parallel tasks.
+ * @throws IOException
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ this.fileOutputCommitterWrapper.setupJob(this.jobConf);
+ if (Integer.toString(taskNumber + 1).length() <= 6) {
+ this.jobConf.set("mapred.task.id", "attempt__0000_r_" + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + Integer.toString(taskNumber + 1) + "_0");
+ //compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+ this.jobConf.set("mapreduce.task.output.dir", this.fileOutputCommitterWrapper.getTempTaskOutputPath(this.jobConf,TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))).toString());
+ } else {
+ throw new IOException("task id too large");
+ }
+ this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+ }
+
+
+ @Override
+ public void writeRecord(Record record) throws IOException {
+ K key = this.converter.convertKey(record);
+ V value = this.converter.convertValue(record);
+ this.recordWriter.write(key, value);
+ }
+
+ /**
+ * commit the task by moving the output file out from the temporary directory.
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ this.recordWriter.close(new HadoopDummyReporter());
+ if (this.fileOutputCommitterWrapper.needsTaskCommit(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")))) {
+ this.fileOutputCommitterWrapper.commitTask(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")));
+ }
+ //TODO: commitjob when all the tasks are finished
+ }
+
+
+ /**
+ * Custom serialization methods.
+ * @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
+ */
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeUTF(hadoopOutputFormatName);
+ jobConf.write(out);
+ out.writeObject(converter);
+ out.writeObject(fileOutputCommitterWrapper);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ hadoopOutputFormatName = in.readUTF();
+ if(jobConf == null) {
+ jobConf = new JobConf();
+ }
+ jobConf.readFields(in);
+ try {
+ this.hadoopOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(this.hadoopOutputFormatName).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+ }
+ ReflectionUtils.setConf(hadoopOutputFormat, jobConf);
+ converter = (FlinkTypeConverter<K,V>) in.readObject();
+ fileOutputCommitterWrapper = (HadoopFileOutputCommitter) in.readObject();
+ }
+
+
+ public void setJobConf(JobConf job) {
+ this.jobConf = job;
+ }
+
+ public JobConf getJobConf() {
+ return jobConf;
+ }
+
+ public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
+ return hadoopOutputFormat;
+ }
+
+ public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat) {
+ this.hadoopOutputFormat = hadoopOutputFormat;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
new file mode 100644
index 0000000..4e63717
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Convert Flink Record into the default hadoop writables.
+ */
+public class DefaultFlinkTypeConverter<K,V> implements FlinkTypeConverter<K,V> {
+ private static final long serialVersionUID = 1L;
+
+ private Class<K> keyClass;
+ private Class<V> valueClass;
+
+ public DefaultFlinkTypeConverter(Class<K> keyClass, Class<V> valueClass) {
+ this.keyClass= keyClass;
+ this.valueClass = valueClass;
+ }
+ @Override
+ public K convertKey(Record flinkRecord) {
+ if(flinkRecord.getNumFields() > 0) {
+ return convert(flinkRecord, 0, this.keyClass);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public V convertValue(Record flinkRecord) {
+ if(flinkRecord.getNumFields() > 1) {
+ return convert(flinkRecord, 1, this.valueClass);
+ } else {
+ return null;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private<T> T convert(Record flinkType, int pos, Class<T> hadoopType) {
+ if(hadoopType == LongWritable.class ) {
+ return (T) new LongWritable((flinkType.getField(pos, LongValue.class)).getValue());
+ }
+ if(hadoopType == org.apache.hadoop.io.Text.class) {
+ return (T) new Text((flinkType.getField(pos, StringValue.class)).getValue());
+ }
+ if(hadoopType == org.apache.hadoop.io.IntWritable.class) {
+ return (T) new IntWritable((flinkType.getField(pos, IntValue.class)).getValue());
+ }
+ if(hadoopType == org.apache.hadoop.io.FloatWritable.class) {
+ return (T) new FloatWritable((flinkType.getField(pos, FloatValue.class)).getValue());
+ }
+ if(hadoopType == org.apache.hadoop.io.DoubleWritable.class) {
+ return (T) new DoubleWritable((flinkType.getField(pos, DoubleValue.class)).getValue());
+ }
+ if(hadoopType == org.apache.hadoop.io.BooleanWritable.class) {
+ return (T) new BooleanWritable((flinkType.getField(pos, BooleanValue.class)).getValue());
+ }
+ if(hadoopType == org.apache.hadoop.io.ByteWritable.class) {
+ return (T) new ByteWritable((flinkType.getField(pos, ByteValue.class)).getValue());
+ }
+
+ throw new RuntimeException("Unable to convert Flink type ("+flinkType.getClass().getCanonicalName()+") to Hadoop.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
new file mode 100644
index 0000000..c053e36
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Converter for the default hadoop writables.
+ * Key will be in field 0, Value in field 1 of a Record.
+ */
+public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
+ flinkRecord.setField(0, convert(hadoopKey));
+ flinkRecord.setField(1, convert(hadoopValue));
+ }
+
+ protected Value convert(Object hadoopType) {
+ if(hadoopType instanceof org.apache.hadoop.io.LongWritable ) {
+ return new LongValue(((LongWritable)hadoopType).get());
+ }
+ if(hadoopType instanceof org.apache.hadoop.io.Text) {
+ return new StringValue(((Text)hadoopType).toString());
+ }
+ if(hadoopType instanceof org.apache.hadoop.io.IntWritable) {
+ return new IntValue(((IntWritable)hadoopType).get());
+ }
+ if(hadoopType instanceof org.apache.hadoop.io.FloatWritable) {
+ return new FloatValue(((FloatWritable)hadoopType).get());
+ }
+ if(hadoopType instanceof org.apache.hadoop.io.DoubleWritable) {
+ return new DoubleValue(((DoubleWritable)hadoopType).get());
+ }
+ if(hadoopType instanceof org.apache.hadoop.io.BooleanWritable) {
+ return new BooleanValue(((BooleanWritable)hadoopType).get());
+ }
+ if(hadoopType instanceof org.apache.hadoop.io.ByteWritable) {
+ return new ByteValue(((ByteWritable)hadoopType).get());
+ }
+ if (hadoopType instanceof NullWritable) {
+ return NullValue.getInstance();
+ }
+
+ throw new RuntimeException("Unable to convert Hadoop type ("+hadoopType.getClass().getCanonicalName()+") to a Flink data type.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
new file mode 100644
index 0000000..9e33606
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.Serializable;
+
+import org.apache.flink.types.Record;
+
+/**
+ * Converts a Flink {@link Record} into the key and value types of a Hadoop output format.
+ *
+ * <p>Implementations must be {@link Serializable}. Flink provides {@link DefaultFlinkTypeConverter};
+ * custom implementations should chain the type converters.
+ */
+public interface FlinkTypeConverter<K,V> extends Serializable {
+
+ /**
+ * Converts the given Flink record into a Hadoop key.
+ */
+ K convertKey(Record record);
+
+ /**
+ * Converts the given Flink record into a Hadoop value.
+ */
+ V convertValue(Record record);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
new file mode 100644
index 0000000..1a35dc0
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes {@link org.apache.hadoop.mapred.JobContext}
+ * as input parameter. However JobContext class is package private, and in Hadoop 2.2.0 it's public.
+ * This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks.
+ *
+ * <p>Each task attempt writes below {@code <output path>/TEMP_DIR_NAME/_<task attempt id>};
+ * {@link #commitTask(JobConf, TaskAttemptID)} moves those files into the job output path.
+ */
+public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+ /**
+ * Creates the temporary directory below the job output path, if an output path is set.
+ * A failed mkdirs is logged, not raised.
+ */
+ public void setupJob(JobConf conf) throws IOException {
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ if (outputPath != null) {
+ Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tmpDir.getFileSystem(conf);
+ if (!fileSys.mkdirs(tmpDir)) {
+ LOG.error("Mkdirs failed to create " + tmpDir.toString());
+ }
+ }
+ }
+
+ /** Whether a success marker file should be written on job commit (defaults to true). */
+ private static boolean getOutputDirMarking(JobConf conf) {
+ return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+ }
+
+ /** Creates an empty SUCCEEDED_FILE_NAME file in the job output path, if that path exists. */
+ private void markSuccessfulOutputDir(JobConf conf)
+ throws IOException {
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ if (outputPath != null) {
+ FileSystem fileSys = outputPath.getFileSystem(conf);
+ // create a file in the folder to mark it
+ if (fileSys.exists(outputPath)) {
+ Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+ fileSys.create(filePath).close();
+ }
+ }
+ }
+
+ /**
+ * Maps a path below the task's temporary output path onto the same relative location below
+ * the job output directory.
+ * @throws IOException if taskOutput is not located below taskOutputPath
+ */
+ private Path getFinalPath(Path jobOutputDir, Path taskOutput,
+ Path taskOutputPath) throws IOException {
+ URI taskOutputUri = taskOutput.toUri();
+ URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
+ // URI.relativize returns its argument itself when no relative form exists, so identity comparison is intended
+ if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+ throw new IOException("Can not get the relative path: base = " +
+ taskOutputPath + " child = " + taskOutput);
+ }
+ if (relativePath.getPath().length() > 0) {
+ return new Path(jobOutputDir, relativePath.getPath());
+ } else {
+ return jobOutputDir;
+ }
+ }
+
+ /**
+ * Recursively moves a task output file or directory into the job output directory. If a
+ * rename fails, the existing target is deleted and the rename is retried once.
+ */
+ private void moveTaskOutputs(JobConf conf, TaskAttemptID taskAttemptID,
+ FileSystem fs,
+ Path jobOutputDir,
+ Path taskOutput)
+ throws IOException {
+ if (fs.isFile(taskOutput)) {
+ Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
+ getTempTaskOutputPath(conf, taskAttemptID));
+ if (!fs.rename(taskOutput, finalOutputPath)) {
+ if (!fs.delete(finalOutputPath, true)) {
+ throw new IOException("Failed to delete earlier output of task: " +
+ taskAttemptID);
+ }
+ if (!fs.rename(taskOutput, finalOutputPath)) {
+ throw new IOException("Failed to save output of task: " +
+ taskAttemptID);
+ }
+ }
+ LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
+ } else if(fs.getFileStatus(taskOutput).isDir()) {
+ FileStatus[] paths = fs.listStatus(taskOutput);
+ Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
+ getTempTaskOutputPath(conf, taskAttemptID));
+ fs.mkdirs(finalOutputPath);
+ if (paths != null) {
+ for (FileStatus path : paths) {
+ moveTaskOutputs(conf,taskAttemptID, fs, jobOutputDir, path.getPath());
+ }
+ }
+ }
+ }
+
+ /**
+ * Moves the output of the given task attempt from its temporary directory into the job
+ * output path and deletes the temporary directory. Does nothing if the task wrote no output.
+ */
+ public void commitTask(JobConf conf, TaskAttemptID taskAttemptID)
+ throws IOException {
+ Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
+ if (taskOutputPath != null) {
+ FileSystem fs = taskOutputPath.getFileSystem(conf);
+ if (fs.exists(taskOutputPath)) {
+ Path jobOutputPath = taskOutputPath.getParent().getParent();
+ // Move the task outputs to their final place
+ moveTaskOutputs(conf,taskAttemptID, fs, jobOutputPath, taskOutputPath);
+ // Delete the temporary task-specific output directory
+ if (!fs.delete(taskOutputPath, true)) {
+ LOG.info("Failed to delete the temporary output" +
+ " directory of task: " + taskAttemptID + " - " + taskOutputPath);
+ }
+ LOG.info("Saved output of task '" + taskAttemptID + "' to " +
+ jobOutputPath);
+ }
+ }
+ }
+
+ /**
+ * Returns true if the temporary output directory of the task attempt exists, i.e. the task
+ * produced output that has to be committed.
+ */
+ public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID)
+ throws IOException {
+ Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
+ if (taskOutputPath == null) {
+ return false;
+ }
+ // since task output path is created on demand,
+ // if it exists, task needs a commit
+ FileSystem fs = taskOutputPath.getFileSystem(conf);
+ return fs.exists(taskOutputPath);
+ }
+
+ /**
+ * Returns the fully qualified temporary output path of the task attempt, or null if the job
+ * has no output path. If the file system cannot be resolved, the error is logged and the
+ * unqualified path is returned.
+ */
+ public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ if (outputPath != null) {
+ Path p = new Path(outputPath,
+ (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+ "_" + taskAttemptID.toString()));
+ try {
+ FileSystem fs = p.getFileSystem(conf);
+ return p.makeQualified(fs);
+ } catch (IOException ie) {
+ LOG.warn(StringUtils.stringifyException(ie));
+ return p;
+ }
+ }
+ return null;
+ }
+
+ /** Recursively deletes the job's temporary directory below the output path. */
+ public void cleanupJob(JobConf conf) throws IOException {
+ // do the clean up of temporary directory
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ if (outputPath != null) {
+ Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tmpDir.getFileSystem(conf);
+ if (fileSys.exists(tmpDir)) {
+ fileSys.delete(tmpDir, true);
+ }
+ } else {
+ LOG.warn("Output path is null in cleanup");
+ }
+ }
+
+ /** Cleans up the temporary directory and, unless disabled, writes the success marker file. */
+ public void commitJob(JobConf conf) throws IOException {
+ cleanupJob(conf);
+ if (getOutputDirMarking(conf)) {
+ markSuccessfulOutputDir(conf);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
new file mode 100644
index 0000000..5860d26
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.Serializable;
+
+import org.apache.flink.types.Record;
+
+
+/**
+ * Converts a Hadoop key/value pair into Flink's {@link Record} model.
+ *
+ * <p>Implementations must be {@link Serializable}. Flink provides {@link DefaultHadoopTypeConverter};
+ * custom implementations should chain the type converters.
+ */
+public interface HadoopTypeConverter<K, V> extends Serializable {
+
+ /**
+ * Writes the converted Hadoop key and value into the given Flink record.
+ */
+ void convert(Record record, K hadoopKey, V hadoopValue);
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
new file mode 100644
index 0000000..0a459b8
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.Key;
+import org.apache.hadoop.io.WritableComparable;
+
+public class WritableComparableWrapper<T extends WritableComparable<T>> extends WritableWrapper<T> implements Key<WritableComparableWrapper<T>> {
+ private static final long serialVersionUID = 1L;
+
+ public WritableComparableWrapper() {
+ super();
+ }
+
+ public WritableComparableWrapper(T toWrap) {
+ super(toWrap);
+ }
+
+ @Override
+ public int compareTo(WritableComparableWrapper<T> o) {
+ return super.value().compareTo(o.value());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
new file mode 100644
index 0000000..629b91e
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A Flink {@link Value} that wraps an arbitrary Hadoop {@link Writable}. The serialized form is
+ * the class name of the wrapped writable followed by the writable's own fields. On read, the
+ * class is loaded (through the thread context class loader cached on first read) and instantiated
+ * before its fields are read.
+ */
+public class WritableWrapper<T extends Writable> implements Value {
+ private static final long serialVersionUID = 2L;
+
+ private T wrapped;
+ private String wrappedType;
+ private ClassLoader cl;
+
+ public WritableWrapper() {
+ }
+
+ /**
+ * @param toWrap the writable to wrap; must not be null
+ */
+ public WritableWrapper(T toWrap) {
+ wrapped = toWrap;
+ // Class.forName() in read() needs the binary name (e.g. Outer$Inner); getCanonicalName()
+ // (Outer.Inner) broke nested writable classes. Both names agree for top-level classes.
+ wrappedType = toWrap.getClass().getName();
+ }
+
+ /** Returns the wrapped writable. */
+ public T value() {
+ return wrapped;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeUTF(wrappedType);
+ wrapped.write(out);
+ }
+
+ /**
+ * Reads the class name, instantiates that writable and lets it read its own fields.
+ * @throws RuntimeException if the writable class cannot be found
+ */
+ @Override
+ public void read(DataInputView in) throws IOException {
+ if(cl == null) {
+ cl = Thread.currentThread().getContextClassLoader();
+ }
+ wrappedType = in.readUTF();
+ try {
+ @SuppressWarnings("unchecked")
+ Class<T> wrClass = (Class<T>) Class.forName(wrappedType, true, cl).asSubclass(Writable.class);
+ wrapped = InstantiationUtil.instantiate(wrClass, Writable.class);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Error creating the WritableWrapper", e);
+ }
+ wrapped.readFields(in);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
new file mode 100644
index 0000000..2a0c4d3
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+@SuppressWarnings("rawtypes")
+public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
+ flinkRecord.setField(0, convertKey(hadoopKey));
+ flinkRecord.setField(1, convertValue(hadoopValue));
+ }
+
+ @SuppressWarnings("unchecked")
+ private final Value convertKey(K in) {
+ return new WritableComparableWrapper(in);
+ }
+
+ private final Value convertValue(V in) {
+ return new WritableWrapper<V>(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
new file mode 100644
index 0000000..25caf0c
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.example;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.functions.MapFunction;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.record.io.CsvOutputFormat;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.WritableWrapperConverter;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * the occurrences of each word in the file.
+ *
+ * <br /><br />
+ *
+ * <b>Note</b>: This example uses the outdated Record API.
+ * It is recommended to use the new Java API.
+ *
+ * @see org.apache.flink.hadoopcompatibility.mapred.example.WordCount
+ */
+public class WordCount implements Program, ProgramDescription {
+
+	private static final long serialVersionUID = 1L;
+
+
+	/**
+	 * Converts a Record containing one string into multiple string/integer pairs.
+	 * The string is normalized (non-word characters become spaces, everything is lower-cased)
+	 * and tokenized by whitespace. For each token a new record is emitted,
+	 * where the token is the first field and an Integer(1) is the second field.
+	 */
+	public static class TokenizeLine extends MapFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void map(Record record, Collector<Record> collector) {
+			// get the line of text (as type StringValue) from field 1 of the record;
+			// field 0 presumably holds the key of the Hadoop input format and is not used
+			String line = record.getField(1, StringValue.class).getValue();
+			// normalize the line
+			line = line.replaceAll("\\W+", " ").toLowerCase();
+
+			// tokenize the line
+			StringTokenizer tokenizer = new StringTokenizer(line);
+			while (tokenizer.hasMoreTokens()) {
+				String word = tokenizer.nextToken();
+
+				// we emit a (word, 1) pair
+				collector.collect(new Record(new StringValue(word), new IntValue(1)));
+			}
+		}
+	}
+
+	/**
+	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
+	 * in the record. The other fields are not modified.
+	 */
+	@Combinable
+	@ConstantFields(0)
+	public static class CountWords extends ReduceFunction implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+			Record element = null;
+			int sum = 0;
+			while (records.hasNext()) {
+				element = records.next();
+				int cnt = element.getField(1, IntValue.class).getValue();
+				sum += cnt;
+			}
+
+			// the last record of the group is reused as the output record
+			element.setField(1, new IntValue(sum));
+			out.collect(element);
+		}
+
+		@Override
+		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
+			// the logic is the same as in the reduce function, so simply call the reduce method
+			reduce(records, out);
+		}
+	}
+
+
+	/**
+	 * Builds the word count plan.
+	 *
+	 * @param args [numSubTasks] [input] [output]; missing values default to 1, "" and ""
+	 * @return the plan reading the input via Hadoop's TextInputFormat and writing CSV word counts
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public Plan getPlan(String... args) {
+		// parse job parameters
+		int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
+		String dataInput = (args.length > 1 ? args[1] : "");
+		String output = (args.length > 2 ? args[2] : "");
+
+
+		HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
+		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
+
+		// Example with Wrapper Converter. This source only demonstrates the API; it is not
+		// connected to the operators below.
+		HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>(
+				new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
+		// register the path on this source's own JobConf; adding it to 'source' a second time
+		// would make 'source' list the same input twice and double every count
+		TextInputFormat.addInputPath(sourceHadoopType.getJobConf(), new Path(dataInput));
+
+		MapOperator mapper = MapOperator.builder(new TokenizeLine())
+				.input(source)
+				.name("Tokenize Lines")
+				.build();
+		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
+				.input(mapper)
+				.name("Count Words")
+				.build();
+		FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts");
+		CsvOutputFormat.configureRecordFormat(out)
+				.recordDelimiter('\n')
+				.fieldDelimiter(' ')
+				.field(StringValue.class, 0)
+				.field(IntValue.class, 1);
+
+		Plan plan = new Plan(out, "WordCount Example");
+		plan.setDefaultParallelism(numSubTasks);
+		return plan;
+	}
+
+
+	@Override
+	public String getDescription() {
+		return "Parameters: [numSubTasks] [input] [output]";
+	}
+
+
+	/**
+	 * Runs the example in an embedded local executor.
+	 *
+	 * @param args [numSubTasks] [input] [output]; all three are required
+	 */
+	public static void main(String[] args) throws Exception {
+		WordCount wc = new WordCount();
+
+		if (args.length < 3) {
+			System.err.println(wc.getDescription());
+			System.exit(1);
+		}
+
+		Plan plan = wc.getPlan(args);
+
+		// This will execute the word-count embedded in a local context. replace this line by the commented
+		// succeeding line to send the job to a local installation or to a cluster for execution
+		LocalExecutor.execute(plan);
+//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
+//		ex.executePlan(plan);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
new file mode 100644
index 0000000..a3cd3d5
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.example;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.functions.MapFunction;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSink;
+import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * the occurrences of each word in the file. The result is written with Hadoop's
+ * {@link TextOutputFormat}.
+ *
+ * <br /><br />
+ *
+ * <b>Note</b>: This example uses the outdated Record API.
+ * It is recommended to use the new Java API.
+ *
+ * @see WordCount
+ */
+@SuppressWarnings("serial")
+public class WordCountWithOutputFormat implements Program, ProgramDescription {
+
+	/**
+	 * Converts a Record containing one string into multiple string/integer pairs.
+	 * The string is normalized (non-word characters become spaces, everything is lower-cased)
+	 * and tokenized by whitespace. For each token a new record is emitted,
+	 * where the token is the first field and an Integer(1) is the second field.
+	 */
+	public static class TokenizeLine extends MapFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void map(Record record, Collector<Record> collector) {
+			// get the line of text (as type StringValue) from field 1 of the record;
+			// field 0 presumably holds the key of the Hadoop input format and is not used
+			String line = record.getField(1, StringValue.class).getValue();
+			// normalize the line
+			line = line.replaceAll("\\W+", " ").toLowerCase();
+
+			// tokenize the line
+			StringTokenizer tokenizer = new StringTokenizer(line);
+			while (tokenizer.hasMoreTokens()) {
+				String word = tokenizer.nextToken();
+
+				// we emit a (word, 1) pair
+				collector.collect(new Record(new StringValue(word), new IntValue(1)));
+			}
+		}
+	}
+
+	/**
+	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
+	 * in the record. The other fields are not modified.
+	 */
+	@Combinable
+	@ConstantFields(0)
+	public static class CountWords extends ReduceFunction implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+			Record element = null;
+			int sum = 0;
+			while (records.hasNext()) {
+				element = records.next();
+				int cnt = element.getField(1, IntValue.class).getValue();
+				sum += cnt;
+			}
+
+			// the last record of the group is reused as the output record
+			element.setField(1, new IntValue(sum));
+			out.collect(element);
+		}
+
+		@Override
+		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
+			// the logic is the same as in the reduce function, so simply call the reduce method
+			reduce(records, out);
+		}
+	}
+
+
+	/**
+	 * Builds the word count plan with a Hadoop input format and a Hadoop output format.
+	 *
+	 * @param args [numSubTasks] [input] [output]; missing values default to 1, "" and ""
+	 * @return the plan
+	 */
+	@Override
+	public Plan getPlan(String... args) {
+		// parse job parameters
+		int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
+		String dataInput = (args.length > 1 ? args[1] : "");
+		String output = (args.length > 2 ? args[2] : "");
+
+		HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
+				new TextInputFormat(), new JobConf(), "Input Lines");
+		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
+
+
+		MapOperator mapper = MapOperator.builder(new TokenizeLine())
+				.input(source)
+				.name("Tokenize Lines")
+				.build();
+		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
+				.input(mapper)
+				.name("Count Words")
+				.build();
+		HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(
+				new TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat",
+				reducer, Text.class, IntWritable.class);
+		TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
+
+		Plan plan = new Plan(out, "Hadoop OutputFormat Example");
+		plan.setDefaultParallelism(numSubTasks);
+		return plan;
+	}
+
+
+	@Override
+	public String getDescription() {
+		return "Parameters: [numSubTasks] [input] [output]";
+	}
+
+
+	/**
+	 * Runs the example in an embedded local executor.
+	 *
+	 * @param args [numSubTasks] [input] [output]; all three are required
+	 */
+	public static void main(String[] args) throws Exception {
+		WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
+
+		if (args.length < 3) {
+			System.err.println(wc.getDescription());
+			System.exit(1);
+		}
+
+		Plan plan = wc.getPlan(args);
+
+		// This will execute the word-count embedded in a local context. replace this line by the commented
+		// succeeding line to send the job to a local installation or to a cluster for execution
+		LocalExecutor.execute(plan);
+//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
+//		ex.executePlan(plan);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
new file mode 100644
index 0000000..c679733
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.utils;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+
+/**
+ * Helpers for working with the Hadoop {@code mapred} API across Hadoop 1.x and 2.x.
+ */
+public class HadoopUtils {
+
+	/**
+	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
+	 * Every entry of {@link DistributedFileSystem#getHadoopConfiguration()} is copied into
+	 * the given JobConf, overwriting entries with the same key.
+	 *
+	 * @param jobConf the JobConf to update in place
+	 */
+	public static void mergeHadoopConf(JobConf jobConf) {
+		org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
+		for (Map.Entry<String, String> e : hadoopConf) {
+			jobConf.set(e.getKey(), e.getValue());
+		}
+	}
+
+	/**
+	 * Creates a {@link JobContext} reflectively, independent of the Hadoop version.
+	 *
+	 * @param jobConf the job configuration
+	 * @param jobId the job id
+	 * @return a new JobContext
+	 * @throws Exception if the context class cannot be loaded or instantiated
+	 */
+	public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
+		try {
+			// Hadoop 1.x: mapred.JobContext is a class; Hadoop 2.x: it is an interface
+			// implemented by JobContextImpl. Probe the type that is actually instantiated.
+			String className = JobContext.class.isInterface()
+					? "org.apache.hadoop.mapred.JobContextImpl"
+					: "org.apache.hadoop.mapred.JobContext";
+			Constructor<?> constructor = loadClass(className)
+					.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class);
+			// needed for Hadoop 1.x, where this constructor is presumably not public
+			constructor.setAccessible(true);
+			return (JobContext) constructor.newInstance(jobConf, jobId);
+		} catch (Exception e) {
+			throw new Exception("Could not create instance of JobContext.", e);
+		}
+	}
+
+	/**
+	 * Creates a {@link TaskAttemptContext} reflectively, independent of the Hadoop version.
+	 *
+	 * @param jobConf the job configuration
+	 * @param taskAttemptID the task attempt id
+	 * @return a new TaskAttemptContext
+	 * @throws Exception if the context class cannot be loaded or instantiated
+	 */
+	public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception {
+		try {
+			// Hadoop 1.x: mapred.TaskAttemptContext is a class; Hadoop 2.x: it is an interface
+			// implemented by TaskAttemptContextImpl.
+			String className = TaskAttemptContext.class.isInterface()
+					? "org.apache.hadoop.mapred.TaskAttemptContextImpl"
+					: "org.apache.hadoop.mapred.TaskAttemptContext";
+			Constructor<?> constructor = loadClass(className)
+					.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
+			// needed for Hadoop 1.x, where this constructor is presumably not public
+			constructor.setAccessible(true);
+			return (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
+		} catch (Exception e) {
+			throw new Exception("Could not create instance of TaskAttemptContext.", e);
+		}
+	}
+
+	/**
+	 * Loads and initializes a class via the thread's context class loader, falling back to the
+	 * loader of this class when no context class loader is set.
+	 */
+	private static Class<?> loadClass(String className) throws ClassNotFoundException {
+		ClassLoader loader = Thread.currentThread().getContextClassLoader();
+		if (loader == null) {
+			loader = HadoopUtils.class.getClassLoader();
+		}
+		return Class.forName(className, true, loader);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
new file mode 100644
index 0000000..058a60f
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A {@link Progressable} that ignores all progress notifications.
+ */
+public class HadoopDummyProgressable implements Progressable {
+
+	/** Does nothing. */
+	@Override
+	public void progress() {
+		// intentionally empty: progress is not reported anywhere
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
new file mode 100644
index 0000000..87a6014
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A {@link Reporter} that discards everything: progress, status and counter updates are
+ * ignored, counter lookups and {@link #getInputSplit()} return {@code null}, and
+ * {@link #getProgress()} always returns 0.
+ */
+public class HadoopDummyReporter implements Reporter {
+
+	// ------------------------------------------------------------------------
+	//  progress and status (ignored)
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void progress() {
+		// no-op
+	}
+
+	@Override
+	public float getProgress() {
+		return 0.0f;
+	}
+
+	@Override
+	public void setStatus(String status) {
+		// no-op
+	}
+
+	// ------------------------------------------------------------------------
+	//  counters (not tracked)
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Counter getCounter(Enum<?> name) {
+		return null;
+	}
+
+	@Override
+	public Counter getCounter(String group, String name) {
+		return null;
+	}
+
+	@Override
+	public void incrCounter(Enum<?> key, long amount) {
+		// no-op
+	}
+
+	@Override
+	public void incrCounter(String group, String counter, long amount) {
+		// no-op
+	}
+
+	// ------------------------------------------------------------------------
+	//  input split (not available)
+	// ------------------------------------------------------------------------
+
+	@Override
+	public InputSplit getInputSplit() throws UnsupportedOperationException {
+		return null;
+	}
+
+}