Posted to commits@flink.apache.org by ar...@apache.org on 2021/12/02 20:14:57 UTC

[flink] 02/06: [FLINK-21407][doc][formats] Move hadoop input and output formats to hadoop.md formats page

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e7d8209a526b7088883ce0dd4ef278bb8fc57251
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Nov 5 14:37:02 2021 +0100

    [FLINK-21407][doc][formats] Move hadoop input and output formats to hadoop.md formats page
---
 .../docs/connectors/datastream/formats/hadoop.md   | 184 ++++++++++++++++++++-
 ...adoop_compatibility.md => hadoop_map_reduce.md} | 117 +------------
 2 files changed, 187 insertions(+), 114 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
index c6159cc..0756f02 100644
--- a/docs/content/docs/connectors/datastream/formats/hadoop.md
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -28,11 +28,185 @@ under the License.
 
 # Hadoop formats
 
-Apache Flink allows users to access many different systems as data sources.
-The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
-of so called `InputFormat`s
+## Project Configuration
 
-One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
-users to use all existing Hadoop input formats with Flink.
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in the `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` packages, each with an additional
+sub-package for the `mapred` and `mapreduce` APIs.
+
+Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
+```
+
+If you want to run your Flink application locally (e.g. from your IDE), you also need to add
+a `hadoop-client` dependency such as:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.8.3</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+## Using Hadoop InputFormats
+
+To use Hadoop `InputFormats` with Flink, the format must first be wrapped
+using either `readHadoopFile` or `createHadoopInput` of the
+`HadoopInputs` utility class.
+The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general-purpose
+input formats.
+The resulting `InputFormat` can be used to create a data source via
+`ExecutionEnvironment#createInput`.
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
+{{< tab "Java" >}}
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+                        LongWritable.class, Text.class, textPath));
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.createInput(HadoopInputs.readHadoopFile(
+                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< /tabs >}}
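+
+The example above uses `readHadoopFile`, which only applies to input formats derived from
+`FileInputFormat`. For other input formats, `createHadoopInput` takes a pre-configured
+`JobConf` instead of a path. The following is a minimal sketch, assuming a `mapred`
+`SequenceFileInputFormat` with `LongWritable` keys and `Text` values and a hypothetical
+`sequenceFilePath` variable:
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Configure the Hadoop input format through a JobConf instead of a plain path argument.
+JobConf jobConf = new JobConf();
+org.apache.hadoop.mapred.FileInputFormat.addInputPath(jobConf, new Path(sequenceFilePath));
+
+// createHadoopInput wraps any mapred InputFormat; its configuration lives in the JobConf.
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.createInput(HadoopInputs.createHadoopInput(
+        new SequenceFileInputFormat<LongWritable, Text>(),
+        LongWritable.class, Text.class, jobConf));
+
+// Do something with the data.
+[...]
+```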
+
+## Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value, which are then processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
+{{< tab "Java" >}}
+
+```java
+// Obtain the result we want to emit
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
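+
+The Java snippet above uses the `mapreduce`-API wrapper, while the Scala snippet uses the
+`mapred` API. For reference, a minimal sketch of the `mapred`-API wrapper in Java, assuming
+a `hadoopResult` DataSet and a hypothetical `resultPath` variable:
+
+```java
+// Obtain the result we want to emit.
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Wrap the mapred TextOutputFormat; configuration goes through the JobConf.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  new HadoopOutputFormat<Text, IntWritable>(
+    new TextOutputFormat<Text, IntWritable>(), new JobConf()
+  );
+
+hadoopOF.getJobConf().set("mapred.textoutputformat.separator", " ");
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf(), new Path(resultPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+```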
+
+## Complete Hadoop WordCount Example
+
+The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+  new HadoopInputFormat<LongWritable, Text>(
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as FlatMapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, LongWritable> hadoopOF =
+  new HadoopOutputFormat<Text, LongWritable>(
+    new TextOutputFormat<Text, LongWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+result.output(hadoopOF);
+
+// Execute Program
+env.execute("Hadoop WordCount");
+```
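+
+The `Tokenizer` and `Counter` classes referenced above are ordinary Hadoop `mapred` Mapper
+and Reducer implementations; they are not shown on this page. A minimal, hypothetical sketch
+of what they could look like:
+
+```java
+// Hypothetical Hadoop Mapper: splits each line into words and emits (word, 1).
+public static final class Tokenizer
+    implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, LongWritable> {
+
+  @Override
+  public void map(LongWritable key, Text value,
+      OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+    for (String word : value.toString().toLowerCase().split("\\W+")) {
+      if (!word.isEmpty()) {
+        out.collect(new Text(word), new LongWritable(1L));
+      }
+    }
+  }
+
+  @Override
+  public void configure(JobConf job) {}
+
+  @Override
+  public void close() {}
+}
+
+// Hypothetical Hadoop Reducer: sums the counts for each word.
+public static final class Counter
+    implements org.apache.hadoop.mapred.Reducer<Text, LongWritable, Text, LongWritable> {
+
+  @Override
+  public void reduce(Text key, Iterator<LongWritable> values,
+      OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+    long sum = 0;
+    while (values.hasNext()) {
+      sum += values.next().get();
+    }
+    out.collect(key, new LongWritable(sum));
+  }
+
+  @Override
+  public void configure(JobConf job) {}
+
+  @Override
+  public void close() {}
+}
+```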
 
 {{< top >}}
diff --git a/docs/content/docs/dev/dataset/hadoop_compatibility.md b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
similarity index 67%
rename from docs/content/docs/dev/dataset/hadoop_compatibility.md
rename to docs/content/docs/dev/dataset/hadoop_map_reduce.md
index acbaf6b..c75c3c5 100644
--- a/docs/content/docs/dev/dataset/hadoop_compatibility.md
+++ b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
@@ -1,9 +1,9 @@
 ---
-title: "Hadoop Compatibility"
+title: "Hadoop MapReduce compatibility with Flink"
 weight: 8
 type: docs
 aliases:
-  - /dev/batch/hadoop_compatibility.html
+  - /dev/batch/hadoop_map_reduce.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Hadoop Compatibility
+# Flink and MapReduce compatibility
 
 Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
 reusing code that was implemented for Hadoop MapReduce.
@@ -32,15 +32,15 @@ reusing code that was implemented for Hadoop MapReduce.
 You can:
 
 - use Hadoop's `Writable` [data types]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}#supported-data-types) in Flink programs.
-- use any Hadoop `InputFormat` as a [DataSource]({{ ref "docs/dev/dataset/overview" >}}#data-sources).
-- use any Hadoop `OutputFormat` as a [DataSink]({{ ref "docs/dev/dataset/overview" >}}#data-sinks).
+- use any Hadoop `InputFormat` as a [DataSource]({{< ref "docs/dev/connectors/formats/hadoop.html" >}}#data-sources).
+- use any Hadoop `OutputFormat` as a [DataSink]({{< ref "docs/dev/connectors/formats/hadoop.html" >}}#data-sinks).
 - use a Hadoop `Mapper` as [FlatMapFunction]({{ ref "docs/dev/dataset/transformations" >}}#flatmap).
 - use a Hadoop `Reducer` as [GroupReduceFunction]({{ ref "docs/dev/dataset/transformations" >}}#groupreduce-on-grouped-dataset).
 
 This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
 [Connecting to other systems]({{< ref "docs/deployment/filesystems/overview" >}}#hadoop-file-system-hdfs-and-its-other-implementations) guide for reading from Hadoop supported file systems.
 
-### Project Configuration
+## Project Configuration
 
 Support for Hadoop input/output formats is part of the `flink-java` and
 `flink-scala` Maven modules that are always required when writing Flink jobs.
@@ -76,108 +76,7 @@ a `hadoop-client` dependency such as:
 </dependency>
 ```
 
-### Using Hadoop InputFormats
-
-To use Hadoop `InputFormats` with Flink the format must first be wrapped
-using either `readHadoopFile` or `createHadoopInput` of the
-`HadoopInputs` utility class.
-The former is used for input formats derived
-from `FileInputFormat` while the latter has to be used for general purpose
-input formats.
-The resulting `InputFormat` can be used to create a data source by using
-`ExecutionEnvironmen#createInput`.
-
-The resulting `DataSet` contains 2-tuples where the first field
-is the key and the second field is the value retrieved from the Hadoop
-InputFormat.
-
-The following example shows how to use Hadoop's `TextInputFormat`.
-
-{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
-{{< tab "Java" >}}
-
-```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-DataSet<Tuple2<LongWritable, Text>> input =
-    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
-                        LongWritable.class, Text.class, textPath));
-
-// Do something with the data.
-[...]
-```
-
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-```scala
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-val input: DataSet[(LongWritable, Text)] =
-  env.createInput(HadoopInputs.readHadoopFile(
-                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
-
-// Do something with the data.
-[...]
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-### Using Hadoop OutputFormats
-
-Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
-that implements `org.apache.hadoop.mapred.OutputFormat` or extends
-`org.apache.hadoop.mapreduce.OutputFormat` is supported.
-The OutputFormat wrapper expects its input data to be a DataSet containing
-2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
-
-The following example shows how to use Hadoop's `TextOutputFormat`.
-
-{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
-{{< tab "Java" >}}
-
-```java
-// Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, IntWritable> hadoopOF =
-  // create the Flink wrapper.
-  new HadoopOutputFormat<Text, IntWritable>(
-    // set the Hadoop OutputFormat and specify the job.
-    new TextOutputFormat<Text, IntWritable>(), job
-  );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-hadoopResult.output(hadoopOF);
-```
-
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-```scala
-// Obtain your result to emit.
-val hadoopResult: DataSet[(Text, IntWritable)] = [...]
-
-val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
-  new TextOutputFormat[Text, IntWritable],
-  new JobConf)
-
-hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
-FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
-
-hadoopResult.output(hadoopOF)
-
-
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-### Using Hadoop Mappers and Reducers
+## Using Hadoop Mappers and Reducers
 
 Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop's mapr [...]
 
@@ -211,7 +110,7 @@ DataSet<Tuple2<Text, LongWritable>> result = text
 
 **Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
 
-### Complete Hadoop WordCount Example
+## Complete Hadoop WordCount Example
 
 The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.