Posted to commits@flink.apache.org by ar...@apache.org on 2021/11/29 15:15:10 UTC

[flink] 01/06: [FLINK-21407][doc][formats] Add formats to DataStream connectors doc

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd31d03dbc3f29522ec01328b7a93121137df7d7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Oct 19 16:09:37 2021 +0200

    [FLINK-21407][doc][formats] Add formats to DataStream connectors doc
---
 .../docs/connectors/datastream/formats/_index.md   |  23 ++++
 .../docs/connectors/datastream/formats/avro.md     |  61 ++++++++++
 .../datastream/formats/azure_table_storage.md      | 128 +++++++++++++++++++++
 .../docs/connectors/datastream/formats/hadoop.md   |  38 ++++++
 .../docs/connectors/datastream/formats/parquet.md  |  67 +++++++++++
 docs/content/docs/connectors/datastream/mongodb.md |  33 ++++++
 .../content/docs/connectors/datastream/overview.md |   6 +-
 7 files changed, 355 insertions(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/datastream/formats/_index.md b/docs/content/docs/connectors/datastream/formats/_index.md
new file mode 100644
index 0000000..282fc69
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/_index.md
@@ -0,0 +1,23 @@
+---
+title: Formats
+bookCollapseSection: true
+weight: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content/docs/connectors/datastream/formats/avro.md b/docs/content/docs/connectors/datastream/formats/avro.md
new file mode 100644
index 0000000..1b2ffef
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/avro.md
@@ -0,0 +1,61 @@
+---
+title:  "Avro"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/avro.html
+- /apis/streaming/connectors/formats/avro.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Avro format
+
+Flink has built-in support for [Apache Avro](http://avro.apache.org/). This makes it easy to read and write Avro data with Flink based on an Avro schema.
+Flink's serialization framework is able to handle classes generated from Avro schemas. In order to use the Avro format, the following dependency is required for projects using a build automation tool (such as Maven or SBT):
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-avro</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+```java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataStream<User> usersDS = env.createInput(users);
+```
+
+Note that `User` is a POJO generated by Avro. Flink also allows string-based key selection on these POJOs. For example:
+
+```java
+usersDS.keyBy("name")
+```
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since each record carries the full schema, it is very data intensive and thus probably slow to use.
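+
+If you nevertheless need to work with generic records, a minimal sketch (assuming `in` is the path to the Avro file, as in the example above) could look like this:
+
+```java
+// read GenericRecords instead of generated POJOs (possible, but not recommended -- see above)
+AvroInputFormat<GenericRecord> records = new AvroInputFormat<>(in, GenericRecord.class);
+DataStream<GenericRecord> recordsDS = env.createInput(records);
+```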
+
+Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use it as a join or grouping key.
+Specifying a field in Avro like `{"name": "type_double_test", "type": "double"},` works fine. However, specifying it as a UNION type with only one element (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
diff --git a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
new file mode 100644
index 0000000..6d82356
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
@@ -0,0 +1,128 @@
+---
+title:  "Microsoft Azure table"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/azure_table_storage.html
+- /apis/streaming/connectors/formats/azure_table_storage.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Microsoft Azure Table Storage format
+
+This example uses the `HadoopInputFormat` wrapper together with an existing Hadoop input format implementation to access [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
+
+1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central; therefore, we have to build the project ourselves.
+   Execute the following commands:
+
+```bash
+git clone https://github.com/mooso/azure-tables-hadoop.git
+cd azure-tables-hadoop
+mvn clean install
+```
+
+2. Set up a new Flink project using the quickstarts:
+
+```bash
+curl https://flink.apache.org/q/quickstart.sh | bash
+```
+
+3. Add the following dependencies (in the `<dependencies>` section) to your `pom.xml` file:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+    <version>{{< version >}}</version>
+</dependency>
+<dependency>
+    <groupId>com.microsoft.hadoop</groupId>
+    <artifactId>microsoft-hadoop-azure</artifactId>
+    <version>0.0.4</version>
+</dependency>
+```
+
+`flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
+`microsoft-hadoop-azure` adds the project we built above to our project.
+
+The project is now ready for coding. We recommend importing the project into an IDE, such as IntelliJ; make sure to import it as a Maven project.
+Browse to the file `Job.java`. This is an empty skeleton for a Flink job.
+
+Paste the following code:
+
+```java
+import java.util.Map;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import com.microsoft.hadoop.azure.AzureTableConfiguration;
+import com.microsoft.hadoop.azure.AzureTableInputFormat;
+import com.microsoft.hadoop.azure.WritableEntity;
+import com.microsoft.windowsazure.storage.table.EntityProperty;
+
+public class AzureTableExample {
+
+  public static void main(String[] args) throws Exception {
+    // set up the execution environment
+    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+    // create an AzureTableInputFormat, using the Hadoop input format wrapper
+    HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
+
+    // set the Account URI, something like: https://apacheflink.table.core.windows.net
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
+    // set the secret storage key here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
+    // set the table name here
+    hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
+
+    DataStream<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
+    // a small example of how to use the data in a mapper
+    DataStream<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
+      @Override
+      public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
+        System.err.println("--------------------------------\nKey = "+arg0.f0);
+        WritableEntity we = arg0.f1;
+
+        for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
+          System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
+        }
+
+        return arg0.f0.toString();
+      }
+    });
+
+    // emit result (this works only locally)
+    fin.print();
+
+    // execute program
+    env.execute("Azure Example");
+  }
+}
+```
+
+The example shows how to access an Azure table and turn the data into Flink's `DataStream` (more specifically, the type of the stream is `DataStream<Tuple2<Text, WritableEntity>>`). You can apply all known transformations to this `DataStream`.
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
new file mode 100644
index 0000000..c6159cc
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -0,0 +1,38 @@
+---
+title:  "Hadoop"
+weight: 4
+type: docs
+aliases:
+  - /dev/connectors/formats/hadoop.html
+  - /apis/streaming/connectors/formats/hadoop.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hadoop formats
+
+Apache Flink allows users to access many different systems as data sources.
+The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
+of so-called `InputFormat`s.
+
+One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
+users to use all existing Hadoop input formats with Flink.
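+
+For example, a minimal sketch that wraps Hadoop's `TextInputFormat` could look like the following (the input path is a placeholder, and the snippet assumes the `flink-hadoop-compatibility` dependency is on the classpath):
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// wrap Hadoop's TextInputFormat (org.apache.hadoop.mapreduce.lib.input) with Flink's HadoopInputFormat
+Job job = Job.getInstance();
+TextInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path("hdfs:///path/to/input"));
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
+
+// every record is a (byte offset, line of text) tuple
+DataStream<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
+```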
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content/docs/connectors/datastream/formats/parquet.md
new file mode 100644
index 0000000..fcbe797
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -0,0 +1,67 @@
+---
+title:  "Parquet"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/parquet.html
+- /apis/streaming/connectors/formats/parquet.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Parquet formats
+
+Flink has built-in support for [Apache Parquet](http://parquet.apache.org/). This allows reading and writing Parquet data with Flink.
+In order to use the Parquet format, the following dependency is required for projects using a build automation tool (such as Maven or SBT):
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-parquet_{{< scala_version >}}</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+In order to read data from a Parquet file, you have to specify one of the implementations of `ParquetInputFormat`. There are several, depending on your needs:
+- `ParquetPojoInputFormat<E>` to read POJOs from Parquet files
+- `ParquetRowInputFormat` to read Flink `Rows` (column-oriented records) from Parquet files
+- `ParquetMapInputFormat` to read Map records (Maps of nested Flink type objects) from Parquet files
+- `ParquetAvroInputFormat` to read Avro Generic Records from Parquet files
+
+
+**Example for ParquetRowInputFormat**:
+
+```java
+// use the Parquet libraries to provide the Parquet schema, e.g. parse it from a schema string
+// (rawSchemaString is a placeholder here) or extract it from the Parquet files
+MessageType parquetSchema = MessageTypeParser.parseMessageType(rawSchemaString);
+ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path(filePath), parquetSchema);
+// project only the needed fields, if suited, to reduce the amount of data: parquetInputFormat.selectFields(projectedFieldNames);
+DataStream<Row> input = env.createInput(parquetInputFormat);
+```
+
+**Example for ParquetAvroInputFormat**:
+
+```java
+// use the Parquet libraries to provide the Parquet schema, e.g. parse it from a schema string
+// (rawSchemaString is a placeholder here) or extract it from the Parquet files
+MessageType parquetSchema = MessageTypeParser.parseMessageType(rawSchemaString);
+ParquetAvroInputFormat parquetInputFormat = new ParquetAvroInputFormat(new Path(filePath), parquetSchema);
+// project only the needed fields, if suited, to reduce the amount of data: parquetInputFormat.selectFields(projectedFieldNames);
+DataStream<GenericRecord> input = env.createInput(parquetInputFormat);
+```
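+
+A similar sketch for `ParquetPojoInputFormat` might look like the following; note that the exact constructor arguments are an assumption here (file path, Parquet schema, and the POJO type information), and `SampleRecord` is a hypothetical POJO class:
+
+```java
+// describe the target POJO type (SampleRecord is a hypothetical class)
+PojoTypeInfo<SampleRecord> pojoTypeInfo =
+    (PojoTypeInfo<SampleRecord>) TypeExtractor.createTypeInfo(SampleRecord.class);
+ParquetPojoInputFormat<SampleRecord> parquetInputFormat =
+    new ParquetPojoInputFormat<>(new Path(filePath), parquetSchema, pojoTypeInfo);
+DataStream<SampleRecord> input = env.createInput(parquetInputFormat);
+```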
+
+
diff --git a/docs/content/docs/connectors/datastream/mongodb.md b/docs/content/docs/connectors/datastream/mongodb.md
new file mode 100644
index 0000000..eaec8ec
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/mongodb.md
@@ -0,0 +1,33 @@
+---
+title:  "MongoDb"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/mongodb.html
+- /apis/streaming/connectors/mongodb.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# MongoDB format
+
+This [GitHub repository](https://github.com/okkam-it/flink-mongodb-test) documents how to use MongoDB with Apache Flink (starting from 0.7-incubating).
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/overview.md b/docs/content/docs/connectors/datastream/overview.md
index ec2b1df..116888c 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -42,7 +42,9 @@ Connectors provide code for interfacing with various third-party systems. Curren
  * [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}}) (sink)
  * [Amazon Kinesis Streams]({{< ref "docs/connectors/datastream/kinesis" >}}) (source/sink)
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) (sink)
- * [FileSystem (Hadoop included) - Streaming and Batch]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
+ * [FileSystem (Hadoop included) - Streaming only sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) (sink)
+ * [FileSystem (Hadoop included) - Streaming and Batch sink]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
+ * [FileSystem (Hadoop included) - Batch source]({{< ref "docs/connectors/datastream/formats" >}}) (source)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
  * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
@@ -57,6 +59,8 @@ Note also that while the streaming connectors listed in this section are part of
 Flink project and are included in source releases, they are not included in the binary distributions. 
 Further instructions can be found in the corresponding subsections.
 
+Filesystem source formats are being gradually replaced with the new Flink Source API, starting with Flink 1.14.0.
+
 ## Connectors in Apache Bahir
 
 Additional streaming connectors for Flink are being released through [Apache Bahir](https://bahir.apache.org/), including: