Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/29 15:15:09 UTC

[flink] branch master updated (c782735 -> cbedff7)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c782735  [FLINK-25065][docs] Update document for "lookup.cache.caching-missing-key" option for jdbc connector (#17918)
     new dd31d03  [FLINK-21407][doc][formats] Add formats to DataStream connectors doc
     new 4913bc8  [FLINK-21407][doc][formats] Move Hadoop input and output formats to hadoop.md formats page
     new b1c708b  [FLINK-21407][doc][formats] Update hadoop doc
     new 0decd8b  [FLINK-21407][doc][formats] Drop old formats
     new 7b90448  [FLINK-21407] bump microsoft-hadoop-azure version
     new cbedff7  [FLINK-21407][doc][formats] Split DataSet connectors page into different formats and create a formats sub-folder like in table api doc

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/docs/connectors/dataset/_index.md     |  23 +++
 .../docs/connectors/dataset/formats/_index.md      |  23 +++
 .../docs/connectors/dataset/formats/avro.md        |  61 ++++++++
 .../formats/azure_table_storage.md}                |  96 +++----------
 .../docs/connectors/dataset/formats/hadoop.md      | 158 ++++++++++++++++++++
 .../docs/connectors/datastream/formats/_index.md   |  23 +++
 .../docs/connectors/datastream/formats/avro.md     |  61 ++++++++
 .../datastream/formats/azure_table_storage.md      | 128 +++++++++++++++++
 .../docs/connectors/datastream/formats/hadoop.md   | 159 +++++++++++++++++++++
 docs/content/docs/connectors/datastream/mongodb.md |  33 +++++
 .../content/docs/connectors/datastream/overview.md |   6 +-
 ...adoop_compatibility.md => hadoop_map_reduce.md} | 130 ++---------------
 12 files changed, 702 insertions(+), 199 deletions(-)
 create mode 100644 docs/content/docs/connectors/dataset/_index.md
 create mode 100644 docs/content/docs/connectors/dataset/formats/_index.md
 create mode 100644 docs/content/docs/connectors/dataset/formats/avro.md
 rename docs/content/docs/connectors/{dataset.md => dataset/formats/azure_table_storage.md} (53%)
 create mode 100644 docs/content/docs/connectors/dataset/formats/hadoop.md
 create mode 100644 docs/content/docs/connectors/datastream/formats/_index.md
 create mode 100644 docs/content/docs/connectors/datastream/formats/avro.md
 create mode 100644 docs/content/docs/connectors/datastream/formats/azure_table_storage.md
 create mode 100644 docs/content/docs/connectors/datastream/formats/hadoop.md
 create mode 100644 docs/content/docs/connectors/datastream/mongodb.md
 rename docs/content/docs/dev/dataset/{hadoop_compatibility.md => hadoop_map_reduce.md} (61%)

[flink] 03/06: [FLINK-21407][doc][formats] Update hadoop doc

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1c708b60a0e376ba6c510d8f86b417ee78184a7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Nov 9 15:35:44 2021 +0100

    [FLINK-21407][doc][formats] Update hadoop doc
---
 .../docs/connectors/datastream/formats/hadoop.md   | 71 +++-------------------
 docs/content/docs/dev/dataset/hadoop_map_reduce.md | 13 +---
 2 files changed, 11 insertions(+), 73 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
index 0756f02..56553ca 100644
--- a/docs/content/docs/connectors/datastream/formats/hadoop.md
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -30,19 +30,10 @@ under the License.
 
 ## Project Configuration
 
-Support for Hadoop input/output formats is part of the `flink-java` and
-`flink-scala` Maven modules that are always required when writing Flink jobs.
-The code is located in `org.apache.flink.api.java.hadoop` and
-`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
-`mapred` and `mapreduce` API.
-
-Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
 Maven module.
-This code resides in the `org.apache.flink.hadoopcompatibility`
-package.
 
-Add the following dependency to your `pom.xml` if you want to reuse Mappers
-and Reducers.
+Add the following dependency to your `pom.xml` to use Hadoop:
 
 ```xml
 <dependency>
@@ -75,7 +66,7 @@ input formats.
 The resulting `InputFormat` can be used to create a data source by using
 `ExecutionEnvironment#createInput`.
 
-The resulting `DataSet` contains 2-tuples where the first field
+The resulting `DataStream` contains 2-tuples where the first field
 is the key and the second field is the value retrieved from the Hadoop
 InputFormat.
 
@@ -85,9 +76,9 @@ The following example shows how to use Hadoop's `TextInputFormat`.
 {{< tab "Java" >}}
 
 ```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataSet<Tuple2<LongWritable, Text>> input =
+DataStream<Tuple2<LongWritable, Text>> input =
     env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
                         LongWritable.class, Text.class, textPath));
 
@@ -99,9 +90,9 @@ DataSet<Tuple2<LongWritable, Text>> input =
 {{< tab "Scala" >}}
 
 ```scala
-val env = ExecutionEnvironment.getExecutionEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-val input: DataSet[(LongWritable, Text)] =
+val input: DataStream[(LongWritable, Text)] =
   env.createInput(HadoopInputs.readHadoopFile(
                     new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
 
@@ -127,7 +118,7 @@ The following example shows how to use Hadoop's `TextOutputFormat`.
 
 ```java
 // Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]
 
 // Set up the Hadoop TextOutputFormat.
 HadoopOutputFormat<Text, IntWritable> hadoopOF =
@@ -148,7 +139,7 @@ hadoopResult.output(hadoopOF);
 
 ```scala
 // Obtain your result to emit.
-val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+val hadoopResult: DataStream[(Text, IntWritable)] = [...]
 
 val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
   new TextOutputFormat[Text, IntWritable],
@@ -165,48 +156,4 @@ hadoopResult.output(hadoopOF)
 {{< /tab >}}
 {{< /tabs >}}
 
-## Complete Hadoop WordCount Example
-
-The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
-
-```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// Set up the Hadoop TextInputFormat.
-Job job = Job.getInstance();
-HadoopInputFormat<LongWritable, Text> hadoopIF =
-  new HadoopInputFormat<LongWritable, Text>(
-    new TextInputFormat(), LongWritable.class, Text.class, job
-  );
-TextInputFormat.addInputPath(job, new Path(inputPath));
-
-// Read data using the Hadoop TextInputFormat.
-DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
-
-DataSet<Tuple2<Text, LongWritable>> result = text
-  // use Hadoop Mapper (Tokenizer) as MapFunction
-  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
-    new Tokenizer()
-  ))
-  .groupBy(0)
-  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
-  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
-    new Counter(), new Counter()
-  ));
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, LongWritable> hadoopOF =
-  new HadoopOutputFormat<Text, LongWritable>(
-    new TextOutputFormat<Text, LongWritable>(), job
-  );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-result.output(hadoopOF);
-
-// Execute Program
-env.execute("Hadoop WordCount");
-```
-
 {{< top >}}
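
The updated page mentions `createHadoopInput` for general-purpose input formats but only demonstrates `readHadoopFile`. A minimal DataStream sketch, assuming the `createHadoopInput(format, keyClass, valueClass, job)` overload of `HadoopInputs` and reusing the `AzureTableInputFormat` from the Azure Table Storage page:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// wrap a non-file Hadoop InputFormat (mapreduce API) via the HadoopInputs utility class
Job job = Job.getInstance();
DataStream<Tuple2<Text, WritableEntity>> rows =
    env.createInput(HadoopInputs.createHadoopInput(
        new AzureTableInputFormat(), Text.class, WritableEntity.class, job));

rows.print();
env.execute("createHadoopInput example");
```
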
diff --git a/docs/content/docs/dev/dataset/hadoop_map_reduce.md b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
index c75c3c5..193217f 100644
--- a/docs/content/docs/dev/dataset/hadoop_map_reduce.md
+++ b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
@@ -42,19 +42,10 @@ This document shows how to use existing Hadoop MapReduce code with Flink. Please
 
 ## Project Configuration
 
-Support for Hadoop input/output formats is part of the `flink-java` and
-`flink-scala` Maven modules that are always required when writing Flink jobs.
-The code is located in `org.apache.flink.api.java.hadoop` and
-`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
-`mapred` and `mapreduce` API.
-
-Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
 Maven module.
-This code resides in the `org.apache.flink.hadoopcompatibility`
-package.
 
-Add the following dependency to your `pom.xml` if you want to reuse Mappers
-and Reducers.
+Add the following dependency to your `pom.xml` to use Hadoop:
 
 ```xml
 <dependency>

[flink] 01/06: [FLINK-21407][doc][formats] Add formats to DataStream connectors doc

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd31d03dbc3f29522ec01328b7a93121137df7d7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Oct 19 16:09:37 2021 +0200

    [FLINK-21407][doc][formats] Add formats to DataStream connectors doc
---
 .../docs/connectors/datastream/formats/_index.md   |  23 ++++
 .../docs/connectors/datastream/formats/avro.md     |  61 ++++++++++
 .../datastream/formats/azure_table_storage.md      | 128 +++++++++++++++++++++
 .../docs/connectors/datastream/formats/hadoop.md   |  38 ++++++
 .../docs/connectors/datastream/formats/parquet.md  |  67 +++++++++++
 docs/content/docs/connectors/datastream/mongodb.md |  33 ++++++
 .../content/docs/connectors/datastream/overview.md |   6 +-
 7 files changed, 355 insertions(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/datastream/formats/_index.md b/docs/content/docs/connectors/datastream/formats/_index.md
new file mode 100644
index 0000000..282fc69
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/_index.md
@@ -0,0 +1,23 @@
+---
+title: Formats
+bookCollapseSection: true
+weight: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/connectors/datastream/formats/avro.md b/docs/content/docs/connectors/datastream/formats/avro.md
new file mode 100644
index 0000000..1b2ffef
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/avro.md
@@ -0,0 +1,61 @@
+---
+title:  "Avro"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/avro.html
+- /apis/streaming/connectors/formats/avro.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Avro format
+
+Flink has built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read and write Avro data based on an Avro schema with Flink.
+The serialization framework of Flink is able to handle classes generated from Avro schemas. In order to use the Avro format, the following dependencies are required for projects using a build automation tool (such as Maven or SBT).
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-avro</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+```java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataStream<User> usersDS = env.createInput(users);
+```
+
+Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
+
+```java
+usersDS.keyBy("name")
+```
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it's very data-intensive and thus probably slow to use.
+
+Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use the field as a join or grouping key.
+Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
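
For context, a slightly more complete sketch of the `AvroInputFormat` usage described above; the Avro-generated `User` class and the input path are assumptions:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// read Avro records as Avro-generated User POJOs (the path is an assumption)
AvroInputFormat<User> users =
    new AvroInputFormat<>(new Path("/path/to/users.avro"), User.class);
DataStream<User> usersDS = env.createInput(users);

// string-based key selection on the generated POJO, as described above
usersDS.keyBy("name").print();

env.execute("Avro read example");
```
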
diff --git a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
new file mode 100644
index 0000000..6d82356
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
@@ -0,0 +1,128 @@
+---
+title:  "Microsoft Azure table"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/azure_table_storage.html
+- /apis/streaming/connectors/formats/azure_table_storage.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Microsoft Azure Table Storage format
+
+This example uses the `HadoopInputFormat` wrapper with an existing Hadoop input format implementation to access [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
+
+1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves.
+   Execute the following commands:
+
+```bash
+git clone https://github.com/mooso/azure-tables-hadoop.git
+cd azure-tables-hadoop
+mvn clean install
+```
+
+2. Set up a new Flink project using the quickstarts:
+
+```bash
+curl https://flink.apache.org/q/quickstart.sh | bash
+```
+
+3. Add the following dependencies (in the `<dependencies>` section) to your `pom.xml` file:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+    <version>{{< version >}}</version>
+</dependency>
+<dependency>
+    <groupId>com.microsoft.hadoop</groupId>
+    <artifactId>microsoft-hadoop-azure</artifactId>
+    <version>0.0.4</version>
+</dependency>
+```
+
+`flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
+`microsoft-hadoop-azure` adds the project we built before to our project.
+
+The project is now ready for coding. We recommend importing it into an IDE, such as IntelliJ, as a Maven project.
+Browse to the file `Job.java`. This is an empty skeleton for a Flink job.
+
+Paste the following code:
+
+```java
+import java.util.Map;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import com.microsoft.hadoop.azure.AzureTableConfiguration;
+import com.microsoft.hadoop.azure.AzureTableInputFormat;
+import com.microsoft.hadoop.azure.WritableEntity;
+import com.microsoft.windowsazure.storage.table.EntityProperty;
+
+public class AzureTableExample {
+
+  public static void main(String[] args) throws Exception {
+    // set up the execution environment
+    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+    // create an AzureTableInputFormat, using a Hadoop input format wrapper
+    HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
+
+    // set the Account URI, something like: https://apacheflink.table.core.windows.net
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
+    // set the secret storage key here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
+    // set the table name here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
+
+    DataStream<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
+    // a small example of how to use the data in a mapper.
+    DataStream<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
+      @Override
+      public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
+        System.err.println("--------------------------------\nKey = "+arg0.f0);
+        WritableEntity we = arg0.f1;
+
+        for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
+          System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
+        }
+
+        return arg0.f0.toString();
+      }
+    });
+
+    // emit result (this works only locally)
+    fin.print();
+
+    // execute program
+    env.execute("Azure Example");
+  }
+}
+```
+
+The example shows how to access an Azure table and turn data into Flink's `DataStream` (more specifically, the type of the stream is `DataStream<Tuple2<Text, WritableEntity>>`). With the `DataStream`, you can apply all known DataStream transformations to it.
+
+{{< top >}}
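
The `print()` sink in the example above is mainly useful for local testing; to persist the mapped strings instead, a sketch using the `FileSink` from the FileSystem sink page (the output path is an assumption):

```java
// write the mapped strings to rolling part files instead of printing them
FileSink<String> sink = FileSink
    .forRowFormat(new Path("/tmp/azure-table-output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();
fin.sinkTo(sink);
```
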
diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
new file mode 100644
index 0000000..c6159cc
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -0,0 +1,38 @@
+---
+title:  "Hadoop"
+weight: 4
+type: docs
+aliases:
+  - /dev/connectors/formats/hadoop.html
+  - /apis/streaming/connectors/formats/hadoop.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hadoop formats
+
+Apache Flink allows users to access many different systems as data sources.
+The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
+of so-called `InputFormat`s.
+
+One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
+users to use all existing Hadoop input formats with Flink.
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content/docs/connectors/datastream/formats/parquet.md
new file mode 100644
index 0000000..fcbe797
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -0,0 +1,67 @@
+---
+title:  "Parquet"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/parquet.html
+- /apis/streaming/connectors/formats/parquet.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Parquet formats
+
+Flink has built-in support for [Apache Parquet](http://parquet.apache.org/). This allows you to read and write Parquet data with Flink.
+In order to use the Parquet format, the following dependencies are required for projects using a build automation tool (such as Maven or SBT).
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-parquet_{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+In order to read data from a Parquet file, you have to specify one of the implementations of `ParquetInputFormat`. There are several, depending on your needs:
+- `ParquetPojoInputFormat<E>` to read POJOs from Parquet files
+- `ParquetRowInputFormat` to read Flink `Rows` (column-oriented records) from Parquet files
+- `ParquetMapInputFormat` to read Map records (Map of nested Flink type objects) from Parquet files
+- `ParquetAvroInputFormat` to read Avro Generic Records from Parquet files
+
+
+**Example for ParquetRowInputFormat**:
+
+```java
+MessageType parquetSchema = // use parquet libs to provide the parquet schema file and parse it or extract it from the parquet files
+ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path(filePath),  parquetSchema);
+// project only needed fields if suited to reduce the amount of data. Use: parquetSchema#selectFields(projectedFieldNames);
+DataStream<Row> input = env.createInput(parquetInputFormat);
+```
+
+**Example for ParquetAvroInputFormat**:
+
+```java
+MessageType parquetSchema = // use parquet libs to provide the parquet schema file and parse it or extract it from the parquet files
+ParquetAvroInputFormat parquetInputFormat = new ParquetAvroInputFormat(new Path(filePath),  parquetSchema);
+// project only needed fields if suited to reduce the amount of data. Use: parquetSchema#selectFields(projectedFieldNames);
+DataStream<GenericRecord> input = env.createInput(parquetInputFormat);
+```
+
+
diff --git a/docs/content/docs/connectors/datastream/mongodb.md b/docs/content/docs/connectors/datastream/mongodb.md
new file mode 100644
index 0000000..eaec8ec
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/mongodb.md
@@ -0,0 +1,33 @@
+---
+title:  "MongoDB"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/mongodb.html
+- /apis/streaming/connectors/mongodb.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# MongoDB format
+
+This [GitHub repository](https://github.com/okkam-it/flink-mongodb-test) documents how to use MongoDB with Apache Flink (starting from 0.7-incubating).
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/overview.md b/docs/content/docs/connectors/datastream/overview.md
index ec2b1df..116888c 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -42,7 +42,9 @@ Connectors provide code for interfacing with various third-party systems. Curren
  * [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}}) (sink)
  * [Amazon Kinesis Streams]({{< ref "docs/connectors/datastream/kinesis" >}}) (source/sink)
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) (sink)
- * [FileSystem (Hadoop included) - Streaming and Batch]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
+ * [FileSystem (Hadoop included) - Streaming only sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) (sink)
+ * [FileSystem (Hadoop included) - Streaming and Batch sink]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
+ * [FileSystem (Hadoop included) - Batch source]({{< ref "docs/connectors/datastream/formats" >}}) (source)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
  * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
@@ -57,6 +59,8 @@ Note also that while the streaming connectors listed in this section are part of
 Flink project and are included in source releases, they are not included in the binary distributions. 
 Further instructions can be found in the corresponding subsections.
 
+Filesystem source formats are gradually being replaced by the new Flink Source API, starting with Flink 1.14.0.
+
 ## Connectors in Apache Bahir
 
 Additional streaming connectors for Flink are being released through [Apache Bahir](https://bahir.apache.org/), including:

[flink] 04/06: [FLINK-21407][doc][formats] Drop old formats

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0decd8be9a5b8dd4788c58d75dbc02d2b7e4aeaa
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 10 11:56:29 2021 +0100

    [FLINK-21407][doc][formats] Drop old formats
---
 .../docs/connectors/datastream/formats/parquet.md  | 67 ----------------------
 1 file changed, 67 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content/docs/connectors/datastream/formats/parquet.md
deleted file mode 100644
index fcbe797..0000000
--- a/docs/content/docs/connectors/datastream/formats/parquet.md
+++ /dev/null
@@ -1,67 +0,0 @@
----
-title:  "Parquet"
-weight: 4
-type: docs
-aliases:
-- /dev/connectors/formats/parquet.html
-- /apis/streaming/connectors/formats/parquet.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-
-# Parquet formats
-
-Flink has built-in support for [Apache Parquet](http://parquet.apache.org/). This allows to read and write Parquet data with Flink. 
-In order to use the Parquet format the following dependencies are required for projects using a build automation tool (such as Maven or SBT).
-
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-parquet_{{< scala_version >}}</artifactId>
-  <version>{{< version >}}</version>
-</dependency>
-```
-
-In order to read data from a Parquet file, you have to specify one of the implementation of `ParquetInputFormat`. There are several depending on your needs:
-- `ParquetPojoInputFormat<E>` to read POJOs from parquet files
-- `ParquetRowInputFormat` to read Flink `Rows` (column oriented records) from parquet files
-- `ParquetMapInputFormat` to read Map records (Map of nested Flink type objects) from parquet files
-- `ParquetAvroInputFormat` to read Avro Generic Records from parquet files
-
-
-**Example for ParquetRowInputFormat**:
-
-```java
-MessageType parquetSchema = // use parquet libs to provide the parquet schema file and parse it or extract it from the parquet files
-ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path(filePath),  parquetSchema);
-// project only needed fields if suited to reduce the amount of data. Use: parquetSchema#selectFields(projectedFieldNames);
-DataStream<Row> input = env.createInput(parquetInputFormat);
-```
-
-**Example for ParquetAvroInputFormat**:
-
-```java
-MessageType parquetSchema = // use parquet libs to provide the parquet schema file and parse it or extract it from the parquet files
-ParquetAvroInputFormat parquetInputFormat = new ParquetAvroInputFormat(new Path(filePath),  parquetSchema);
-// project only needed fields if suited to reduce the amount of data. Use: parquetSchema#selectFields(projectedFieldNames);
-DataStream<GenericRecord> input = env.createInput(parquetInputFormat);
-```
-
-

[flink] 06/06: [FLINK-21407][doc][formats] Split DataSet connectors page into different formats and create a formats sub-folder like in table api doc

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cbedff73193c90f2324951273baa71cdfd8c23f1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Nov 19 12:02:05 2021 +0100

    [FLINK-21407][doc][formats] Split DataSet connectors page into different formats and create a formats sub-folder like in table api doc
---
 docs/content/docs/connectors/dataset/_index.md     |  23 +++
 .../docs/connectors/dataset/formats/_index.md      |  23 +++
 .../docs/connectors/dataset/formats/avro.md        |  61 ++++++++
 .../formats/azure_table_storage.md}                |  96 +++----------
 .../docs/connectors/dataset/formats/hadoop.md      | 158 +++++++++++++++++++++
 5 files changed, 283 insertions(+), 78 deletions(-)

diff --git a/docs/content/docs/connectors/dataset/_index.md b/docs/content/docs/connectors/dataset/_index.md
new file mode 100644
index 0000000..3b0d8f6
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/_index.md
@@ -0,0 +1,23 @@
+---
+title: DataSet Connectors
+bookCollapseSection: true
+weight: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/connectors/dataset/formats/_index.md b/docs/content/docs/connectors/dataset/formats/_index.md
new file mode 100644
index 0000000..282fc69
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/formats/_index.md
@@ -0,0 +1,23 @@
+---
+title: Formats
+bookCollapseSection: true
+weight: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/connectors/dataset/formats/avro.md b/docs/content/docs/connectors/dataset/formats/avro.md
new file mode 100644
index 0000000..7320587
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/formats/avro.md
@@ -0,0 +1,61 @@
+---
+title:  "Avro"
+weight: 4
+type: docs
+aliases:
+- /dev/batch/connectors/formats/avro.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Avro format
+
+Flink has built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read and write Avro data based on an Avro schema with Flink.
+The serialization framework of Flink is able to handle classes generated from Avro schemas. In order to use the Avro format, the following dependencies are required for projects using a build automation tool (such as Maven or SBT).
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-avro</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+```java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataSet<User> usersDS = env.createInput(users);
+```
+
+Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
+
+```java
+usersDS.groupBy("name")
+```
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it's very data-intensive and thus probably slow to use.
+
+Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use the field as a join or grouping key.
+Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
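
As a small follow-up to the string-based key selection above, the same field name can be used directly in DataSet transformations; a minimal sketch (the `first(1)` step is only an illustrative operation):

```java
// group the Avro-generated POJOs by the "name" field and keep one record per name
DataSet<User> onePerName = usersDS
    .groupBy("name")
    .first(1);
```
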
diff --git a/docs/content/docs/connectors/dataset.md b/docs/content/docs/connectors/dataset/formats/azure_table_storage.md
similarity index 53%
rename from docs/content/docs/connectors/dataset.md
rename to docs/content/docs/connectors/dataset/formats/azure_table_storage.md
index a4ddca3..4c45bfc 100644
--- a/docs/content/docs/connectors/dataset.md
+++ b/docs/content/docs/connectors/dataset/formats/azure_table_storage.md
@@ -1,9 +1,10 @@
 ---
-title:  "DataSet Connectors"
-weight: 3
+title:  "Microsoft Azure table"
+weight: 4
 type: docs
 aliases:
-  - /dev/batch/connectors.html
+- /dev/batch/connectors/formats/azure_table_storage.html
+
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,69 +25,12 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# DataSet Connectors
-
-## Reading from and writing to file systems
-
-The Apache Flink project supports multiple [file systems]({{< ref "docs/deployment/filesystems/overview" >}}) that can be used as backing stores
-for input and output connectors. 
-
-## Connecting to other systems using Input/OutputFormat wrappers for Hadoop
-
-Apache Flink allows users to access many different systems as data sources or sinks.
-The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
-of so called `InputFormat`s and `OutputFormat`s.
-
-One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
-users to use all existing Hadoop input formats with Flink.
-
-This section shows some examples for connecting Flink to other systems.
-[Read more about Hadoop compatibility in Flink]({{< ref "docs/dev/dataset/hadoop_compatibility" >}}).
-
-## Avro support in Flink
-
-Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows to easily read from Avro files with Flink.
-Also, the serialization framework of Flink is able to handle classes generated from Avro schemas. Be sure to include the Flink Avro dependency to the pom.xml of your project.
-
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-avro</artifactId>
-  <version>{{< version >}}</version>
-</dependency>
-```
-
-In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
-
-**Example**:
-
-```java
-AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-DataSet<User> usersDS = env.createInput(users);
-```
-
-Note that `User` is a POJO generated by Avro. Flink also allows to perform string-based key selection of these POJOs. For example:
-
-```java
-usersDS.groupBy("name")
-```
-
-
-Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, its very data intensive and thus probably slow to use.
-
-Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object` you can not use the field as a join or grouping key.
-Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine, however specifying it as a UNION-type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
-
-
-
-### Access Microsoft Azure Table Storage
-
-_Note: This example works starting from Flink 0.6-incubating_
+# Microsoft Azure Table Storage format
 
 This example uses the `HadoopInputFormat` wrapper with an existing Hadoop input format implementation to access [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
 
 1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves.
-Execute the following commands:
+   Execute the following commands:
 
 ```bash
 git clone https://github.com/mooso/azure-tables-hadoop.git
@@ -104,29 +48,29 @@ curl https://flink.apache.org/q/quickstart.sh | bash
 
 ```xml
 <dependency>
-   <groupId>org.apache.flink</groupId>
-   <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
-   <version>{{< version >}}</version>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+    <version>{{< version >}}</version>
 </dependency>
 <dependency>
- <groupId>com.microsoft.hadoop</groupId>
- <artifactId>microsoft-hadoop-azure</artifactId>
- <version>0.0.4</version>
+    <groupId>com.microsoft.hadoop</groupId>
+    <artifactId>microsoft-hadoop-azure</artifactId>
+    <version>0.0.5</version>
 </dependency>
 ```
 
 `flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
 `microsoft-hadoop-azure` adds the project we built before to our project.
 
-The project is now prepared for starting to code. We recommend to import the project into an IDE, such as Eclipse or IntelliJ. (Import as a Maven project!).
-Browse to the code of the `Job.java` file. Its an empty skeleton for a Flink job.
+The project is now ready for coding. We recommend importing it into an IDE, such as IntelliJ, as a Maven project.
+Browse to the file `Job.java`. This is an empty skeleton for a Flink job.
 
-Paste the following code into it:
+Paste the following code:
 
 ```java
 import java.util.Map;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
@@ -147,7 +91,7 @@ public class AzureTableExample {
     HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
 
     // set the Account URI, something like: https://apacheflink.table.core.windows.net
-    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
     // set the secret storage key here
     hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
     // set the table name here
@@ -178,10 +122,6 @@ public class AzureTableExample {
 }
 ```
 
-The example shows how to access an Azure table and turn data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known transformations to the DataSet.
-
-## Access MongoDB
-
-This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test).
+The example shows how to access an Azure table and turn data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known DataSet transformations to it.
 
 {{< top >}}
diff --git a/docs/content/docs/connectors/dataset/formats/hadoop.md b/docs/content/docs/connectors/dataset/formats/hadoop.md
new file mode 100644
index 0000000..64702cb
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/formats/hadoop.md
@@ -0,0 +1,158 @@
+---
+title:  "Hadoop"
+weight: 4
+type: docs
+aliases:
+- /dev/batch/connectors/hadoop.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hadoop formats
+
+## Project Configuration
+
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
+Maven module.
+
+Add the following dependency to your `pom.xml` to use Hadoop:
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
+```
+
+If you want to run your Flink application locally (e.g. from your IDE), you also need to add
+a `hadoop-client` dependency such as:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.8.3</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+## Using Hadoop InputFormats
+
+To use Hadoop `InputFormats` with Flink, the format must first be wrapped
+using either `readHadoopFile` or `createHadoopInput` of the
+`HadoopInputs` utility class.
+The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general purpose
+input formats.
+The resulting `InputFormat` can be used to create a data source by using
+`ExecutionEnvironment#createInput`.
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
+{{< tab "Java" >}}
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+                        LongWritable.class, Text.class, textPath));
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.createInput(HadoopInputs.readHadoopFile(
+                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
+{{< tab "Java" >}}
+
+```java
+// Obtain the result we want to emit
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+
+
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
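
As with the file-based example above, a general-purpose (non-file) Hadoop input format can be wrapped with `createHadoopInput`; a minimal DataSet sketch, assuming the `createHadoopInput(format, keyClass, valueClass, job)` overload of `HadoopInputs` and reusing the `AzureTableInputFormat` from the Azure Table Storage page:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// wrap a non-file Hadoop InputFormat (mapreduce API) via the HadoopInputs utility class
Job job = Job.getInstance();
DataSet<Tuple2<Text, WritableEntity>> rows =
    env.createInput(HadoopInputs.createHadoopInput(
        new AzureTableInputFormat(), Text.class, WritableEntity.class, job));

// print() triggers execution of the batch job
rows.print();
```
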

[flink] 02/06: [FLINK-21407][doc][formats] Move Hadoop input and output formats to hadoop.md formats page

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4913bc83383ee4ff33c99a9fc27a34b2eda8803d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Nov 5 14:37:02 2021 +0100

    [FLINK-21407][doc][formats] Move Hadoop input and output formats to hadoop.md formats page
---
 .../docs/connectors/datastream/formats/hadoop.md   | 184 ++++++++++++++++++++-
 ...adoop_compatibility.md => hadoop_map_reduce.md} | 117 +------------
 2 files changed, 187 insertions(+), 114 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
index c6159cc..0756f02 100644
--- a/docs/content/docs/connectors/datastream/formats/hadoop.md
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -28,11 +28,185 @@ under the License.
 
 # Hadoop formats
 
-Apache Flink allows users to access many different systems as data sources.
-The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
-of so called `InputFormat`s
+## Project Configuration
 
-One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
-users to use all existing Hadoop input formats with Flink.
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
+`mapred` and `mapreduce` API.
+
+Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
+```
+
+If you want to run your Flink application locally (e.g. from your IDE), you also need to add
+a `hadoop-client` dependency such as:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.8.3</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+## Using Hadoop InputFormats
+
+To use Hadoop `InputFormats` with Flink, the format must first be wrapped
+using either `readHadoopFile` or `createHadoopInput` of the
+`HadoopInputs` utility class.
+The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general purpose
+input formats.
+The resulting `InputFormat` can be used to create a data source by using
+`ExecutionEnvironment#createInput`.
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
+{{< tab "Java" >}}
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+                        LongWritable.class, Text.class, textPath));
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.createInput(HadoopInputs.readHadoopFile(
+                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
+{{< tab "Java" >}}
+
+```java
+// Obtain the result we want to emit
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+
+
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+## Complete Hadoop WordCount Example
+
+The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+  new HadoopInputFormat<LongWritable, Text>(
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, LongWritable> hadoopOF =
+  new HadoopOutputFormat<Text, LongWritable>(
+    new TextOutputFormat<Text, LongWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+result.output(hadoopOF);
+
+// Execute Program
+env.execute("Hadoop WordCount");
+```
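+
+The `Tokenizer` and `Counter` classes used above are plain Hadoop `Mapper` and `Reducer`
+implementations that are not shipped with Flink. A minimal sketch of what they might look
+like is shown below; the names and the tokenizing logic follow the classic WordCount and
+are only illustrative.
+
+```java
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class WordCountFunctions {
+
+    // Splits each input line into lowercase words and emits (word, 1).
+    public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
+
+        @Override
+        public void map(LongWritable key, Text value,
+                        OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+            for (String token : value.toString().toLowerCase().split("\\W+")) {
+                if (!token.isEmpty()) {
+                    out.collect(new Text(token), new LongWritable(1L));
+                }
+            }
+        }
+
+        @Override
+        public void configure(JobConf jobConf) {}
+
+        @Override
+        public void close() throws IOException {}
+    }
+
+    // Sums the counts emitted for each word; also usable as a combiner.
+    public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
+
+        @Override
+        public void reduce(Text key, Iterator<LongWritable> values,
+                           OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+            long sum = 0;
+            while (values.hasNext()) {
+                sum += values.next().get();
+            }
+            out.collect(key, new LongWritable(sum));
+        }
+
+        @Override
+        public void configure(JobConf jobConf) {}
+
+        @Override
+        public void close() throws IOException {}
+    }
+}
+```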
 
 {{< top >}}
diff --git a/docs/content/docs/dev/dataset/hadoop_compatibility.md b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
similarity index 67%
rename from docs/content/docs/dev/dataset/hadoop_compatibility.md
rename to docs/content/docs/dev/dataset/hadoop_map_reduce.md
index acbaf6b..c75c3c5 100644
--- a/docs/content/docs/dev/dataset/hadoop_compatibility.md
+++ b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
@@ -1,9 +1,9 @@
 ---
-title: "Hadoop Compatibility"
+title: "Hadoop MapReduce compatibility with Flink"
 weight: 8
 type: docs
 aliases:
-  - /dev/batch/hadoop_compatibility.html
+  - /dev/batch/hadoop_map_reduce.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Hadoop Compatibility
+# Flink and Hadoop MapReduce compatibility
 
 Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
 reusing code that was implemented for Hadoop MapReduce.
@@ -32,15 +32,15 @@ reusing code that was implemented for Hadoop MapReduce.
 You can:
 
 - use Hadoop's `Writable` [data types]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}#supported-data-types) in Flink programs.
-- use any Hadoop `InputFormat` as a [DataSource]({{ ref "docs/dev/dataset/overview" >}}#data-sources).
-- use any Hadoop `OutputFormat` as a [DataSink]({{ ref "docs/dev/dataset/overview" >}}#data-sinks).
+- use any Hadoop `InputFormat` as a [DataSource]({{< ref "docs/connectors/datastream/formats/hadoop" >}}#using-hadoop-inputformats).
+- use any Hadoop `OutputFormat` as a [DataSink]({{< ref "docs/connectors/datastream/formats/hadoop" >}}#using-hadoop-outputformats).
 - use a Hadoop `Mapper` as [FlatMapFunction]({{ ref "docs/dev/dataset/transformations" >}}#flatmap).
 - use a Hadoop `Reducer` as [GroupReduceFunction]({{ ref "docs/dev/dataset/transformations" >}}#groupreduce-on-grouped-dataset).
 
 This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
 [Connecting to other systems]({{< ref "docs/deployment/filesystems/overview" >}}#hadoop-file-system-hdfs-and-its-other-implementations) guide for reading from Hadoop supported file systems.
 
-### Project Configuration
+## Project Configuration
 
 Support for Hadoop input/output formats is part of the `flink-java` and
 `flink-scala` Maven modules that are always required when writing Flink jobs.
@@ -76,108 +76,7 @@ a `hadoop-client` dependency such as:
 </dependency>
 ```
 
-### Using Hadoop InputFormats
-
-To use Hadoop `InputFormats` with Flink the format must first be wrapped
-using either `readHadoopFile` or `createHadoopInput` of the
-`HadoopInputs` utility class.
-The former is used for input formats derived
-from `FileInputFormat` while the latter has to be used for general purpose
-input formats.
-The resulting `InputFormat` can be used to create a data source by using
-`ExecutionEnvironmen#createInput`.
-
-The resulting `DataSet` contains 2-tuples where the first field
-is the key and the second field is the value retrieved from the Hadoop
-InputFormat.
-
-The following example shows how to use Hadoop's `TextInputFormat`.
-
-{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
-{{< tab "Java" >}}
-
-```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-DataSet<Tuple2<LongWritable, Text>> input =
-    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
-                        LongWritable.class, Text.class, textPath));
-
-// Do something with the data.
-[...]
-```
-
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-```scala
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-val input: DataSet[(LongWritable, Text)] =
-  env.createInput(HadoopInputs.readHadoopFile(
-                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
-
-// Do something with the data.
-[...]
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-### Using Hadoop OutputFormats
-
-Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
-that implements `org.apache.hadoop.mapred.OutputFormat` or extends
-`org.apache.hadoop.mapreduce.OutputFormat` is supported.
-The OutputFormat wrapper expects its input data to be a DataSet containing
-2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
-
-The following example shows how to use Hadoop's `TextOutputFormat`.
-
-{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
-{{< tab "Java" >}}
-
-```java
-// Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, IntWritable> hadoopOF =
-  // create the Flink wrapper.
-  new HadoopOutputFormat<Text, IntWritable>(
-    // set the Hadoop OutputFormat and specify the job.
-    new TextOutputFormat<Text, IntWritable>(), job
-  );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-hadoopResult.output(hadoopOF);
-```
-
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-```scala
-// Obtain your result to emit.
-val hadoopResult: DataSet[(Text, IntWritable)] = [...]
-
-val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
-  new TextOutputFormat[Text, IntWritable],
-  new JobConf)
-
-hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
-FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
-
-hadoopResult.output(hadoopOF)
-
-
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-### Using Hadoop Mappers and Reducers
+## Using Hadoop Mappers and Reducers
 
 Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop's mapr [...]
 
@@ -211,7 +110,7 @@ DataSet<Tuple2<Text, LongWritable>> result = text
 
 **Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
 
-### Complete Hadoop WordCount Example
+## Complete Hadoop WordCount Example
 
 The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
 

[flink] 05/06: [FLINK-21407] bump microsoft-hadoop-azure version

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b9044840a279f6759a50ddc07c3fec2c1b1c16d
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Nov 19 11:23:27 2021 +0100

    [FLINK-21407] bump microsoft-hadoop-azure version
---
 .../content/docs/connectors/datastream/formats/azure_table_storage.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
index 6d82356..d8d1359 100644
--- a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
+++ b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
@@ -55,7 +55,7 @@ curl https://flink.apache.org/q/quickstart.sh | bash
 <dependency>
     <groupId>com.microsoft.hadoop</groupId>
     <artifactId>microsoft-hadoop-azure</artifactId>
-    <version>0.0.4</version>
+    <version>0.0.5</version>
 </dependency>
 ```
 
@@ -92,7 +92,7 @@ public class AzureTableExample {
     HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
 
     // set the Account URI, something like: https://apacheflink.table.core.windows.net
-    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
     // set the secret storage key here
     hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
     // set the table name here