Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 06:17:45 UTC
[15/21] flink git commit: [FLINK-6711] Activate strict checkstyle for flink-hadoop-compatibility
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
index 13d971c..b1135f0 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -28,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -41,6 +39,12 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * IT case for the {@link HadoopReduceCombineFunction}.
+ */
@RunWith(Parameterized.class)
public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {
@@ -68,7 +72,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
counts.writeAsText(resultPath);
env.execute();
- String expected = "(0,5)\n"+
+ String expected = "(0,5)\n" +
"(1,6)\n" +
"(2,6)\n" +
"(3,4)\n";
@@ -115,7 +119,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
counts.writeAsText(resultPath);
env.execute();
- String expected = "(0,5)\n"+
+ String expected = "(0,5)\n" +
"(1,6)\n" +
"(2,5)\n" +
"(3,5)\n";
@@ -144,7 +148,7 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
env.execute();
// return expected result
- String expected = "(0,0)\n"+
+ String expected = "(0,0)\n" +
"(1,0)\n" +
"(2,1)\n" +
"(3,1)\n" +
@@ -152,62 +156,71 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
compareResultsByLinesInMemory(expected, resultPath);
}
-
+
+ /**
+ * A {@link Reducer} to sum counts.
+ */
public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
-
+
int sum = 0;
- while(v.hasNext()) {
+ while (v.hasNext()) {
sum += v.next().get();
}
out.collect(k, new IntWritable(sum));
}
-
+
@Override
public void configure(JobConf arg0) { }
@Override
public void close() throws IOException { }
}
-
+
+ /**
+ * A {@link Reducer} to sum counts that modifies the key.
+ */
public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
- while(v.hasNext()) {
+ while (v.hasNext()) {
out.collect(new IntWritable(k.get() % 4), v.next());
}
}
-
+
@Override
public void configure(JobConf arg0) { }
@Override
public void close() throws IOException { }
}
-
+
+ /**
+ * A {@link Reducer} to sum counts for a specific prefix.
+ */
public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
private String countPrefix;
-
+
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
- while(vs.hasNext()) {
+ while (vs.hasNext()) {
String v = vs.next().toString();
- if(v.startsWith(this.countPrefix)) {
+ if (v.startsWith(this.countPrefix)) {
commentCnt++;
}
}
out.collect(k, new IntWritable(commentCnt));
}
-
+
@Override
- public void configure(final JobConf c) {
+ public void configure(final JobConf c) {
this.countPrefix = c.get("my.cntPrefix");
}
@@ -215,10 +228,13 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
public void close() throws IOException { }
}
+ /**
+ * Test mapper.
+ */
public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
IntWritable>> {
private static final long serialVersionUID = 1L;
- Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+ Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();
@Override
public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
throws Exception {
@@ -228,10 +244,13 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
}
}
+ /**
+ * Test mapper.
+ */
public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
IntWritable>> {
private static final long serialVersionUID = 1L;
- Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+ Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();
@Override
public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
throws Exception {
@@ -241,9 +260,12 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
}
}
+ /**
+ * Test mapper.
+ */
public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
private static final long serialVersionUID = 1L;
- Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+ Tuple2<IntWritable, IntWritable> outT = new Tuple2<IntWritable, IntWritable>();
@Override
public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
throws Exception {
@@ -253,6 +275,9 @@ public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase
}
}
+ /**
+ * Test mapper.
+ */
public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
private static final long serialVersionUID = 1L;
@Override
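
The import changes in this file follow the grouping applied by the strict checkstyle throughout this commit: org.apache.flink imports first, other third-party imports (Hadoop, JUnit) next, and java.* imports last, with a blank line between groups. A minimal sketch of the resulting layout, using an illustrative class name:

    package org.apache.flink.test.hadoopcompatibility.mapred;

    // Group 1: Flink imports.
    import org.apache.flink.api.java.ExecutionEnvironment;

    // Group 2: other third-party imports (Hadoop, JUnit, ...).
    import org.apache.hadoop.io.IntWritable;
    import org.junit.Test;

    // Group 3: java.* imports come last.
    import java.io.IOException;
    import java.util.Iterator;

    /**
     * Illustrative class name; only the import grouping above is of interest here.
     */
    public class ImportGroupingExample { }
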
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
index abc0e9c..3a22af0 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
@@ -18,15 +18,13 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
@@ -39,6 +37,12 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * IT cases for the {@link HadoopReduceFunction}.
+ */
@RunWith(Parameterized.class)
public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
@@ -65,7 +69,7 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
commentCnts.writeAsText(resultPath);
env.execute();
- String expected = "(0,0)\n"+
+ String expected = "(0,0)\n" +
"(1,3)\n" +
"(2,5)\n" +
"(3,5)\n" +
@@ -113,7 +117,7 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
helloCnts.writeAsText(resultPath);
env.execute();
- String expected = "(0,0)\n"+
+ String expected = "(0,0)\n" +
"(1,0)\n" +
"(2,1)\n" +
"(3,1)\n" +
@@ -121,69 +125,78 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
compareResultsByLinesInMemory(expected, resultPath);
}
-
+
+ /**
+ * A {@link Reducer} to sum counts.
+ */
public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-
+
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
- while(vs.hasNext()) {
+ while (vs.hasNext()) {
String v = vs.next().toString();
- if(v.startsWith("Comment")) {
+ if (v.startsWith("Comment")) {
commentCnt++;
}
}
out.collect(k, new IntWritable(commentCnt));
}
-
+
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
-
+
+ /**
+ * A {@link Reducer} to sum counts.
+ */
public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-
+
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
- while(vs.hasNext()) {
+ while (vs.hasNext()) {
String v = vs.next().toString();
- if(v.startsWith("Comment")) {
+ if (v.startsWith("Comment")) {
commentCnt++;
}
}
out.collect(new IntWritable(42), new IntWritable(commentCnt));
}
-
+
@Override
public void configure(final JobConf arg0) { }
@Override
public void close() throws IOException { }
}
-
+
+ /**
+ * A {@link Reducer} to sum counts for a specific prefix.
+ */
public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
private String countPrefix;
-
+
@Override
public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
throws IOException {
int commentCnt = 0;
- while(vs.hasNext()) {
+ while (vs.hasNext()) {
String v = vs.next().toString();
- if(v.startsWith(this.countPrefix)) {
+ if (v.startsWith(this.countPrefix)) {
commentCnt++;
}
}
out.collect(k, new IntWritable(commentCnt));
}
-
+
@Override
- public void configure(final JobConf c) {
+ public void configure(final JobConf c) {
this.countPrefix = c.get("my.cntPrefix");
}
@@ -191,6 +204,9 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
public void close() throws IOException { }
}
+ /**
+ * Test mapper.
+ */
public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
private static final long serialVersionUID = 1L;
@Override
@@ -201,6 +217,9 @@ public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
}
}
+ /**
+ * Test mapper.
+ */
public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
index eed6f8f..b1992ff 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
@@ -18,45 +18,49 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
+
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test data.
+ */
public class HadoopTestData {
public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {
-
+
List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>();
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine.")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14")));
- data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15")));
-
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(1), new Text("Hi")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(2), new Text("Hello")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(3), new Text("Hello world")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(4), new Text("Hello world, how are you?")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(5), new Text("I am fine.")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(6), new Text("Luke Skywalker")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(7), new Text("Comment#1")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(8), new Text("Comment#2")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(9), new Text("Comment#3")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(10), new Text("Comment#4")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(11), new Text("Comment#5")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(12), new Text("Comment#6")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(13), new Text("Comment#7")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(14), new Text("Comment#8")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(15), new Text("Comment#9")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(16), new Text("Comment#10")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(17), new Text("Comment#11")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(18), new Text("Comment#12")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(19), new Text("Comment#13")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(20), new Text("Comment#14")));
+ data.add(new Tuple2<IntWritable, Text>(new IntWritable(21), new Text("Comment#15")));
+
Collections.shuffle(data);
-
+
return env.fromCollection(data);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index ce0143a..2bf69bd 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -18,16 +18,14 @@
package org.apache.flink.test.hadoopcompatibility.mapred.example;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -39,95 +37,101 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-
+import java.io.IOException;
+import java.util.Iterator;
/**
* Implements a word count which takes the input file and counts the number of
* occurrences of each word in the file and writes the result back to disk.
- *
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
+ *
+ * <p>This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
* common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
*/
public class HadoopMapredCompatWordCount {
-
+
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: WordCount <input path> <result path>");
return;
}
-
+
final String inputPath = args[0];
final String outputPath = args[1];
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// Set up the Hadoop Input Format
HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
-
+
// Create a Flink job with it
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-
- DataSet<Tuple2<Text, LongWritable>> words =
+
+ DataSet<Tuple2<Text, LongWritable>> words =
text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
-
+
// Set up Hadoop Output Format
- HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+ HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
-
+
// Output & Execute
words.output(hadoopOutputFormat).setParallelism(1);
env.execute("Hadoop Compat WordCount");
}
-
-
+
+ /**
+ * A {@link Mapper} that splits a line into words.
+ */
public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
@Override
- public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
+ public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
throws IOException {
// normalize and split the line
String line = v.toString();
String[] tokens = line.toLowerCase().split("\\W+");
-
+
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
- out.collect(new Text(token), new LongWritable(1l));
+ out.collect(new Text(token), new LongWritable(1L));
}
}
}
-
+
@Override
public void configure(JobConf arg0) { }
-
+
@Override
public void close() throws IOException { }
-
+
}
-
+
+ /**
+ * A {@link Reducer} to sum counts.
+ */
public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
@Override
public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
throws IOException {
-
+
long cnt = 0;
- while(vs.hasNext()) {
+ while (vs.hasNext()) {
cnt += vs.next().get();
}
out.collect(k, new LongWritable(cnt));
-
+
}
-
+
@Override
public void configure(JobConf arg0) { }
-
+
@Override
public void close() throws IOException { }
}
-
+
}
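
Condensed from the example above, the mapred-API wrappers plug a Hadoop Mapper and Reducer directly into a Flink DataSet program: HadoopMapFunction wraps the Mapper for flatMap, and HadoopReduceCombineFunction wraps the Reducer as both combiner and reducer for the grouped reduce. A sketch of that wiring, assuming the Tokenizer and Counter classes from the example and placeholder input paths:

    // Imports as in HadoopMapredCompatWordCount above; the path below is a placeholder.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    HadoopInputFormat<LongWritable, Text> source =
        new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
    TextInputFormat.addInputPath(source.getJobConf(), new Path("/tmp/wordcount-in"));

    DataSet<Tuple2<Text, LongWritable>> counts = env.createInput(source)
        // Hadoop Mapper wrapped as a Flink flatMap function.
        .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
        .groupBy(0)
        // Hadoop Reducer wrapped as both combiner and reducer.
        .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
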
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
index 524318c..ff7c1b7 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -18,43 +18,47 @@
package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
-import java.util.ArrayList;
-import java.util.NoSuchElementException;
-
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+
import org.apache.hadoop.io.IntWritable;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+/**
+ * Tests for the {@link HadoopTupleUnwrappingIterator}.
+ */
public class HadoopTupleUnwrappingIteratorTest {
@Test
public void testValueIterator() {
-
- HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
+
+ HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(new WritableSerializer
<IntWritable>(IntWritable.class));
-
+
// many values
-
+
ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8)));
-
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(2)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(3)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(4)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(5)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(6)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(7)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(8)));
+
int expectedKey = 1;
- int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
-
+ int[] expectedValues = new int[] {1, 2, 3, 4, 5, 6, 7, 8};
+
valIt.set(tList.iterator());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
- for(int expectedValue : expectedValues) {
+ for (int expectedValue : expectedValues) {
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.next().get() == expectedValue);
@@ -63,18 +67,18 @@ public class HadoopTupleUnwrappingIteratorTest {
Assert.assertFalse(valIt.hasNext());
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-
+
// one value
-
+
tList.clear();
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10)));
-
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(10)));
+
expectedKey = 2;
expectedValues = new int[]{10};
-
+
valIt.set(tList.iterator());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
- for(int expectedValue : expectedValues) {
+ for (int expectedValue : expectedValues) {
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.next().get() == expectedValue);
@@ -83,23 +87,23 @@ public class HadoopTupleUnwrappingIteratorTest {
Assert.assertFalse(valIt.hasNext());
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-
+
// more values
-
+
tList.clear();
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21)));
-
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(10)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(4)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(7)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3), new IntWritable(9)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(21)));
+
expectedKey = 3;
- expectedValues = new int[]{10,4,7,9,21};
-
+ expectedValues = new int[]{10, 4, 7, 9, 21};
+
valIt.set(tList.iterator());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
- for(int expectedValue : expectedValues) {
+ for (int expectedValue : expectedValues) {
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.hasNext());
Assert.assertTrue(valIt.next().get() == expectedValue);
@@ -108,22 +112,22 @@ public class HadoopTupleUnwrappingIteratorTest {
Assert.assertFalse(valIt.hasNext());
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-
+
// no has next calls
-
+
tList.clear();
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1)));
- tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0)));
-
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(5)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(8)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(42)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(-1)));
+ tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4), new IntWritable(0)));
+
expectedKey = 4;
- expectedValues = new int[]{5,8,42,-1,0};
-
+ expectedValues = new int[]{5, 8, 42, -1, 0};
+
valIt.set(tList.iterator());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
- for(int expectedValue : expectedValues) {
+ for (int expectedValue : expectedValues) {
Assert.assertTrue(valIt.next().get() == expectedValue);
}
try {
@@ -135,5 +139,5 @@ public class HadoopTupleUnwrappingIteratorTest {
Assert.assertFalse(valIt.hasNext());
Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
}
-
+
}
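
The test above exercises the full usage pattern of the iterator: construct it with a WritableSerializer for the key type, hand it an iterator of (key, value) tuples via set(), read the key through getCurrentKey(), and consume the values through hasNext()/next(). A condensed sketch with illustrative values:

    HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
        new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(
            new WritableSerializer<IntWritable>(IntWritable.class));

    ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
    tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(7)));
    tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(8)));

    valIt.set(tList.iterator());
    int key = valIt.getCurrentKey().get();   // 1
    while (valIt.hasNext()) {
        int value = valIt.next().get();      // 7, then 8
    }
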
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 48aa258..a23a50d 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -18,36 +18,42 @@
package org.apache.flink.test.hadoopcompatibility.mapreduce;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.flink.util.OperatingSystem;
+
import org.junit.Assume;
import org.junit.Before;
+/**
+ * IT cases for both the {@link HadoopInputFormat} and {@link HadoopOutputFormat}.
+ */
public class HadoopInputOutputITCase extends JavaProgramTestBase {
-
+
protected String textPath;
protected String resultPath;
-
+
@Before
public void checkOperatingSystem() {
// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
}
-
+
@Override
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
this.setParallelism(4);
}
-
+
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
}
-
+
@Override
protected void testProgram() throws Exception {
WordCount.main(new String[] { textPath, resultPath });
http://git-wip-us.apache.org/repos/asf/flink/blob/fab8fe57/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
index ed83d78..09af3df 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -18,11 +18,16 @@
package org.apache.flink.test.hadoopcompatibility.mapreduce.example;
-import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -30,71 +35,67 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
/**
* Implements a word count which takes the input file and counts the number of
* occurrences of each word in the file and writes the result back to disk.
- *
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
+ *
+ * <p>This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to
* common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
*/
@SuppressWarnings("serial")
public class WordCount {
-
+
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: WordCount <input path> <result path>");
return;
}
-
+
final String inputPath = args[0];
final String outputPath = args[1];
-
+
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// Set up the Hadoop Input Format
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
TextInputFormat.addInputPath(job, new Path(inputPath));
-
+
// Create a Flink job with it
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-
+
// Tokenize the line and convert from Writable "Text" to String for better handling
DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
-
+
// Sum up the words
DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
-
+
// Convert String back to Writable "Text" for use with Hadoop Output Format
DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
-
+
// Set up Hadoop Output Format
HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
+
// Output & Execute
hadoopResult.output(hadoopOutputFormat);
env.execute("Word Count");
}
-
+
/**
* Splits a line into words and converts Hadoop Writables into normal Java data types.
*/
public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-
+
@Override
public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String line = value.f1.toString();
String[] tokens = line.toLowerCase().split("\\W+");
-
+
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
@@ -103,17 +104,17 @@ public class WordCount {
}
}
}
-
+
/**
* Converts Java data types to Hadoop Writables.
*/
public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-
+
@Override
public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
}
-
+
}
-
+
}
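
The mapreduce-API variant of the word count binds the input and output formats to a Job instance rather than a JobConf. A condensed sketch of that wiring, assuming the Tokenizer and HadoopDatatypeMapper classes from the example above and placeholder paths:

    // Imports as in WordCount above; the paths below are placeholders.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Job job = Job.getInstance();
    HadoopInputFormat<LongWritable, Text> source =
        new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
    TextInputFormat.addInputPath(job, new Path("/tmp/wordcount-in"));

    DataSet<Tuple2<String, Integer>> result = env.createInput(source)
        .flatMap(new Tokenizer())                  // Hadoop Writables -> Java types
        .groupBy(0).aggregate(Aggregations.SUM, 1);

    HadoopOutputFormat<Text, IntWritable> sink =
        new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
    sink.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
    TextOutputFormat.setOutputPath(job, new Path("/tmp/wordcount-out"));

    result.map(new HadoopDatatypeMapper())         // Java types -> Hadoop Writables
        .output(sink);
    env.execute("Word Count");
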