Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/29 15:15:12 UTC
[flink] 03/06: [FLINK-21407][doc][formats] Update hadoop doc
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b1c708b60a0e376ba6c510d8f86b417ee78184a7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Nov 9 15:35:44 2021 +0100
[FLINK-21407][doc][formats] Update hadoop doc
---
.../docs/connectors/datastream/formats/hadoop.md | 71 +++-------------------
docs/content/docs/dev/dataset/hadoop_map_reduce.md | 13 +---
2 files changed, 11 insertions(+), 73 deletions(-)
diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
index 0756f02..56553ca 100644
--- a/docs/content/docs/connectors/datastream/formats/hadoop.md
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -30,19 +30,10 @@ under the License.
## Project Configuration
-Support for Hadoop input/output formats is part of the `flink-java` and
-`flink-scala` Maven modules that are always required when writing Flink jobs.
-The code is located in `org.apache.flink.api.java.hadoop` and
-`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
-`mapred` and `mapreduce` API.
-
-Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
Maven module.
-This code resides in the `org.apache.flink.hadoopcompatibility`
-package.
-Add the following dependency to your `pom.xml` if you want to reuse Mappers
-and Reducers.
+Add the following dependency to your `pom.xml` to use Hadoop:
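For reference, a hedged sketch of what the `flink-hadoop-compatibility` dependency typically looks like (the Scala suffix and version shown here are placeholder assumptions — substitute the ones matching your Flink build):

```xml
<!-- Assumed coordinates: verify the Scala suffix and version against your Flink release -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.12</artifactId>
    <version>1.14.0</version>
</dependency>
```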
```xml
<dependency>
@@ -75,7 +66,7 @@ input formats.
The resulting `InputFormat` can be used to create a data source by using
`ExecutionEnvironment#createInput`.
-The resulting `DataSet` contains 2-tuples where the first field
+The resulting `DataStream` contains 2-tuples where the first field
is the key and the second field is the value retrieved from the Hadoop
InputFormat.
@@ -85,9 +76,9 @@ The following example shows how to use Hadoop's `TextInputFormat`.
{{< tab "Java" >}}
```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-DataSet<Tuple2<LongWritable, Text>> input =
+DataStream<Tuple2<LongWritable, Text>> input =
env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class, textPath));
@@ -99,9 +90,9 @@ DataSet<Tuple2<LongWritable, Text>> input =
{{< tab "Scala" >}}
```scala
-val env = ExecutionEnvironment.getExecutionEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
-val input: DataSet[(LongWritable, Text)] =
+val input: DataStream[(LongWritable, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
@@ -127,7 +118,7 @@ The following example shows how to use Hadoop's `TextOutputFormat`.
```java
// Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]
// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
@@ -148,7 +139,7 @@ hadoopResult.output(hadoopOF);
```scala
// Obtain your result to emit.
-val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+val hadoopResult: DataStream[(Text, IntWritable)] = [...]
val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
new TextOutputFormat[Text, IntWritable],
@@ -165,48 +156,4 @@ hadoopResult.output(hadoopOF)
{{< /tab >}}
{{< /tabs >}}
-## Complete Hadoop WordCount Example
-
-The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
-
-```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// Set up the Hadoop TextInputFormat.
-Job job = Job.getInstance();
-HadoopInputFormat<LongWritable, Text> hadoopIF =
- new HadoopInputFormat<LongWritable, Text>(
- new TextInputFormat(), LongWritable.class, Text.class, job
- );
-TextInputFormat.addInputPath(job, new Path(inputPath));
-
-// Read data using the Hadoop TextInputFormat.
-DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
-
-DataSet<Tuple2<Text, LongWritable>> result = text
- // use Hadoop Mapper (Tokenizer) as MapFunction
- .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
- new Tokenizer()
- ))
- .groupBy(0)
- // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
- .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
- new Counter(), new Counter()
- ));
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, LongWritable> hadoopOF =
- new HadoopOutputFormat<Text, LongWritable>(
- new TextOutputFormat<Text, LongWritable>(), job
- );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-result.output(hadoopOF);
-
-// Execute Program
-env.execute("Hadoop WordCount");
-```
-
{{< top >}}
diff --git a/docs/content/docs/dev/dataset/hadoop_map_reduce.md b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
index c75c3c5..193217f 100644
--- a/docs/content/docs/dev/dataset/hadoop_map_reduce.md
+++ b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
@@ -42,19 +42,10 @@ This document shows how to use existing Hadoop MapReduce code with Flink. Please
## Project Configuration
-Support for Hadoop input/output formats is part of the `flink-java` and
-`flink-scala` Maven modules that are always required when writing Flink jobs.
-The code is located in `org.apache.flink.api.java.hadoop` and
-`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
-`mapred` and `mapreduce` API.
-
-Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
Maven module.
-This code resides in the `org.apache.flink.hadoopcompatibility`
-package.
-Add the following dependency to your `pom.xml` if you want to reuse Mappers
-and Reducers.
+Add the following dependency to your `pom.xml` to use Hadoop:
```xml
<dependency>