Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/29 15:15:15 UTC

[flink] 06/06: [FLINK-21407][doc][formats] Split DataSet connectors page into different formats and create a formats sub-folder like in table api doc

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cbedff73193c90f2324951273baa71cdfd8c23f1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Nov 19 12:02:05 2021 +0100

    [FLINK-21407][doc][formats] Split DataSet connectors page into different formats and create a formats sub-folder like in table api doc
---
 docs/content/docs/connectors/dataset/_index.md     |  23 +++
 .../docs/connectors/dataset/formats/_index.md      |  23 +++
 .../docs/connectors/dataset/formats/avro.md        |  61 ++++++++
 .../formats/azure_table_storage.md}                |  96 +++----------
 .../docs/connectors/dataset/formats/hadoop.md      | 158 +++++++++++++++++++++
 5 files changed, 283 insertions(+), 78 deletions(-)

diff --git a/docs/content/docs/connectors/dataset/_index.md b/docs/content/docs/connectors/dataset/_index.md
new file mode 100644
index 0000000..3b0d8f6
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/_index.md
@@ -0,0 +1,23 @@
+---
+title: DataSet Connectors
+bookCollapseSection: true
+weight: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/connectors/dataset/formats/_index.md b/docs/content/docs/connectors/dataset/formats/_index.md
new file mode 100644
index 0000000..282fc69
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/formats/_index.md
@@ -0,0 +1,23 @@
+---
+title: Formats
+bookCollapseSection: true
+weight: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/connectors/dataset/formats/avro.md b/docs/content/docs/connectors/dataset/formats/avro.md
new file mode 100644
index 0000000..7320587
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/formats/avro.md
@@ -0,0 +1,61 @@
+---
+title:  "Avro"
+weight: 4
+type: docs
+aliases:
+- /dev/batch/connectors/formats/avro.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Avro format
+
+Flink has built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read and write Avro data based on an Avro schema with Flink.
+Flink's serialization framework is able to handle classes generated from Avro schemas. In order to use the Avro format, the following dependency is required for projects using a build automation tool (such as Maven or SBT).
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-avro</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+```java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataSet<User> usersDS = env.createInput(users);
+```
+
+Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
+
+```java
+usersDS.groupBy("name")
+```
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since each record carries the full schema, it is very data intensive and thus probably slow to use.
+
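+For illustration, a minimal sketch that reads the same file as `GenericData.Record` instead of a generated POJO (reusing the `in` path and `env` from the example above; `GenericRecord` is `org.apache.avro.generic.GenericRecord`):
+
+```java
+// Generic records carry the full Avro schema with each record,
+// which is why the generated-POJO variant above is usually preferred.
+AvroInputFormat<GenericRecord> genericUsers = new AvroInputFormat<>(in, GenericRecord.class);
+DataSet<GenericRecord> genericDS = env.createInput(genericUsers);
+```
+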
+Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use that field as a join or grouping key.
+Specifying a field in Avro like `{"name": "type_double_test", "type": "double"}` works fine; however, specifying it as a UNION type with only one entry (`{"name": "type_double_test", "type": ["double"]}`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]}`) is possible!
diff --git a/docs/content/docs/connectors/dataset.md b/docs/content/docs/connectors/dataset/formats/azure_table_storage.md
similarity index 53%
rename from docs/content/docs/connectors/dataset.md
rename to docs/content/docs/connectors/dataset/formats/azure_table_storage.md
index a4ddca3..4c45bfc 100644
--- a/docs/content/docs/connectors/dataset.md
+++ b/docs/content/docs/connectors/dataset/formats/azure_table_storage.md
@@ -1,9 +1,10 @@
 ---
-title:  "DataSet Connectors"
-weight: 3
+title:  "Microsoft Azure table"
+weight: 4
 type: docs
 aliases:
-  - /dev/batch/connectors.html
+- /dev/batch/connectors/formats/azure_table_storage.html
+
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,69 +25,12 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# DataSet Connectors
-
-## Reading from and writing to file systems
-
-The Apache Flink project supports multiple [file systems]({{< ref "docs/deployment/filesystems/overview" >}}) that can be used as backing stores
-for input and output connectors. 
-
-## Connecting to other systems using Input/OutputFormat wrappers for Hadoop
-
-Apache Flink allows users to access many different systems as data sources or sinks.
-The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
-of so called `InputFormat`s and `OutputFormat`s.
-
-One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
-users to use all existing Hadoop input formats with Flink.
-
-This section shows some examples for connecting Flink to other systems.
-[Read more about Hadoop compatibility in Flink]({{< ref "docs/dev/dataset/hadoop_compatibility" >}}).
-
-## Avro support in Flink
-
-Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows to easily read from Avro files with Flink.
-Also, the serialization framework of Flink is able to handle classes generated from Avro schemas. Be sure to include the Flink Avro dependency to the pom.xml of your project.
-
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-avro</artifactId>
-  <version>{{< version >}}</version>
-</dependency>
-```
-
-In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
-
-**Example**:
-
-```java
-AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-DataSet<User> usersDS = env.createInput(users);
-```
-
-Note that `User` is a POJO generated by Avro. Flink also allows to perform string-based key selection of these POJOs. For example:
-
-```java
-usersDS.groupBy("name")
-```
-
-
-Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, its very data intensive and thus probably slow to use.
-
-Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object` you can not use the field as a join or grouping key.
-Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine, however specifying it as a UNION-type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
-
-
-
-### Access Microsoft Azure Table Storage
-
-_Note: This example works starting from Flink 0.6-incubating_
+# Microsoft Azure Table Storage format
 
 This example is using the `HadoopInputFormat` wrapper to use an existing Hadoop input format implementation for accessing [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
 
 1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves.
-Execute the following commands:
+   Execute the following commands:
 
 ```bash
 git clone https://github.com/mooso/azure-tables-hadoop.git
@@ -104,29 +48,29 @@ curl https://flink.apache.org/q/quickstart.sh | bash
 
 ```xml
 <dependency>
-   <groupId>org.apache.flink</groupId>
-   <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
-   <version>{{< version >}}</version>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+    <version>{{< version >}}</version>
 </dependency>
 <dependency>
- <groupId>com.microsoft.hadoop</groupId>
- <artifactId>microsoft-hadoop-azure</artifactId>
- <version>0.0.4</version>
+    <groupId>com.microsoft.hadoop</groupId>
+    <artifactId>microsoft-hadoop-azure</artifactId>
+    <version>0.0.4</version>
 </dependency>
 ```
 
 `flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
 `microsoft-hadoop-azure` is adding the project we've build before to our project.
 
-The project is now prepared for starting to code. We recommend to import the project into an IDE, such as Eclipse or IntelliJ. (Import as a Maven project!).
-Browse to the code of the `Job.java` file. Its an empty skeleton for a Flink job.
+The project is now ready for coding. We recommend importing the project into an IDE, such as IntelliJ, as a Maven project.
+Browse to the file `Job.java`. This is an empty skeleton for a Flink job.
 
-Paste the following code into it:
+Paste the following code:
 
 ```java
 import java.util.Map;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
@@ -147,7 +91,7 @@ public class AzureTableExample {
     HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
 
     // set the Account URI, something like: https://apacheflink.table.core.windows.net
-    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
     // set the secret storage key here
     hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
     // set the table name here
@@ -178,10 +122,6 @@ public class AzureTableExample {
 }
 ```
 
-The example shows how to access an Azure table and turn data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known transformations to the DataSet.
-
-## Access MongoDB
-
-This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test).
+The example shows how to access an Azure table and turn data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known transformations to the `DataSet`.
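+
+For example, a small sketch (assuming the elided part of the skeleton creates the source as `DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);`) that extracts each entity's row key as a `String`:
+
+```java
+// Extract the Azure row key of every entity as a plain String.
+DataSet<String> rowKeys = input.map(new MapFunction<Tuple2<Text, WritableEntity>, String>() {
+    @Override
+    public String map(Tuple2<Text, WritableEntity> entity) throws Exception {
+        return entity.f0.toString();
+    }
+});
+```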
 
 {{< top >}}
diff --git a/docs/content/docs/connectors/dataset/formats/hadoop.md b/docs/content/docs/connectors/dataset/formats/hadoop.md
new file mode 100644
index 0000000..64702cb
--- /dev/null
+++ b/docs/content/docs/connectors/dataset/formats/hadoop.md
@@ -0,0 +1,158 @@
+---
+title:  "Hadoop"
+weight: 4
+type: docs
+aliases:
+- /dev/batch/connectors/hadoop.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hadoop formats
+
+## Project Configuration
+
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
+Maven module.
+
+Add the following dependency to your `pom.xml` to use Hadoop:
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
+```
+
+If you want to run your Flink application locally (e.g. from your IDE), you also need to add
+a `hadoop-client` dependency such as:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.8.3</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+## Using Hadoop InputFormats
+
+To use Hadoop `InputFormats` with Flink, the format must first be wrapped
+using either `readHadoopFile` or `createHadoopInput` of the
+`HadoopInputs` utility class.
+The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general-purpose
+input formats.
+The resulting `InputFormat` can be used to create a data source by using
+`ExecutionEnvironment#createInput`.
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
+{{< tab "Java" >}}
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+                        LongWritable.class, Text.class, textPath));
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.createInput(HadoopInputs.readHadoopFile(
+                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
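+The file-based `readHadoopFile` variant is shown above; for a general-purpose (non-file) Hadoop `InputFormat`, `createHadoopInput` is used instead. A minimal sketch, assuming a hypothetical `org.apache.hadoop.mapreduce` input format `MyTableInputFormat` producing `Text` keys and `MyWritable` values (both names are illustrative, not real classes):
+
+```java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// createHadoopInput wraps a general-purpose InputFormat of the new mapreduce API;
+// an overload taking a JobConf exists for the old mapred API.
+Job job = Job.getInstance();
+DataSet<Tuple2<Text, MyWritable>> input =
+    env.createInput(HadoopInputs.createHadoopInput(
+        new MyTableInputFormat(), Text.class, MyWritable.class, job));
+```
+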
+## Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
+{{< tab "Java" >}}
+
+```java
+// Obtain the result we want to emit
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
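+Because the wrapper expects 2-tuples, an existing result usually has to be converted into Hadoop writable types first. A small sketch (the `wordCounts` data set is an illustrative placeholder) that turns a `DataSet<Tuple2<String, Integer>>` into the `Text`/`IntWritable` tuples used above:
+
+```java
+// Convert Java types into Hadoop writables before handing the data to the OutputFormat wrapper.
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = wordCounts
+    .map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>>() {
+        @Override
+        public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> in) throws Exception {
+            return new Tuple2<>(new Text(in.f0), new IntWritable(in.f1));
+        }
+    });
+```
+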
+{{< top >}}