Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/21 09:33:19 UTC

[flink] branch release-1.14 updated: [FLINK-20188][Connectors][Docs][FileSystem] Added documentation for File Source

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 303e306  [FLINK-20188][Connectors][Docs][FileSystem] Added documentation for File Source
303e306 is described below

commit 303e3064f27d8d648f6b745a62ec12707f3c5cf6
Author: martijnvisser <ma...@2symbols.com>
AuthorDate: Thu Jan 20 21:43:16 2022 +0100

    [FLINK-20188][Connectors][Docs][FileSystem] Added documentation for File Source
    
    Co-authored-by: shihong90 <25...@qq.com>
    Co-authored-by: Alexander Fedulov <14...@users.noreply.github.com>
---
 .../docs/connectors/datastream/file_sink.md        | 742 --------------------
 .../docs/connectors/datastream/filesystem.md}      | 375 ++++++++--
 .../docs/connectors/datastream/formats/_index.md   |  23 +
 .../docs/connectors/datastream/formats/avro.md     |  61 ++
 .../datastream/formats/azure_table_storage.md      |   6 +-
 .../docs/connectors/datastream/formats/hadoop.md   | 159 +++++
 .../docs/connectors/datastream/formats/mongodb.md  |  33 +
 .../docs/connectors/datastream/formats/overview.md |  38 +
 .../docs/connectors/datastream/formats/parquet.md  |  14 +-
 .../connectors/datastream/formats/text_files.md    |  14 +-
 .../docs/connectors/datastream/overview.md         |   2 +-
 .../docs/connectors/datastream/streamfile_sink.md  | 741 --------------------
 .../content.zh/docs/connectors/table/filesystem.md | 342 +++++----
 docs/content.zh/docs/deployment/filesystems/s3.md  |   4 +-
 .../docs/dev/datastream/execution_mode.md          |   2 +-
 .../datastream/{file_sink.md => filesystem.md}     | 258 ++++++-
 .../datastream/formats/azure_table_storage.md      |   4 +-
 .../docs/connectors/datastream/formats/overview.md |  38 +
 .../docs/connectors/datastream/formats/parquet.md  |  14 +-
 .../connectors/datastream/formats/text_files.md    |   6 +-
 .../content/docs/connectors/datastream/overview.md |   4 +-
 .../docs/connectors/datastream/streamfile_sink.md  | 776 ---------------------
 docs/content/docs/connectors/table/filesystem.md   |   4 +-
 docs/content/docs/deployment/filesystems/s3.md     |   7 +-
 docs/content/docs/dev/datastream/execution_mode.md |   4 +-
 25 files changed, 1160 insertions(+), 2511 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/file_sink.md b/docs/content.zh/docs/connectors/datastream/file_sink.md
deleted file mode 100644
index 1e454ba..0000000
--- a/docs/content.zh/docs/connectors/datastream/file_sink.md
+++ /dev/null
@@ -1,742 +0,0 @@
----
-title: File Sink
-weight: 6
-type: docs
-aliases:
-  - /zh/dev/connectors/file_sink.html
-  - /zh/apis/streaming/connectors/filesystem_sink.html
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# File Sink
-
-This connector provides a unified sink for `BATCH` and `STREAMING` execution that writes partitioned files to file systems supported by the [Flink `FileSystem`]({{< ref "docs/deployment/filesystems/overview" >}}) abstraction, offering the same consistency guarantees in both modes. The File Sink is an evolution of the existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}), which only provided exactly-once semantics for `STREAMING` execution.
-
-The File Sink writes incoming data into buckets. Given that the incoming streams can be unbounded, data in each bucket is organized into part files of finite size. The bucketing behaviour is fully configurable; the default time-based bucketing starts a new bucket every hour, and the files in that bucket contain all records received from the stream during that hour.
-
-Data within the bucket directories is split into part files. Each subtask of the sink that has received data for a bucket writes at least one part file for it. Additional part files are created according to the configurable rolling policy. For row-encoded formats (see [File Formats](#file-formats)) the default policy rolls part files based on size, a timeout that specifies the maximum duration for which a file can be open, and a maximum inactivity timeout after which the file is closed. Bulk-encoded formats roll on every checkpoint, and the user can specify additional conditions based on size or time.
-
-{{< hint info >}}
-<b>IMPORTANT:</b> Checkpointing needs to be enabled when using the FileSink in `STREAMING` mode; part files can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay in the 'in-progress' or 'pending' state and cannot be safely read by downstream systems.
-{{< /hint >}}
-
-
-{{< img src="/fig/streamfilesink_bucketing.png" >}}
-
-## File Formats
-
-The `FileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](http://parquet.apache.org).
-These two variants come with their respective builders that can be created with the following static methods:
-
- - Row-encoded sink: `FileSink.forRowFormat(basePath, rowEncoder)`
- - Bulk-encoded sink: `FileSink.forBulkFormat(basePath, bulkWriterFactory)`
-
-When creating either a row- or a bulk-encoded sink, we have to specify the base path where the buckets will be stored and the encoding logic for our data.
-
-For all the configuration options and more documentation about the implementation of the different data formats, please refer to `FileSink`.
-
-### Row-encoded Formats
-
-Row-encoded formats need to specify an `Encoder` that is used for serializing individual rows to the `OutputStream` of the in-progress part files.
-
-In addition to the bucket assigner, the `RowFormatBuilder` allows the user to specify:
-
- - Custom `RollingPolicy`: rolling policy to override the default `DefaultRollingPolicy`
- - bucketCheckInterval (default = 1 min): millisecond interval for checking time-based rolling policies
-
-Basic usage for writing String elements thus looks like this:
-
-
-{{< tabs "946da1d5-b046-404e-ab80-a5a5d251d8ee" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-
-DataStream<String> input = ...;
-
-final FileSink<String> sink = FileSink
-    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
-            .build())
-	.build();
-
-input.sinkTo(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.core.fs.Path
-import org.apache.flink.connector.file.sink.FileSink
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-
-val input: DataStream[String] = ...
-
-val sink: FileSink[String] = FileSink
-    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
-            .build())
-    .build()
-
-input.sinkTo(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-This example creates a simple sink that assigns records to the default one-hour time buckets. It also specifies a rolling policy that rolls the in-progress part file on any of the following 3 conditions:
-
- - It contains at least 15 minutes worth of data
- - It hasn't received new records for the last 5 minutes
- - The file size has reached 1 GB (after writing the last record)
-
-### Bulk-encoded Formats
-
-Bulk-encoded sinks are created similarly to the row-encoded ones, but instead of specifying an `Encoder` we have to specify a `BulkWriter.Factory`.
-The `BulkWriter` defines how new elements are added and flushed, and how they are encoded in bulk.
-
-Flink comes with five built-in BulkWriter factories:
-
- - `ParquetWriterFactory`
- - `AvroWriterFactory`
- - `SequenceFileWriterFactory`
- - `CompressWriterFactory`
- - `OrcBulkWriterFactory`
-
-{{< hint info >}}
-<b>IMPORTANT:</b> Bulk-encoded formats can only be combined with rolling policies that extend `CheckpointRollingPolicy`. Such policies roll on every checkpoint, and the user can additionally specify conditions based on file size or time.
-{{< /hint >}}
-
-#### Parquet Format
-
-Flink contains built-in convenience methods for creating Parquet writer factories for Avro data. See `ParquetAvroWriters` for more information.
-
-To write data in other Parquet-compatible formats, users need to create a `ParquetWriterFactory` with a custom implementation of the `ParquetBuilder` interface.
-
-To use the Parquet bulk encoder in your application you need to add the following dependency:
-
-{{< artifact flink-parquet withScalaVersion >}}
-
-A `FileSink` that writes Avro data to Parquet format can be created like this:
-
-{{< tabs "825da2a2-4bdf-4f2d-9138-2e99a72bb9d4" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
-import org.apache.avro.Schema;
-
-
-Schema schema = ...;
-DataStream<GenericRecord> input = ...;
-
-final FileSink<GenericRecord> sink = FileSink
-	.forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
-	.build();
-
-input.sinkTo(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
-import org.apache.avro.Schema
-
-val schema: Schema = ...
-val input: DataStream[GenericRecord] = ...
-
-val sink: FileSink[GenericRecord] = FileSink
-    .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
-    .build()
-
-input.sinkTo(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-Similarly, a `FileSink` that writes Protobuf data to Parquet format can be created like this:
-
-{{< tabs "7f22c88d-e7dd-4299-aa23-02afc61a6319" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
-
-// ProtoRecord is a generated protobuf Message class.
-DataStream<ProtoRecord> input = ...;
-
-final FileSink<ProtoRecord> sink = FileSink
-	.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
-	.build();
-
-input.sinkTo(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
-
-// ProtoRecord is a generated protobuf Message class.
-val input: DataStream[ProtoRecord] = ...
-
-val sink: FileSink[ProtoRecord] = FileSink
-    .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
-    .build()
-
-input.sinkTo(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Avro Format
-
-Flink also provides built-in support for writing data into Avro files. A list of convenience methods to create
-Avro writer factories can be found in the `AvroWriters` class.
-
-To use the Avro writers in your application you need to add the following dependency:
-
-{{< artifact flink-avro >}}
-
-A `FileSink` that writes data to Avro files can be created like this:
-
-{{< tabs "237658d5-98c7-43c7-9844-268df7ba7afc" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.formats.avro.AvroWriters;
-import org.apache.avro.Schema;
-
-
-Schema schema = ...;
-DataStream<GenericRecord> input = ...;
-
-final FileSink<GenericRecord> sink = FileSink
-	.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
-	.build();
-
-input.sinkTo(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.formats.avro.AvroWriters
-import org.apache.avro.Schema
-
-val schema: Schema = ...
-val input: DataStream[GenericRecord] = ...
-
-val sink: FileSink[GenericRecord] = FileSink
-    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
-    .build()
-
-input.sinkTo(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-For creating customized Avro writers, e.g. enabling compression, users need to implement the `AvroBuilder`
-interface and create an `AvroWriterFactory` manually:
-
-{{< tabs "732055fb-ae45-49dd-81a1-8aadbb9e3d65" >}}
-{{< tab "Java" >}}
-```java
-AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
-	Schema schema = ReflectData.get().getSchema(Address.class);
-	DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
-
-	DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
-	dataFileWriter.setCodec(CodecFactory.snappyCodec());
-	dataFileWriter.create(schema, out);
-	return dataFileWriter;
-});
-
-DataStream<Address> stream = ...
-stream.sinkTo(FileSink.forBulkFormat(
-	outputBasePath,
-	factory).build());
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
-    override def createWriter(out: OutputStream): DataFileWriter[Address] = {
-        val schema = ReflectData.get.getSchema(classOf[Address])
-        val datumWriter = new ReflectDatumWriter[Address](schema)
-
-        val dataFileWriter = new DataFileWriter[Address](datumWriter)
-        dataFileWriter.setCodec(CodecFactory.snappyCodec)
-        dataFileWriter.create(schema, out)
-        dataFileWriter
-    }
-})
-
-val stream: DataStream[Address] = ...
-stream.sinkTo(FileSink.forBulkFormat(
-    outputBasePath,
-    factory).build());
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### ORC Format
-
-To enable the data to be bulk encoded in ORC format, Flink offers `OrcBulkWriterFactory`, which takes a concrete implementation of `Vectorizer`.
-
-Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses ORC's `VectorizedRowBatch` to achieve this.
-
-Since the input element has to be transformed into a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer` class and override the `vectorize(T element, VectorizedRowBatch batch)` method. The method provides an instance of `VectorizedRowBatch` to be used directly, so users just have to write the logic to transform the input `element` to `ColumnVectors` and set them in the provided `VectorizedRowBatch` instance.
-
-For example, if the input element is of type `Person`, which looks like:
-
-{{< tabs "8d9329a5-d67d-4c17-b940-03616c0bd5d6" >}}
-{{< tab "Java" >}}
-```java
-
-class Person {
-    private final String name;
-    private final int age;
-    ...
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-Then a child implementation to convert the element of type `Person` and set it in the `VectorizedRowBatch` can look like:
-
-{{< tabs "2462164c-3dfc-414c-8bab-e2e8256266d9" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
-	public PersonVectorizer(String schema) {
-		super(schema);
-	}
-	@Override
-	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
-		BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
-		LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
-		int row = batch.size++;
-		nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
-		ageColVector.vector[row] = element.getAge();
-	}
-}
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import java.nio.charset.StandardCharsets
-import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}
-
-class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
-
-  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
-    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
-    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
-    val row = batch.size
-    batch.size += 1
-    nameColVector.setVal(row, element.getName.getBytes(StandardCharsets.UTF_8))
-    ageColVector.vector(row) = element.getAge
-  }
-
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-To use the ORC bulk encoder in an application, users need to add the following dependency:
-
-{{< artifact flink-orc withScalaVersion >}}
-
-A `FileSink` that writes data in ORC format can then be created like this:
-
-{{< tabs "4bc2aa30-6ea9-461f-aa24-8c36856edfcb" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.orc.writer.OrcBulkWriterFactory;
-
-String schema = "struct<_col0:string,_col1:int>";
-DataStream<Person> input = ...;
-
-final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));
-
-final FileSink<Person> sink = FileSink
-	.forBulkFormat(outputBasePath, writerFactory)
-	.build();
-
-input.sinkTo(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.orc.writer.OrcBulkWriterFactory
-
-val schema: String = "struct<_col0:string,_col1:int>"
-val input: DataStream[Person] = ...
-val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));
-
-val sink: FileSink[Person] = FileSink
-    .forBulkFormat(outputBasePath, writerFactory)
-    .build()
-
-input.sinkTo(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-The `OrcBulkWriterFactory` can also take a Hadoop `Configuration` and `Properties` so that custom Hadoop configuration and ORC writer properties can be provided:
-
-{{< tabs "79765e6f-43bf-47ac-801c-2f7da9ac4f87" >}}
-{{< tab "Java" >}}
-```java
-String schema = ...;
-Configuration conf = ...;
-Properties writerProperties = new Properties();
-
-writerProperties.setProperty("orc.compress", "LZ4");
-// Other ORC-supported properties can be set in a similar fashion.
-
-final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
-    new PersonVectorizer(schema), writerProperties, conf);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val schema: String = ...
-val conf: Configuration = ...
-val writerProperties: Properties = new Properties()
-
-writerProperties.setProperty("orc.compress", "LZ4")
-// Other ORC-supported properties can be set in a similar fashion.
-
-val writerFactory = new OrcBulkWriterFactory(
-    new PersonVectorizer(schema), writerProperties, conf)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html).
-
-Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overridden `vectorize(...)` method:
-
-{{< tabs "df5c8b4f-9db0-41b0-89e7-a74a3b473b35" >}}
-{{< tab "Java" >}}
-```java
-
-public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
-	@Override
-	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
-		...
-		String metadataKey = ...;
-		ByteBuffer metadataValue = ...;
-		this.addUserMetadata(metadataKey, metadataValue);
-	}
-}
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-
-class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
-
-  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
-    ...
-    val metadataKey: String = ...
-    val metadataValue: ByteBuffer = ...
-    addUserMetadata(metadataKey, metadataValue)
-  }
-
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Hadoop SequenceFile Format
-
-To use the `SequenceFile` bulk encoder in your application you need to add the following dependency:
-
-{{< artifact flink-sequence-file >}}
-
-A simple `SequenceFile` writer can be created like this:
-
-{{< tabs "addcc4bc-bd9c-473a-9d5a-d9d9b3efd7d2" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-
-
-DataStream<Tuple2<LongWritable, Text>> input = ...;
-Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
-final FileSink<Tuple2<LongWritable, Text>> sink = FileSink
-  .forBulkFormat(
-    outputBasePath,
-    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class))
-	.build();
-
-input.sinkTo(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.SequenceFile
-import org.apache.hadoop.io.Text;
-
-val input: DataStream[(LongWritable, Text)] = ...
-val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
-val sink: FileSink[(LongWritable, Text)] = FileSink
-  .forBulkFormat(
-    outputBasePath,
-    new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
-	.build()
-
-input.sinkTo(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-The `SequenceFileWriterFactory` supports additional constructor parameters to specify compression settings.
-
-## Bucket Assignment
-
-The bucketing logic defines how the data is structured into subdirectories inside the base output directory.
-
-Both row and bulk formats use the `DateTimeBucketAssigner` as the default assigner.
-By default, the `DateTimeBucketAssigner` creates hourly buckets based on the system default timezone with the format `yyyy-MM-dd--HH`. Both the date format (i.e. the bucket size) and the timezone can be configured manually.
-
-We can specify a custom `BucketAssigner` by calling `.withBucketAssigner(assigner)` on the format builder.
-
-Flink comes with two built-in BucketAssigners:
-
- - `DateTimeBucketAssigner`: the default time-based assigner
- - `BasePathBucketAssigner`: assigner that stores all part files in the base path (single global bucket)
-
-## Rolling Policy
-
-In `STREAMING` mode, the `RollingPolicy` defines when a given in-progress part file is closed and moved to the pending and later to the finished state. Pending files become finished on the next checkpoint, so the rolling policy, together with the checkpointing interval, controls how quickly part files become available to downstream readers as well as their size and number. In `BATCH` mode, part files only become finished at the end of the job, but the rolling policy can still control their maximum size.
-
-Flink comes with two built-in RollingPolicies:
-
- - `DefaultRollingPolicy`
- - `OnCheckpointRollingPolicy`
-
-## Part File Lifecycle
-
-In order to use the output of the FileSink in downstream systems, we need to understand the naming and lifecycle of the output files produced.
-
-Part files can be in one of three states:
- 1. **In-progress**: the part file is currently being written to.
- 2. **Pending**: a closed in-progress file (due to the specified rolling policy) that is waiting to be committed.
- 3. **Finished**: on successful checkpoints (`STREAMING`) or at the end of input (`BATCH`), pending files transition to finished.
-
-Only finished files are safe to read by downstream systems, as those are guaranteed not to be modified later.
-
-{{< hint info >}}
-<b>IMPORTANT:</b> Part file indexes are strictly increasing within each subtask (in the order the files are created), but the indexes are not always contiguous. When the job restarts, the next part file index for all subtasks will be `max part index + 1`,
-where `max part index` is the maximum index across all subtasks.
-{{< /hint >}}
-
-For each active bucket the writer has a single in-progress part file at any given time, but there can be several pending and finished files.
-
-**Part file example**
-
-To better understand the lifecycle of these files, let's look at a simple example with 2 sink subtasks:
-
-```
-└── 2019-08-25--12
-    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    └── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
-```
-
-When the part file `part-81fc4980-a6af-41c8-9937-9939408a734b-0` is rolled (let's say it becomes too large), it becomes pending, but it is not renamed yet. The sink then opens a new part file: `part-81fc4980-a6af-41c8-9937-9939408a734b-1`:
-
-```
-└── 2019-08-25--12
-    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
-    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
- `part-81fc4980-a6af-41c8-9937-9939408a734b-0` is now pending completion; after the next successful checkpoint it becomes finished:
-
-```
-└── 2019-08-25--12
-    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0
-    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
-New buckets are created as dictated by the bucketing policy, and this does not affect currently in-progress files:
-
-```
-└── 2019-08-25--12
-    ├── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-81fc4980-a6af-41c8-9937-9939408a734b-0
-    └── part-81fc4980-a6af-41c8-9937-9939408a734b-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-└── 2019-08-25--13
-    └── part-4005733d-a830-4323-8291-8866de98b582-0.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1
-```
-
-Old buckets can still receive new records, as the bucketing policy is evaluated on a per-record basis.
-
-### Part File Configuration
-
-Finished files can be distinguished from in-progress files by their naming scheme only.
-
-By default, the file naming strategy is as follows:
- - **In-progress / Pending:** `part-<uid>-<partFileIndex>.inprogress.uid`
- - **Finished:** `part-<uid>-<partFileIndex>`
-
-Here `uid` is a random id assigned to a subtask of the sink when the subtask is instantiated. This `uid` is not fault-tolerant, so it is regenerated when the subtask recovers from a failure.
-
-Flink allows the user to specify a prefix and/or a suffix for the part files via an `OutputFileConfig`.
-For example, with the prefix "prefix" and the suffix ".ext" the sink will create the following files:
-
-```
-└── 2019-08-25--12
-    ├── prefix-4005733d-a830-4323-8291-8866de98b582-0.ext
-    ├── prefix-4005733d-a830-4323-8291-8866de98b582-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── prefix-81fc4980-a6af-41c8-9937-9939408a734b-0.ext
-    └── prefix-81fc4980-a6af-41c8-9937-9939408a734b-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
-The `OutputFileConfig` can be set in the following way:
-
-{{< tabs "074e85ae-45fa-4280-a017-1c836d7b583e" >}}
-{{< tab "Java" >}}
-```java
-
-OutputFileConfig config = OutputFileConfig
- .builder()
- .withPartPrefix("prefix")
- .withPartSuffix(".ext")
- .build();
-            
-FileSink<Tuple2<Integer, Integer>> sink = FileSink
- .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
- .withBucketAssigner(new KeyBucketAssigner())
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
- .withOutputFileConfig(config)
- .build();
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-
-val config = OutputFileConfig
- .builder()
- .withPartPrefix("prefix")
- .withPartSuffix(".ext")
- .build()
-            
-val sink = FileSink
- .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
- .withBucketAssigner(new KeyBucketAssigner())
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
- .withOutputFileConfig(config)
- .build()
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-## Important Considerations
-
-### General
-
-<span class="label label-danger">Important Note 1</span>: When using Hadoop < 2.7, please use the `OnCheckpointRollingPolicy`, which rolls part files on every checkpoint.
-The reason is that if part files "traverse" the checkpoint interval, then, upon recovery from a failure, the `FileSink` may use the `truncate()` method of the filesystem to discard uncommitted data from the in-progress file.
-This method is not supported by pre-2.7 Hadoop versions and Flink will throw an exception.
-
-<span class="label label-danger">Important Note 2</span>: Given that Flink sinks and UDFs in general do not differentiate between normal job termination (e.g. finite input stream) and termination due to failure, upon normal termination of a job the last in-progress files will not be transitioned to the "finished" state.
-
-<span class="label label-danger">Important Note 3</span>: Flink and the `FileSink` never overwrite committed data. Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file that was committed
-by subsequent successful checkpoints, Flink will refuse to resume and will throw an exception as it cannot locate the in-progress file, and the restore fails.
-
-<span class="label label-danger">Important Note 4</span>: Currently, the `FileSink` only supports three filesystems: HDFS, S3, and Local. Flink will throw an exception when an unsupported filesystem is used at runtime.
-
-### BATCH-specific
-
-<span class="label label-danger">Important Note 1</span>: Although the `Writer` is executed with the user-specified parallelism, the `Committer` is executed with a parallelism of 1.
-
-<span class="label label-danger">Important Note 2</span>: In `BATCH` mode, pending files are committed, i.e. transitioned to the `Finished` state, only after the whole input has been processed.
-
-<span class="label label-danger">Important Note 3</span>: When High-Availability is activated, if a `JobManager` restart happens while the `Committer` is committing files, already committed data may be produced in duplicate. This is going to be fixed in future Flink versions.
-
-### S3-specific
-
-<span class="label label-danger">Important Note 1</span>: For S3, the `FileSink` supports only the [Hadoop-based](https://hadoop.apache.org/) FileSystem implementation, not the implementation based on [Presto](https://prestodb.io/).
-If your job uses the `FileSink` to write to S3 but relies on the Presto-based filesystem for checkpointing, it is advised to explicitly use *"s3a://"* (for Hadoop) as the scheme for the target path of the sink and *"s3p://"* (for Presto) for the checkpoint path.
-Using *"s3://"* for both the sink and checkpointing may lead to unpredictable behavior, as both implementations "listen" to that scheme.
-
-<span class="label label-danger">Important Note 2</span>: To guarantee exactly-once semantics, the `FileSink` uses the S3 [Multi-part Upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html) feature (MPU from now on).
-This feature allows uploading files in independent chunks (hence "multi-part"), which are combined into the original file once all parts of the MPU have been uploaded successfully.
-For inactive MPUs, S3 supports a bucket lifecycle rule that users can apply to abort multipart uploads that do not complete within a specified amount of time after being initiated.
-This implies that if this rule is set aggressively and a savepoint is taken while some part files are not yet fully uploaded, the associated MPUs may time out before the job is restarted.
-The missing parts are then not written to the savepoint, so when the Flink job is restored from that savepoint it cannot fetch them, fails with an exception, and the restore does not succeed.
-
-{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/file_sink.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
similarity index 67%
copy from docs/content/docs/connectors/datastream/file_sink.md
copy to docs/content.zh/docs/connectors/datastream/filesystem.md
index 4930ba8..6dc5332 100644
--- a/docs/content/docs/connectors/datastream/file_sink.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -1,10 +1,13 @@
 ---
-title: File Sink
+title: FileSystem
 weight: 6
 type: docs
 aliases:
-  - /dev/connectors/file_sink.html
-  - /apis/streaming/connectors/filesystem_sink.html
+  - /zh/dev/connectors/file_sink.html
+  - /zh/apis/streaming/connectors/filesystem_sink.html
+  - /zh/docs/connectors/datastream/streamfile_sink/
+  - /zh/docs/connectors/datastream/file_sink/
+
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -25,12 +28,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` that reads or writes (partitioned) files to file systems
 supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it is an evolution of the 
-existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which was designed for providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is designed to provide exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any (distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) (e.g., Avro, CSV, Parquet),
+and produces a stream of records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref "docs/dev/datastream/sources" >}}#the-data-source-api),
+a unified data source that reads files - both in batch and in streaming mode.
+It is divided into the following two parts: `SplitEnumerator` and `SourceReader`.
+
+* `SplitEnumerator` is responsible for discovering and identifying the files to read and assigns them to the `SourceReader`.
+* `SourceReader` requests the files it needs to process and reads the file from the filesystem.
+
+You will need to combine the File Source with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}), which allows you to
+parse CSV, decode AVRO, or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator - a recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for periodic file discovery.
+In this case, the `SplitEnumerator` will enumerate like the bounded case but, after a certain interval, repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously detected files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You can start building a File Source via one of the following API calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the File Source.
+
+For the bounded/batch case, the File Source processes all files under the given path(s).
+For the continuous/streaming case, the source periodically checks the paths for new files and will start reading those.
+
+When you start creating a File Source (via the `FileSource.FileSourceBuilder` created through one of the above-mentioned methods),
+the source is in bounded/batch mode by default. You can call `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
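+
+Once built, the File Source is attached to a pipeline like any other FLIP-27 source. The following is a minimal sketch, assuming a `StreamExecutionEnvironment` named `env` and a source producing `String` records:
+
+```java
+// no watermarks are needed for plain file ingestion
+final DataStream<String> stream =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
+```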
+
+### Format Types
+
+Each file is read through a file reader defined by a file format, which encapsulates the parsing logic for the file's contents.
+The source supports multiple such classes; their interfaces represent a tradeoff between simplicity of implementation and flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement,
+  and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply
+  (such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time.
+  It is the most "low level" format to implement, but offers the greatest flexibility to optimize the implementation.
+
+#### TextLine Format
+
+This is a `StreamFormat` reader that reads text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, it will re-read
+and discard the number of lines that were processed before the last checkpoint. This is due to
+the fact that the offsets of lines in the file cannot be tracked through the charset decoders
+with their internal buffering of stream input and charset decoder state.
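+
+A minimal sketch of wiring the text line reader into a File Source is shown below. It assumes the text line `StreamFormat` shipped with `flink-connector-files` (called `TextLineFormat` here; the exact class name may differ between Flink versions) and a local input directory:
+
+```java
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.core.fs.Path;
+
+// read every line of every file under /data/input as a String record
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/data/input"))
+                .build();
+```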
+
+#### SimpleStreamFormat Abstract Class
+
+This is a simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of arrays or files can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be initialized like this:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition with the fields order exactly matching those of the CSV file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
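+
+As an illustration, a hand-written schema for a hypothetical `SomePojo` with `name` and `count` fields could be wired up as follows (the `CsvMapper` and `CsvSchema` classes come from Jackson's `jackson-dataformat-csv`; the field names and types are assumptions made for this example):
+
+```java
+CsvMapper mapper = new CsvMapper();
+CsvSchema schema = CsvSchema.builder()
+        .addColumn("name", CsvSchema.ColumnType.STRING)
+        .addColumn("count", CsvSchema.ColumnType.NUMBER)
+        .build();
+
+CsvReaderFormat<SomePojo> csvFormat =
+        CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(SomePojo.class));
+```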
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk reader is
+created based on a checkpoint during checkpointed streaming execution, then the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a `StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
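+
+The adapted format can then be handed to the bulk variant of the builder, for example (the input path below is a placeholder):
+
+```java
+final FileSource<SomePojo> source =
+        FileSource.forBulkFileFormat(bulkFormat, new Path("/data/csv-input"))
+                .build();
+```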
+
+### Customizing File Enumeration
+
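+If the default enumeration behaviour does not fit your needs (for example, when splits should be derived from table partitions rather than from a plain directory listing), you can provide a custom `FileEnumerator`. The following (abridged) sketch is modelled on Flink's Hive connector:
+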
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for the Hive source, which generates splits based on
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        // createInputSplits: splits the given files into input splits
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking does not work very well for large backlogs of files. This is because watermarks eagerly advance within a file, and the next file might contain data later than the watermark.
+
+For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can, in some cases, grow rather large.
+There are plans to add a compressed form of tracking already processed files in the future (for example, by keeping modification timestamps below boundaries).
+
+### Behind the Scenes
+{{< hint info >}}
+If you are interested in how File Source works through the new data source API design, you may
+want to read this part as a reference. For details about the new data source API, check out the
+[documentation on data sources]({{< ref "docs/dev/datastream/sources.md" >}}) and
+<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+for more descriptive discussions.
+{{< /hint >}}
+
+## File Sink
 
 The file sink writes incoming data into buckets. Given that the incoming streams can be unbounded,
 data in each bucket is organized into part files of finite size. The bucketing behaviour is fully configurable
@@ -41,26 +259,26 @@ Data within the bucket directories is split into part files. Each bucket will co
 each subtask of the sink that has received data for that bucket. Additional part files will be created according to the configurable
 rolling policy. For `Row-encoded Formats` (see [File Formats](#file-formats)) the default policy rolls part files based
 on size, a timeout that specifies the maximum duration for which a file can be open, and a maximum inactivity
-timeout after which the file is closed. For `Bulk-encoded Formats` we roll on every checkpoint and the user can 
+timeout after which the file is closed. For `Bulk-encoded Formats` we roll on every checkpoint and the user can
 specify additional conditions based on size or time.
 
 {{< hint info >}}
 
-**IMPORTANT**: Checkpointing needs to be enabled when using the `FileSink` in `STREAMING` mode. Part files 
-can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay 
+**IMPORTANT**: Checkpointing needs to be enabled when using the `FileSink` in `STREAMING` mode. Part files
+can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay
 in the `in-progress` or the `pending` state, and cannot be safely read by downstream systems.
 
 {{< /hint >}}
 
- {{< img src="/fig/streamfilesink_bucketing.png"  width="100%" >}}
+{{< img src="/fig/streamfilesink_bucketing.png"  width="100%" >}}
 
-## File Formats
+### Format Types
 
 The `FileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](http://parquet.apache.org).
 These two variants come with their respective builders that can be created with the following static methods:
 
- - Row-encoded sink: `FileSink.forRowFormat(basePath, rowEncoder)`
- - Bulk-encoded sink: `FileSink.forBulkFormat(basePath, bulkWriterFactory)`
+- Row-encoded sink: `FileSink.forRowFormat(basePath, rowEncoder)`
+- Bulk-encoded sink: `FileSink.forBulkFormat(basePath, bulkWriterFactory)`
 
 When creating either a row or a bulk encoded sink we have to specify the base path where the buckets will be
 stored and the encoding logic for our data.
@@ -68,15 +286,15 @@ stored and the encoding logic for our data.
 Please check out the JavaDoc for {{< javadoc file="org/apache/flink/connector/file/sink/FileSink.html" name="FileSink">}}
 for all the configuration options and more documentation about the implementation of the different data formats.
 
-### Row-encoded Formats
+#### Row-encoded Formats
 
 Row-encoded formats need to specify an `Encoder`
 that is used for serializing individual rows to the `OutputStream` of the in-progress part files.
 
 In addition to the bucket assigner, the RowFormatBuilder allows the user to specify:
 
- - Custom RollingPolicy : Rolling policy to override the DefaultRollingPolicy
- - bucketCheckInterval (default = 1 min) : Millisecond interval for checking time based rolling policies
+- Custom RollingPolicy : Rolling policy to override the DefaultRollingPolicy
+- bucketCheckInterval (default = 1 min) : Interval for checking time based rolling policies
 
 Basic usage for writing String elements thus looks like this:
 
@@ -86,18 +304,21 @@ Basic usage for writing String elements thus looks like this:
 ```java
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 
+import java.time.Duration;
+
 DataStream<String> input = ...;
 
 final FileSink<String> sink = FileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
 	.build();
 
@@ -109,18 +330,21 @@ input.sinkTo(sink);
 ```scala
 import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.core.fs.Path
+import org.apache.flink.configuration.MemorySize
 import org.apache.flink.connector.file.sink.FileSink
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 
+import java.time.Duration
+
 val input: DataStream[String] = ...
 
 val sink: FileSink[String] = FileSink
     .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
     .withRollingPolicy(
         DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .withInactivityInterval(Duration.ofSeconds(10))
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
             .build())
     .build()
 
@@ -133,11 +357,11 @@ input.sinkTo(sink)
 This example creates a simple sink that assigns records to the default one hour time buckets. It also specifies
 a rolling policy that rolls the in-progress part file on any of the following 3 conditions:
 
- - It contains at least 15 minutes worth of data
- - It hasn't received new records for the last 5 minutes
- - The file size has reached 1 GB (after writing the last record)
+- It contains at least 15 minutes worth of data
+- It hasn't received new records for the last 5 minutes
+- The file size has reached 1 GB (after writing the last record)
 
-### Bulk-encoded Formats
+#### Bulk-encoded Formats
 
 Bulk-encoded sinks are created similarly to the row-encoded ones, but instead of
 specifying an `Encoder`, we have to specify a {{< javadoc file="org/apache/flink/api/common/serialization/BulkWriter.Factory.html" name="BulkWriter.Factory">}}.
@@ -153,11 +377,11 @@ Flink comes with four built-in BulkWriter factories:
 * OrcBulkWriterFactory
 
 {{< hint info >}}
-**Important** Bulk Formats can only have a rolling policy that extends the `CheckpointRollingPolicy`. 
+**Important** Bulk Formats can only have a rolling policy that extends the `CheckpointRollingPolicy`.
 The latter rolls on every checkpoint. A policy can roll additionally based on size or processing time.
 {{< /hint >}}
 
-#### Parquet format
+##### Parquet format
 
 Flink contains built in convenience methods for creating Parquet writer factories for Avro data. These methods
 and their associated documentation can be found in the ParquetAvroWriters class.
@@ -245,10 +469,10 @@ input.sinkTo(sink)
 {{< /tab >}}
 {{< /tabs >}}
 
-#### Avro format
+##### Avro format
 
 Flink also provides built-in support for writing data into Avro files. A list of convenience methods to create
-Avro writer factories and their associated documentation can be found in the 
+Avro writer factories and their associated documentation can be found in the
 AvroWriters class.
 
 To use the Avro writers in your application you need to add the following dependency:
@@ -339,20 +563,20 @@ stream.sinkTo(FileSink.forBulkFormat(
 {{< /tab >}}
 {{< /tabs >}}
 
-#### ORC Format
- 
+##### ORC Format
+
 To enable the data to be bulk encoded in ORC format, Flink offers `OrcBulkWriterFactory`
 which takes a concrete implementation of Vectorizer.
 
-Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses 
-ORC's `VectorizedRowBatch` to achieve this. 
+Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses
+ORC's `VectorizedRowBatch` to achieve this.
 
-Since the input element has to be transformed to a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer` 
-class and override the `vectorize(T element, VectorizedRowBatch batch)` method. As you can see, the method provides an 
-instance of `VectorizedRowBatch` to be used directly by the users so users just have to write the logic to transform the 
+Since the input element has to be transformed to a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer`
+class and override the `vectorize(T element, VectorizedRowBatch batch)` method. As you can see, the method provides an
+instance of `VectorizedRowBatch` to be used directly by the users so users just have to write the logic to transform the
 input `element` to `ColumnVectors` and set them in the provided `VectorizedRowBatch` instance.
 
-For example, if the input element is of type `Person` which looks like: 
+For example, if the input element is of type `Person` which looks like:
 
 {{< tabs "5436ac4a-4834-4fb3-9872-2dd5c3145efa" >}}
 {{< tab "Java" >}}
@@ -368,7 +592,7 @@ class Person {
 {{< /tab >}}
 {{< /tabs >}}
 
-Then a child implementation to convert the element of type `Person` and set them in the `VectorizedRowBatch` can be like: 
+Then a child implementation to convert the element of type `Person` and set them in the `VectorizedRowBatch` can be like:
 
 {{< tabs "6eb8e5e8-5177-4c8d-bb5a-96c6333b0b01" >}}
 {{< tab "Java" >}}
@@ -461,7 +685,7 @@ input.sinkTo(sink)
 {{< /tab >}}
 {{< /tabs >}}
 
-OrcBulkWriterFactory can also take Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC 
+OrcBulkWriterFactory can also take Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC
 writer properties can be provided.
 
 {{< tabs "eeef2ece-6e21-4bfd-b3ed-3329136f3486" >}}
@@ -492,11 +716,11 @@ val writerFactory = new OrcBulkWriterFactory(
     new PersonVectorizer(schema), writerProperties, conf)
 ```
 {{< /tab >}}
-{{< /tabs >}} 
+{{< /tabs >}}
 
 The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html).
 
-Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overriding 
+Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overriding
 `vectorize(...)` method.
 
 {{< tabs "9880fed1-b5d7-440e-a5ca-c9b20f10dac2" >}}
@@ -533,7 +757,7 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
 {{< /tab >}}
 {{< /tabs >}}
 
-#### Hadoop SequenceFile format
+##### Hadoop SequenceFile format
 
 To use the `SequenceFile` bulk encoder in your application you need to add the following dependency:
 
@@ -589,7 +813,7 @@ input.sinkTo(sink)
 
 The `SequenceFileWriterFactory` supports additional constructor parameters to specify compression settings.
 
-## Bucket Assignment
+### Bucket Assignment
 
 The bucketing logic defines how the data will be structured into subdirectories inside the base output directory.
 
@@ -602,30 +826,30 @@ We can specify a custom `BucketAssigner` by calling `.withBucketAssigner(assigne
 
 Flink comes with two built-in BucketAssigners:
 
- - `DateTimeBucketAssigner` : Default time based assigner
- - `BasePathBucketAssigner` : Assigner that stores all part files in the base path (single global bucket)
+- `DateTimeBucketAssigner` : Default time based assigner
+- `BasePathBucketAssigner` : Assigner that stores all part files in the base path (single global bucket)
 
-## Rolling Policy
+### Rolling Policy
 
 The `RollingPolicy` defines when a given in-progress part file will be closed and moved to the pending and later to finished state.
 Part files in the "finished" state are the ones that are ready for viewing and are guaranteed to contain valid data that will not be reverted in case of failure.
 In `STREAMING` mode, the Rolling Policy in combination with the checkpointing interval (pending files become finished on the next checkpoint) control how quickly
-part files become available for downstream readers and also the size and number of these parts. In `BATCH` mode, part-files become visible at the end of the job but 
-the rolling policy can control their maximum size. 
+part files become available for downstream readers and also the size and number of these parts. In `BATCH` mode, part-files become visible at the end of the job but
+the rolling policy can control their maximum size.
 
 Flink comes with two built-in RollingPolicies:
 
- - `DefaultRollingPolicy`
- - `OnCheckpointRollingPolicy`
+- `DefaultRollingPolicy`
+- `OnCheckpointRollingPolicy`
 
-## Part file lifecycle
+### Part file lifecycle
 
 In order to use the output of the `FileSink` in downstream systems, we need to understand the naming and lifecycle of the output files produced.
 
 Part files can be in one of three states:
- 1. **In-progress** : The part file that is currently being written to is in-progress
- 2. **Pending** : Closed (due to the specified rolling policy) in-progress files that are waiting to be committed
- 3. **Finished** : On successful checkpoints (`STREAMING`) or at the end of input (`BATCH`) pending files transition to "Finished"
+1. **In-progress** : The part file that is currently being written to is in-progress
+2. **Pending** : Closed (due to the specified rolling policy) in-progress files that are waiting to be committed
+3. **Finished** : On successful checkpoints (`STREAMING`) or at the end of input (`BATCH`) pending files transition to "Finished"
 
 Only finished files are safe to read by downstream systems as those are guaranteed to not be modified later.
 
@@ -672,18 +896,18 @@ New buckets are created as dictated by the bucketing policy, and this doesn't af
 
 Old buckets can still receive new records as the bucketing policy is evaluated on a per-record basis.
 
-### Part file configuration
+#### Part file configuration
 
 Finished files can be distinguished from the in-progress ones by their naming scheme only.
 
 By default, the file naming strategy is as follows:
- - **In-progress / Pending**: `part-<uid>-<partFileIndex>.inprogress.uid`
- - **Finished:** `part-<uid>-<partFileIndex>`
-where `uid` is a random id assigned to a subtask of the sink when the subtask is instantiated. This `uid` is not fault-tolerant 
-so it is regenerated when the subtask recovers from a failure.
+- **In-progress / Pending**: `part-<uid>-<partFileIndex>.inprogress.uid`
+- **Finished:** `part-<uid>-<partFileIndex>`
+  where `uid` is a random id assigned to a subtask of the sink when the subtask is instantiated. This `uid` is not fault-tolerant
+  so it is regenerated when the subtask recovers from a failure.
 
-Flink allows the user to specify a prefix and/or a suffix for his/her part files. 
-This can be done using an `OutputFileConfig`. 
+Flink allows the user to specify a prefix and/or a suffix for his/her part files.
+This can be done using an `OutputFileConfig`.
 For example for a prefix "prefix" and a suffix ".ext" the sink will create the following files:
 
 ```
@@ -735,42 +959,42 @@ val sink = FileSink
 {{< /tab >}}
 {{< /tabs >}}
 
-## Important Considerations
+### Important Considerations
 
-### General
+#### General
 
 <span class="label label-danger">Important Note 1</span>: When using Hadoop < 2.7, please use
 the `OnCheckpointRollingPolicy` which rolls part files on every checkpoint. The reason is that if part files "traverse"
-the checkpoint interval, then, upon recovery from a failure the `FileSink` may use the `truncate()` method of the 
-filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions 
+the checkpoint interval, then, upon recovery from a failure the `FileSink` may use the `truncate()` method of the
+filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions
 and Flink will throw an exception.
 
 <span class="label label-danger">Important Note 2</span>: Given that Flink sinks and UDFs in general do not differentiate between
-normal job termination (*e.g.* finite input stream) and termination due to failure, upon normal termination of a job, the last 
+normal job termination (*e.g.* finite input stream) and termination due to failure, upon normal termination of a job, the last
 in-progress files will not be transitioned to the "finished" state.
 
 <span class="label label-danger">Important Note 3</span>: Flink and the `FileSink` never overwrites committed data.
 Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file which was committed
-by subsequent successful checkpoints, the `FileSink` will refuse to resume and will throw an exception as it cannot locate the 
+by subsequent successful checkpoints, the `FileSink` will refuse to resume and will throw an exception as it cannot locate the
 in-progress file.
 
-<span class="label label-danger">Important Note 4</span>: Currently, the `FileSink` only supports three filesystems: 
+<span class="label label-danger">Important Note 4</span>: Currently, the `FileSink` only supports three filesystems:
 HDFS, S3, and Local. Flink will throw an exception when using an unsupported filesystem at runtime.
 
-### BATCH-specific
+#### BATCH-specific
 
 <span class="label label-danger">Important Note 1</span>: Although the `Writer` is executed with the user-specified
 parallelism, the `Committer` is executed with parallelism equal to 1.
 
-<span class="label label-danger">Important Note 2</span>: Pending files are committed, i.e. transition to `Finished` 
+<span class="label label-danger">Important Note 2</span>: Pending files are committed, i.e. transition to `Finished`
 state, after the whole input has been processed.
 
-<span class="label label-danger">Important Note 3</span>: When High-Availability is activated, if a `JobManager` 
+<span class="label label-danger">Important Note 3</span>: When High-Availability is activated, if a `JobManager`
 failure happens while the `Committers` are committing, then we may have duplicates. This is going to be fixed in  
-future Flink versions 
+future Flink versions
 (see progress in [FLIP-147](https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished)).
 
-### S3-specific
+#### S3-specific
 
 <span class="label label-danger">Important Note 1</span>: For S3, the `FileSink`
 supports only the [Hadoop-based](https://hadoop.apache.org/) FileSystem implementation, not
@@ -791,3 +1015,4 @@ before the job is restarted. This will result in your job not being able to rest
 pending part-files are no longer there and Flink will fail with an exception as it tries to fetch them and fails.
 
 {{< top >}}
+
diff --git a/docs/content.zh/docs/connectors/datastream/formats/_index.md b/docs/content.zh/docs/connectors/datastream/formats/_index.md
new file mode 100644
index 0000000..282fc69
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/_index.md
@@ -0,0 +1,23 @@
+---
+title: Formats
+bookCollapseSection: true
+weight: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
diff --git a/docs/content.zh/docs/connectors/datastream/formats/avro.md b/docs/content.zh/docs/connectors/datastream/formats/avro.md
new file mode 100644
index 0000000..1b2ffef
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/avro.md
@@ -0,0 +1,61 @@
+---
+title:  "Avro"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/avro.html
+- /apis/streaming/connectors/formats/avro.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+# Avro format
+
+Flink has built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read and write Avro data based on an Avro schema with Flink.
+The serialization framework of Flink is able to handle classes generated from Avro schemas. In order to use the Avro format, the following dependencies are required for projects using a build automation tool (such as Maven or SBT).
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-avro</artifactId>
+  <version>{{< version >}}</version>
+</dependency>
+```
+
+In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+```java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataStream<User> usersDS = env.createInput(users);
+```
+
+Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection of these POJOs. For example:
+
+```java
+usersDS.keyBy("name")
+```
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data-intensive and thus probably slow to use.
+
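+If you nevertheless need to work with generic records, a minimal sketch could look as follows; the schema
+string and input path are placeholders, and `GenericRecordAvroTypeInfo` is used to give Flink the type
+information it needs for `GenericRecord`:
+
+```java
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.AvroInputFormat;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Placeholder schema and input path - replace with your own.
+Schema schema = new Schema.Parser().parse(
+    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
+AvroInputFormat<GenericRecord> format =
+    new AvroInputFormat<>(new Path("/path/to/avro/files"), GenericRecord.class);
+
+// The explicit type information tells Flink how to (de)serialize the generic records.
+DataStream<GenericRecord> records = env.createInput(format, new GenericRecordAvroTypeInfo(schema));
+```
+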
+Flink's POJO field selection also works with POJOs generated from Avro. However, this is only possible if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use the field as a join or grouping key.
+Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
diff --git a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md b/docs/content.zh/docs/connectors/datastream/formats/azure_table_storage.md
similarity index 96%
copy from docs/content/docs/connectors/datastream/formats/azure_table_storage.md
copy to docs/content.zh/docs/connectors/datastream/formats/azure_table_storage.md
index d8d1359..11a7b8f 100644
--- a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/azure_table_storage.md
@@ -1,5 +1,5 @@
 ---
-title:  "Microsoft Azure table"
+title:  "Azure Table storage"
 weight: 4
 type: docs
 aliases:
@@ -25,9 +25,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Microsoft Azure Table Storage format
+# Azure Table Storage
 
-This example is using the `HadoopInputFormat` wrapper to use an existing Hadoop input format implementation for accessing [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
+This example uses the `HadoopInputFormat` wrapper with an existing Hadoop input format implementation to access [Azure's Table Storage](https://docs.microsoft.com/en-us/azure/storage/tables/table-storage-overview).
 
 1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves.
    Execute the following commands:
diff --git a/docs/content.zh/docs/connectors/datastream/formats/hadoop.md b/docs/content.zh/docs/connectors/datastream/formats/hadoop.md
new file mode 100644
index 0000000..e2b2c9f
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/hadoop.md
@@ -0,0 +1,159 @@
+---
+title:  "Hadoop"
+weight: 4
+type: docs
+aliases:
+  - /dev/connectors/formats/hadoop.html
+  - /apis/streaming/connectors/formats/hadoop.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Hadoop formats
+
+## Project Configuration
+
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
+Maven module.
+
+Add the following dependency to your `pom.xml` to use Hadoop:
+
+```xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hadoop-compatibility{{< scala_version >}}</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
+```
+
+If you want to run your Flink application locally (e.g. from your IDE), you also need to add
+a `hadoop-client` dependency such as:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.8.5</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+## Using Hadoop InputFormats
+
+To use Hadoop `InputFormats` with Flink, the format must first be wrapped
+using either `readHadoopFile` or `createHadoopInput` of the
+`HadoopInputs` utility class.
+The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general-purpose
+input formats.
+The resulting `InputFormat` can be used to create a data source by using
+`StreamExecutionEnvironment#createInput`.
+
+The resulting `DataStream` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+{{< tabs "baa59ec9-046e-4fe3-a2db-db5ee09d0635" >}}
+{{< tab "Java" >}}
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<Tuple2<LongWritable, Text>> input =
+    env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+                        LongWritable.class, Text.class, textPath));
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val input: DataStream[(LongWritable, Text)] =
+  env.createInput(HadoopInputs.readHadoopFile(
+                    new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
+
+// Do something with the data.
+[...]
+```
+
+{{< /tab >}}
+{{< /tabs >}}
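+
+Each record in the resulting stream is a `Tuple2`, so downstream operators access the key via `f0` and the
+value via `f1`. For example, a minimal sketch that extracts just the text payload from the Java example
+above:
+
+```java
+// Keep only the line content, dropping the LongWritable offset key.
+DataStream<String> lines = input.map(tuple -> tuple.f1.toString());
+```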
+
+## Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataStream containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+{{< tabs "d4af1c52-0e4c-490c-8c35-e3d60b1b52ee" >}}
+{{< tab "Java" >}}
+
+```java
+// Obtain the result we want to emit
+DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// Obtain your result to emit.
+val hadoopResult: DataStream[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+
+
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/datastream/formats/mongodb.md b/docs/content.zh/docs/connectors/datastream/formats/mongodb.md
new file mode 100644
index 0000000..29a8f01
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/mongodb.md
@@ -0,0 +1,33 @@
+---
+title:  "MongoDB"
+weight: 4
+type: docs
+aliases:
+- /dev/connectors/formats/mongodb.html
+- /apis/streaming/connectors/formats/mongodb.html
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# MongoDB format
+
+This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test).
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/datastream/formats/overview.md b/docs/content.zh/docs/connectors/datastream/formats/overview.md
new file mode 100644
index 0000000..74d9226
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/formats/overview.md
@@ -0,0 +1,38 @@
+---
+title: Overview
+weight: 1
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataStream Formats
+
+## Available Formats
+
+Formats define how information is encoded for storage. Currently these formats are supported:
+
+ * [Avro]({{< ref "docs/connectors/datastream/formats/avro" >}})
+ * [Azure Table]({{< ref "docs/connectors/datastream/formats/azure_table_storage" >}})
+ * [Hadoop]({{< ref "docs/connectors/datastream/formats/hadoop" >}})
+ * [Parquet]({{< ref "docs/connectors/datastream/formats/parquet" >}})
+ * [Text files]({{< ref "docs/connectors/datastream/formats/text_files" >}})
+ 
+{{< top >}}
+
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
similarity index 92%
copy from docs/content/docs/connectors/datastream/formats/parquet.md
copy to docs/content.zh/docs/connectors/datastream/formats/parquet.md
index f58eeeb..98d76a6 100644
--- a/docs/content/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -1,5 +1,5 @@
 ---
-title:  "Parquet files"
+title:  "Parquet"
 weight: 4
 type: docs
 aliases:
@@ -28,12 +28,16 @@ under the License.
 
 # Parquet format
 
-Flink supports reading [parquet](https://parquet.apache.org/) files and producing [Flink rows](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html).
+Flink supports reading [Parquet](https://parquet.apache.org/) files and producing [Flink rows](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html).
 To use the format you need to add the Flink Parquet dependency to your project:
 
 ```xml
-{{< artifact flink-parquet >}}
-```  
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-parquet</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
+```
  
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format in two ways:
@@ -69,7 +73,7 @@ final DataStream<RowData> stream =
 
 **Continuous read example**:
 
-In this example we create a DataStream containing parquet records as Flink Rows that will 
+In this example we create a DataStream containing Parquet records as Flink Rows that will 
 infinitely grow as new files are added to the directory. We monitor for new files each second.
 We project the schema to read only certain fields ("f7", "f4" and "f99").  
 We read records in batches of 500 records. The first boolean parameter specifies if timestamp columns need to be interpreted as UTC.
diff --git a/docs/content/docs/connectors/datastream/formats/text_files.md b/docs/content.zh/docs/connectors/datastream/formats/text_files.md
similarity index 80%
copy from docs/content/docs/connectors/datastream/formats/text_files.md
copy to docs/content.zh/docs/connectors/datastream/formats/text_files.md
index 7e9fcf6..33a6956 100644
--- a/docs/content/docs/connectors/datastream/formats/text_files.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/text_files.md
@@ -28,11 +28,15 @@ under the License.
 
 # Text files format
 
-Flink supports reading from text lines from a file using `TextLineFormat`. This format uses Java's built-in InputStreamReader to decode the byte stream using various supported charset encodings.
-To use the format you need to add the Flink Parquet dependency to your project:
+Flink supports reading text lines from a file using `TextLineInputFormat`. This format uses Java's built-in InputStreamReader to decode the byte stream using various supported charset encodings.
+To use the format you need to add the Flink Connector Files dependency to your project:
 
 ```xml
-{{< artifact flink-connector-files >}}
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-connector-files</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
 ```
 
 This format is compatible with the new Source that can be used in both batch and streaming modes.
@@ -47,7 +51,7 @@ There is no need for a watermark strategy as records do not contain event timest
 
 ```java
 final FileSource<String> source =
-  FileSource.forRecordStreamFormat(new TextLineFormat(), /* Flink Path */)
+  FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
   .build();
 final DataStream<String> stream =
   env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
@@ -60,7 +64,7 @@ There is no need for a watermark strategy as records do not contain event timest
 
 ```java
 final FileSource<String> source =
-    FileSource.forRecordStreamFormat(new TextLineFormat(), /* Flink Path */)
+    FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
   .monitorContinuously(Duration.ofSeconds(1L))
   .build();
 final DataStream<String> stream =
diff --git a/docs/content.zh/docs/connectors/datastream/overview.md b/docs/content.zh/docs/connectors/datastream/overview.md
index 7f3de30..ce1bf46 100644
--- a/docs/content.zh/docs/connectors/datastream/overview.md
+++ b/docs/content.zh/docs/connectors/datastream/overview.md
@@ -41,7 +41,7 @@ under the License.
  * [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}}) (sink)
  * [Amazon Kinesis Streams]({{< ref "docs/connectors/datastream/kinesis" >}}) (source/sink)
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) (sink)
- * [FileSystem (Hadoop included) - Streaming and Batch]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
+ * [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) (sink)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
  * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
diff --git a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md b/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
deleted file mode 100644
index 4561891..0000000
--- a/docs/content.zh/docs/connectors/datastream/streamfile_sink.md
+++ /dev/null
@@ -1,741 +0,0 @@
----
-title: Streaming File Sink
-weight: 6
-type: docs
-aliases:
-  - /zh/dev/connectors/streamfile_sink.html
-bookHidden: true
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Streaming File Sink
-
-这个连接器提供了一个 Sink 来将分区文件写入到支持 [Flink `FileSystem`]({{< ref "docs/deployment/filesystems/overview" >}}) 接口的文件系统中。
-
-{{< hint warning >}}
-This Streaming File Sink is in the process of being phased out. Please use the unified [File Sink]({{< ref "docs/connectors/datastream/file_sink" >}}) as a drop-in replacement.
-{{< /hint >}}
-
-Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。
-
-桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。
-
-{{< hint info >}}
-<b>重要:</b> 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 'in-progress' 或 'pending' 状态,下游系统无法安全地读取。
-{{< /hint >}}
-
-{{< img src="/fig/streamfilesink_bucketing.png" >}}
-
-## 文件格式
-
- `StreamingFileSink` 支持行编码格式和批量编码格式,比如 [Apache Parquet](http://parquet.apache.org) 。
-这两种变体随附了各自的构建器,可以使用以下静态方法创建:
-
- - Row-encoded sink: `StreamingFileSink.forRowFormat(basePath, rowEncoder)`
- - Bulk-encoded sink: `StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)`
-
-创建行或批量编码的 Sink 时,我们需要指定存储桶的基本路径和数据的编码逻辑。
-
-更多配置操作以及不同数据格式的实现请参考 `StreamingFileSink` 
-
-### 行编码格式
-
-行编码格式需要指定一个 `Encoder` 。Encoder 负责为每个处于 In-progress 状态文件的`OutputStream` 序列化数据。
-
-除了桶分配器之外,`RowFormatBuilder`  还允许用户指定:
-
- - Custom `RollingPolicy` :自定义滚动策略以覆盖默认的 DefaultRollingPolicy
- - bucketCheckInterval (默认为1分钟):毫秒间隔,用于基于时间的滚动策略。
-
-字符串元素写入示例:
-
-
-{{< tabs "804d0538-5382-4b74-b389-8ab1403c804c" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-
-DataStream<String> input = ...;
-
-final StreamingFileSink<String> sink = StreamingFileSink
-    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
-            .build())
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-
-val input: DataStream[String] = ...
-
-val sink: StreamingFileSink[String] = StreamingFileSink
-    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
-            .build())
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-这个例子创建了一个简单的 Sink ,将记录分配给默认的一小时时间桶。它还指定了一个滚动策略,该策略在以下三种情况下滚动处于 In-progress 状态的部分文件(part file):
-
- - 它至少包含 15 分钟的数据
- - 最近 5 分钟没有收到新的记录
- - 文件大小达到 1GB (写入最后一条记录后)
-
-### 批量编码格式
-
-批量编码 Sink 的创建与行编码 Sink 相似,不过在这里我们不是指定编码器  `Encoder` 而是指定 BulkWriter.`Factory` 。
-`BulkWriter` 定义了如何添加、刷新元素,以及如何批量编码。
-
-Flink 有四个内置的 BulkWriter Factory :
-
- - `ParquetWriterFactory`
- - `AvroWriterFactory`
- - `SequenceFileWriterFactory`
- - `CompressWriterFactory`
- - `OrcBulkWriterFactory`
-
-{{< hint info >}}
-<b>重要:</b> 批量编码模式仅支持 OnCheckpointRollingPolicy 策略, 在每次 checkpoint 的时候切割文件。
-{{< /hint >}}
-
-#### Parquet 格式
-
-Flink 包含为不同 Avro 类型,创建 ParquetWriterFactory 的便捷方法,更多信息请参考 `ParquetAvroWriters` 。
-
-要编写其他 Parquet 兼容的数据格式,用户需要创建 ParquetWriterFactory 并实现 `ParquetBuilder` 接口。
-
-在应用中使用 Parquet 批量编码器,你需要添加以下依赖:
-
-{{< artifact flink-parquet withScalaVersion >}}
-
-这个例子使用 StreamingFileSink 将 Avro 数据写入 Parquet 格式:
-
-{{< tabs "7f839bbf-4d61-48ef-81a6-5649d53fcfae" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
-import org.apache.avro.Schema;
-
-
-Schema schema = ...;
-DataStream<GenericRecord> input = ...;
-
-final StreamingFileSink<GenericRecord> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
-import org.apache.avro.Schema
-
-val schema: Schema = ...
-val input: DataStream[GenericRecord] = ...
-
-val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
-    .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-类似的,将 Protobuf 数据写入到 Parquet 格式可以通过:
-
-{{< tabs "6207391f-278a-4eed-91aa-3112a2934e54" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
-
-// ProtoRecord is a generated protobuf Message class.
-DataStream<ProtoRecord> input = ...;
-
-final StreamingFileSink<ProtoRecord> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
-
-// ProtoRecord is a generated protobuf Message class.
-val input: DataStream[ProtoRecord] = ...
-
-val sink: StreamingFileSink[ProtoRecord] = StreamingFileSink
-    .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Avro格式
-
-Flink 也提供了将数据写入 Avro 文件的内置支持。对于创建 AvroWriterFactory 的快捷方法,更多信息可以参考 
-`AvroWriters`.
-
-使用Avro相关的Writer需要在项目中添加以下依赖:
-
-{{< artifact flink-avro >}}
-
-将数据写入 Avro 文件的 StreamingFileSink 算子可以通过如下方式创建:
-
-{{< tabs "2df2f4da-8346-4ce3-bb4c-bcca28b29811" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.formats.avro.AvroWriters;
-import org.apache.avro.Schema;
-
-
-Schema schema = ...;
-DataStream<GenericRecord> input = ...;
-
-final StreamingFileSink<GenericRecord> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.formats.avro.AvroWriters
-import org.apache.avro.Schema
-
-val schema: Schema = ...
-val input: DataStream[GenericRecord] = ...
-
-val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
-    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-如果想要创建自定义的 Avro Writer,例如启用压缩等,用户可以实现 `AvroBuilder`
-接口并自行创建一个 `AvroWriterFactory` 实例:
-
-{{< tabs "bc3ef729-afac-4ca7-ae2e-85368368a61f" >}}
-{{< tab "Java" >}}
-```java
-AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
-	Schema schema = ReflectData.get().getSchema(Address.class);
-	DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
-
-	DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
-	dataFileWriter.setCodec(CodecFactory.snappyCodec());
-	dataFileWriter.create(schema, out);
-	return dataFileWriter;
-});
-
-DataStream<Address> stream = ...
-stream.addSink(StreamingFileSink.forBulkFormat(
-	outputBasePath,
-	factory).build());
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
-    override def createWriter(out: OutputStream): DataFileWriter[Address] = {
-        val schema = ReflectData.get.getSchema(classOf[Address])
-        val datumWriter = new ReflectDatumWriter[Address](schema)
-
-        val dataFileWriter = new DataFileWriter[Address](datumWriter)
-        dataFileWriter.setCodec(CodecFactory.snappyCodec)
-        dataFileWriter.create(schema, out)
-        dataFileWriter
-    }
-})
-
-val stream: DataStream[Address] = ...
-stream.addSink(StreamingFileSink.forBulkFormat(
-    outputBasePath,
-    factory).build());
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### ORC Format
-
-To enable the data to be bulk encoded in ORC format, Flink offers `OrcBulkWriterFactory`
-which takes a concrete implementation of `Vectorizer`.
-
-Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses
-ORC's `VectorizedRowBatch` to achieve this.
-
-Since the input element has to be transformed to a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer`
-class and override the `vectorize(T element, VectorizedRowBatch batch)` method. As you can see, the method provides an
-instance of `VectorizedRowBatch` to be used directly by the users so users just have to write the logic to transform the
-input `element` to `ColumnVectors` and set them in the provided `VectorizedRowBatch` instance.
-
-For example, if the input element is of type `Person` which looks like:
-
-{{< tabs "a49d0a8c-1cd6-458a-a5a1-0ed645a1139d" >}}
-{{< tab "Java" >}}
-```java
-
-class Person {
-    private final String name;
-    private final int age;
-    ...
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-Then a child implementation to convert the element of type `Person` and set them in the `VectorizedRowBatch` can be like:
-
-{{< tabs "7198abfc-97cf-4a81-8100-1dfe233d5608" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
-	public PersonVectorizer(String schema) {
-		super(schema);
-	}
-	@Override
-	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
-		BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
-		LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
-		int row = batch.size++;
-		nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
-		ageColVector.vector[row] = element.getAge();
-	}
-}
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import java.nio.charset.StandardCharsets
-import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}
-
-class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
-
-  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
-    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
-    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
-    nameColVector.setVal(batch.size + 1, element.getName.getBytes(StandardCharsets.UTF_8))
-    ageColVector.vector(batch.size + 1) = element.getAge
-  }
-
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-To use the ORC bulk encoder in an application, users need to add the following dependency:
-
-{{< artifact flink-orc withScalaVersion >}}
-
-And then a `StreamingFileSink` that writes data in ORC format can be created like this:
-
-{{< tabs "fa7db9c6-8ad4-4cbd-82f8-9ec94f3371e6" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.orc.writer.OrcBulkWriterFactory;
-
-String schema = "struct<_col0:string,_col1:int>";
-DataStream<Person> input = ...;
-
-final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));
-
-final StreamingFileSink<Person> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, writerFactory)
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.orc.writer.OrcBulkWriterFactory
-
-val schema: String = "struct<_col0:string,_col1:int>"
-val input: DataStream[Person] = ...
-val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));
-
-val sink: StreamingFileSink[Person] = StreamingFileSink
-    .forBulkFormat(outputBasePath, writerFactory)
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-OrcBulkWriterFactory can also take Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC
-writer properties can be provided.
-
-{{< tabs "97463c6c-ceeb-42c6-a281-f784d9cbafc6" >}}
-{{< tab "Java" >}}
-```java
-String schema = ...;
-Configuration conf = ...;
-Properties writerProperties = new Properties();
-
-writerProps.setProperty("orc.compress", "LZ4");
-// Other ORC supported properties can also be set similarly.
-
-final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
-    new PersonVectorizer(schema), writerProperties, conf);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val schema: String = ...
-val conf: Configuration = ...
-val writerProperties: Properties = new Properties()
-
-writerProps.setProperty("orc.compress", "LZ4")
-// Other ORC supported properties can also be set similarly.
-
-val writerFactory = new OrcBulkWriterFactory(
-    new PersonVectorizer(schema), writerProperties, conf)
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html).
-
-Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overriding
-`vectorize(...)` method.
-
-{{< tabs "959c9327-80a3-4ef3-910a-e069b046f6d5" >}}
-{{< tab "Java" >}}
-```java
-
-public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
-	@Override
-	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
-		...
-		String metadataKey = ...;
-		ByteBuffer metadataValue = ...;
-		this.addUserMetadata(metadataKey, metadataValue);
-	}
-}
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-
-class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
-
-  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
-    ...
-    val metadataKey: String = ...
-    val metadataValue: ByteBuffer = ...
-    addUserMetadata(metadataKey, metadataValue)
-  }
-
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Hadoop SequenceFile 格式
-
-在应用中使用 SequenceFile 批量编码器,你需要添加以下依赖:
-
-{{< artifact flink-sequence-file >}}
-
-简单的 SequenceFile 写入示例:
-
-{{< tabs "466b0dac-1a2d-4472-8494-11764ef0a577" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-
-
-DataStream<Tuple2<LongWritable, Text>> input = ...;
-Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
-final StreamingFileSink<Tuple2<LongWritable, Text>> sink = StreamingFileSink
-  .forBulkFormat(
-    outputBasePath,
-    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class))
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.SequenceFile
-import org.apache.hadoop.io.Text;
-
-val input: DataStream[(LongWritable, Text)] = ...
-val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
-val sink: StreamingFileSink[(LongWritable, Text)] = StreamingFileSink
-  .forBulkFormat(
-    outputBasePath,
-    new SequenceFileWriterFactory(hadoopConf, LongWritable.class, Text.class))
-	.build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-SequenceFileWriterFactory 支持附加构造函数参数指定压缩设置。
-
-## 桶分配
-
-桶分配逻辑定义了如何将数据结构化为基本输出目录中的子目录
-
-行格式和批量格式都使用 `DateTimeBucketAssigner` 作为默认的分配器。
-默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: `yyyy-MM-dd--HH` 。日期格式(即桶的大小)和时区都可以手动配置。
-
-我们可以在格式构建器上调用 `.withBucketAssigner(assigner)` 来自定义 `BucketAssigner` 。
-
-Flink 有两个内置的 BucketAssigners :
-
- - `DateTimeBucketAssigner` :默认基于时间的分配器
- - `BasePathBucketAssigner` :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)
-
-## 滚动策略
-
-滚动策略 `RollingPolicy` 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。
-
-Flink 有两个内置的滚动策略:
-
- - `DefaultRollingPolicy`
- - `OnCheckpointRollingPolicy`
-
-## 部分文件(part file) 生命周期
-
-为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。
-
-部分文件(part file)可以处于以下三种状态之一:
- 1. **In-progress** :当前文件正在写入中
- 2. **Pending** :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
- 3. **Finished** :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
-
-处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。
-
-{{< hint info >}}
-**重要:** 部分文件的索引在每个 subtask 内部是严格递增的(按文件创建顺序)。但是索引并不总是连续的。当 Job 重启后,所有部分文件的索引从 `max part index + 1` 开始,
-这里的 `max part index` 是所有 subtask 中索引的最大值。
-{{< /hint >}}
-
-对于每个活动的桶,Writer 在任何时候都只有一个处于 In-progress 状态的部分文件(part file),但是可能有几个 Penging 和 Finished 状态的部分文件(part file)。
-
-**部分文件(part file)例子**
-
-为了更好地理解这些文件的生命周期,让我们来看一个包含 2 个 Sink Subtask 的简单例子:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    └── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
-```
-
-当部分文件 `part-1-0` 被滚动(假设它变得太大了)时,它将成为 Pending 状态,但是它还没有被重命名。然后 Sink 会创建一个新的部分文件: `part-1-1`:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
-    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
- `part-1-0` 现在处于 Pending 状态等待完成,在下一次成功的 Checkpoint 后,它会变成 Finished 状态:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-1-0
-    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
-根据分桶策略创建新的桶,但是这并不会影响当前处于 In-progress 状态的文件:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-1-0
-    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-└── 2019-08-25--13
-    └── part-0-2.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1
-```
-
-因为分桶策略基于每条记录进行评估,所以旧桶仍然可以接受新的记录。
-
-### 部分文件的配置项
-
-已经完成的文件和进行中的文件仅能通过文件名格式进行区分。
-
-默认情况下,文件命名格式如下所示:
- - **In-progress / Pending:** `part-<subtaskIndex>-<partFileIndex>.inprogress.uid`
- - **FINISHED:** `part-<subtaskIndex>-<partFileIndex>`
-
-Flink 允许用户通过 `OutputFileConfig` 指定部分文件名的前缀和后缀。
-举例来说,前缀设置为 "prefix" 以及后缀设置为 ".ext" 之后,Sink 创建的文件名如下所示:
-
-```
-└── 2019-08-25--12
-    ├── prefix-0-0.ext
-    ├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── prefix-1-0.ext
-    └── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
-用户可以通过如下方式设置 `OutputFileConfig`:
-
-{{< tabs "4ee26cce-7551-4a28-bc76-f8ba0e71ba80" >}}
-{{< tab "Java" >}}
-```java
-
-OutputFileConfig config = OutputFileConfig
- .builder()
- .withPartPrefix("prefix")
- .withPartSuffix(".ext")
- .build();
-
-StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
- .forRowFormat((new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
- .withBucketAssigner(new KeyBucketAssigner())
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
- .withOutputFileConfig(config)
- .build();
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-
-val config = OutputFileConfig
- .builder()
- .withPartPrefix("prefix")
- .withPartSuffix(".ext")
- .build()
-
-val sink = StreamingFileSink
- .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
- .withBucketAssigner(new KeyBucketAssigner())
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
- .withOutputFileConfig(config)
- .build()
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-## 重要注意事项
-
-### 通用注意事项
-
-<span class="label label-danger">重要提示 1</span>: 使用 Hadoop < 2.7 时,请使用 `OnCheckpointRollingPolicy` 滚动策略,该策略会在每次检查点时进行文件切割。
-这样做的原因是如果部分文件的生命周期跨多个检查点,当 `StreamingFileSink` 从之前的检查点进行恢复时会调用文件系统的 `truncate()` 方法清理 in-progress 文件中未提交的数据。
-Hadoop 2.7 之前的版本不支持这个方法,因此 Flink 会报异常。
-
-<span class="label label-danger">重要提示 2</span>: 鉴于 Flink 的 sink 以及 UDF 通常不会区分作业的正常结束(比如有限流)和异常终止,因此正常结束作业的最后一批 in-progress 文件不会被转换到 "完成" 状态。
-
-<span class="label label-danger">重要提示 3</span>: Flink 以及 `StreamingFileSink` 不会覆盖已经提交的数据。因此如果尝试从一个包含 in-progress 文件的旧 checkpoint/savepoint 恢复,
-且这些 in-progress 文件会被接下来的成功 checkpoint 提交,Flink 会因为无法找到 in-progress 文件而抛异常,从而恢复失败。
-
-<span class="label label-danger">重要提示 4</span>: 目前 `StreamingFileSink` 只支持三种文件系统: HDFS、S3和Local。如果配置了不支持的文件系统,在执行的时候 Flink 会抛出异常。
-
-###  S3 特有的注意事项
-
-<span class="label label-danger">重要提示 1</span>: 对于 S3,`StreamingFileSink`  只支持基于 [Hadoop](https://hadoop.apache.org/) 
-的文件系统实现,不支持基于 [Presto](https://prestodb.io/) 的实现。如果想使用 `StreamingFileSink` 向 S3 写入数据并且将 
-checkpoint 放在基于 Presto 的文件系统,建议明确指定 *"s3a://"* (for Hadoop)作为sink的目标路径方案,并且为 checkpoint 路径明确指定 *"s3p://"* (for Presto)。
-如果 Sink 和 checkpoint 都使用 *"s3://"* 路径的话,可能会导致不可预知的行为,因为双方的实现都在“监听”这个路径。
-
-<span class="label label-danger">重要提示 2</span>: `StreamingFileSink` 使用 S3 的 [Multi-part Upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html)
-(后续使用MPU代替)特性可以保证精确一次的语义。这个特性支持以独立的块(因此被称为"multi-part")模式上传文件,当 MPU 的所有部分文件
-成功上传之后,可以合并成原始文件。对于失效的 MPUs,S3 提供了一个基于桶生命周期的规则,用户可以用这个规则来丢弃在指定时间内未完成的MPU。
-如果在一些部分文件还未上传时触发 savepoint,并且这个规则设置的比较严格,这意味着相关的 MPU在作业重启之前可能会超时。后续的部分文件没
-有写入到 savepoint, 那么在 Flink 作业从 savepoint 恢复时,会因为拿不到缺失的部分文件,导致任务失败并抛出异常。
-
-{{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/filesystem.md b/docs/content.zh/docs/connectors/table/filesystem.md
index 4efcc89..9697b34 100644
--- a/docs/content.zh/docs/connectors/table/filesystem.md
+++ b/docs/content.zh/docs/connectors/table/filesystem.md
@@ -1,9 +1,9 @@
 ---
-title: 文件系统
+title: FileSystem
 weight: 8
 type: docs
 aliases:
-  - /zh/dev/table/connectors/filesystem.html
+- /dev/table/connectors/filesystem.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,13 +24,16 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# 文件系统 SQL 连接器 
+# FileSystem SQL Connector
 
-该连接器提供了对 [Flink 文件系统抽象]({{< ref "docs/deployment/filesystems/overview" >}}) 支持的文件系统中的分区文件的访问.
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{< ref "docs/deployment/filesystems/overview" >}}).
 
-文件系统连接器本身就被包括在 Flink 中,不需要任何额外的依赖。当从文件系统中读取或向文件系统写入记录时,需要指定相应的记录格式。
+The file system connector itself is included in Flink and does not require an additional dependency.
+The corresponding jar can be found in the Flink distribution inside the `/lib` directory.
+A corresponding format needs to be specified for reading and writing rows from and to a file system.
 
-文件系统连接器支持对本地文件系统或分布式文件系统的读取和写入。 可以通过如下方式定义文件系统表:
+The file system connector allows for reading from and writing to a local or distributed filesystem. A filesystem table can be defined as:
 
 ```sql
 CREATE TABLE MyUserTable (
@@ -40,30 +43,37 @@ CREATE TABLE MyUserTable (
   part_name1 INT,
   part_name2 STRING
 ) PARTITIONED BY (part_name1, part_name2) WITH (
-  'connector' = 'filesystem',           -- 必选: 指定连接器类型
-  'path' = 'file:///path/to/whatever',  -- 必选: 指向目录的路径
-  'format' = '...',                     -- 必选: 文件系统连接器需要指定格式,请查阅 表格式 部分以获取更多细节
-  'partition.default-name' = '...',     -- 可选: 动态分区模式下分区字段值是 null 或空字符串时,默认的分区名。
-  'sink.shuffle-by-partition.enable' = '...',  -- 可选: 该选项开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 的文件数,但可能会导致数据倾斜,默认值是 false.
+  'connector' = 'filesystem',           -- required: specify the connector
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: the file system connector requires a format to be specified;
+                                        -- please refer to the Table Formats
+                                        -- section for more details
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
+                                        -- column value is null/empty string
+
+  -- optional: enables shuffling data by dynamic partition fields in the sink phase; this can greatly
+  -- reduce the number of files for the filesystem sink but may lead to data skew. The default value is false.
+  'sink.shuffle-by-partition.enable' = '...',
   ...
 )
 ```
 
 {{< hint info >}}
-需要确保包含以下依赖 [Flink File System specific dependencies]({{< ref "docs/deployment/filesystems/overview" >}}).
+Make sure to include [Flink File System specific dependencies]({{< ref "docs/deployment/filesystems/overview" >}}).
 {{< /hint >}}
 
 {{< hint info >}}
-针对流的文件系统 sources 目前还在开发中。 将来,社区会不断添加对常见的流处理场景的支持, 比如对分区和目录的检测等。
+File system sources for streaming are still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring.
 {{< /hint >}}
 
 {{< hint warning >}}
-新版的文件系统连接器和旧版的文件系统连接器有很大不同:path 参数指定的是一个目录而不是一个文件,该目录下文件的格式也不是肉眼可读的。
+The behaviour of the file system connector is quite different from that of the previous legacy filesystem connector:
+the path parameter specifies a directory, not a file, and you won't find a human-readable file in the path that you declare.
 {{< /hint >}}
 
-## 分区文件
+## Partition Files
 
-Flink 的文件系统连接器在对分区的支持上,使用了标准的 hive 格式。 不过,它不需要预先注册分区,而是基于目录结构自动做了分区发现。比如,以下目录结构的表, 会被自动推导为包含 `datetime` 和 `hour` 分区的分区表。
+Flink's file system partition support uses the standard Hive format. However, it does not require partitions to be pre-registered with a table catalog. Partitions are discovered and inferred based on the directory structure. For example, a table partitioned based on the directory below would be inferred to contain `datetime` and `hour` partitions.
 
 ```
 path
@@ -78,34 +88,122 @@ path
         ├── part-0.parquet
 ```
 
-文件系统连接器支持分区新增插入和分区覆盖插入。 参见 [INSERT Statement]({{< ref "docs/dev/table/sql/insert" >}}). 当对分区表进行分区覆盖插入时,只有相应的分区会被覆盖,而不是整个表。
+The file system table supports both partition inserts and overwrite inserts. See [INSERT Statement]({{< ref "docs/dev/table/sql/insert" >}}). When you insert overwrite into a partitioned table, only the corresponding partition will be overwritten, not the entire table.
 
-## 文件格式
+## File Formats
 
-文件系统连接器支持多种格式:
+The file system connector supports multiple formats:
 
- - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). 非压缩格式。
- - JSON: 注意文件系统连接器中的 JSON 不是传统的标准的 JSON 格式,而是非压缩的 [newline delimited JSON](http://jsonlines.org/).
- - Avro: [Apache Avro](http://avro.apache.org). 可以通过配置 `avro.codec` 支持压缩.
- - Parquet: [Apache Parquet](http://parquet.apache.org). 与 Hive 兼容.
- - Orc: [Apache Orc](http://orc.apache.org). 与 Hive 兼容.
- - Debezium-JSON: [debezium-json]({{< ref "docs/connectors/table/formats/debezium" >}}).
- - Canal-JSON: [canal-json]({{< ref "docs/connectors/table/formats/canal" >}}).
- - Raw: [raw]({{< ref "docs/connectors/table/formats/raw" >}}).
+- CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
+- JSON: Note JSON format for file system connector is not a typical JSON file but uncompressed [newline delimited JSON](http://jsonlines.org/).
+- Avro: [Apache Avro](http://avro.apache.org). Supports compression by configuring `avro.codec`.
+- Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+- Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+- Debezium-JSON: [debezium-json]({{< ref "docs/connectors/table/formats/debezium" >}}).
+- Canal-JSON: [canal-json]({{< ref "docs/connectors/table/formats/canal" >}}).
+- Raw: [raw]({{< ref "docs/connectors/table/formats/raw" >}}).
 
-## 流式 Sink
+## Source
 
-文件系统连接器支持流式的写, 它基于 Flink 的 [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}})
-将记录写入文件。按行编码的格式支持 csv 和 json。 按块编码的格式支持 parquet, orc 和 avro。
+The file system connector can be used to read single files or entire directories into a single table.
 
-你可以直接编写 SQL,把流数据插入到非分区表。
-如果是分区表,可以配置分区操作相关的参数,参见 [分区提交](#分区提交) 以查阅更多细节.
+When using a directory as the source path, there is **no defined order of ingestion** for the files inside the directory.
 
-### 滚动策略
+### Directory watching
 
-分区目录下的数据被分割到分区文件中。每个分区对应的sink的每个接受到了数据的子任务都至少会为该分区生成一个分区文件。 
-根据可配置的滚动策略,当前正在写入的分区文件会被关闭,新的分区文件也会被生成。 
-该策略基于大小,和指定的文件可被打开的最大 timeout 时长,来滚动分区文件。
+The file system connector automatically watches the input directory when the runtime mode is configured as STREAMING.
+
+You can modify the watch interval using the following option.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>source.monitor-interval</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>Duration</td>
+        <td>The interval in which the source checks for new files. The interval must be greater than 0. 
+        Each file is uniquely identified by its path, and will be processed once, as soon as it's discovered. 
+        The set of files already processed is kept in state during the whole lifecycle of the source, 
+        so it's persisted in checkpoints and savepoints together with the source state. 
+        Shorter intervals mean that files are discovered more quickly, 
+        but also imply more frequent listing or directory traversal of the file system / object store. 
+        If this config option is not set, the provided path will be scanned once, hence the source will be bounded.</td>
+    </tr>
+  </tbody>
+</table>
+
+### Available Metadata
+
+The following connector metadata can be accessed as metadata columns in a table definition. All of the metadata columns are read-only.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 30%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>file.path</code></td>
+      <td><code>STRING NOT NULL</code></td>
+      <td>Full path of the input file.</td>
+    </tr>
+    <tr>
+      <td><code>file.name</code></td>
+      <td><code>STRING NOT NULL</code></td>
+      <td>Name of the file, that is the farthest element from the root of the filepath.</td>
+    </tr>
+    <tr>
+      <td><code>file.size</code></td>
+      <td><code>BIGINT NOT NULL</code></td>
+      <td>Byte count of the file.</td>
+    </tr>
+    <tr>
+      <td><code>file.modification-time</code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>Modification time of the file.</td>
+    </tr>
+    </tbody>
+</table>
+
+The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE MyUserTableWithFilepath (
+  column_name1 INT,
+  column_name2 STRING,
+  `file.path` STRING NOT NULL METADATA
+) WITH (
+  'connector' = 'filesystem',
+  'path' = 'file:///path/to/whatever',
+  'format' = 'json'
+)
+```
+
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}),
+to write records to files. The row-encoded formats are CSV and JSON. The bulk-encoded formats are Parquet, ORC and Avro.
+
+You can write SQL directly to insert stream data into a non-partitioned table.
+If it is a partitioned table, you can configure partition-related operations. See [Partition Commit](filesystem.html#partition-commit) for details.
+
+### Rolling Policy
+
+Data within the partition directories is split into part files. Each partition will contain at least one part file for
+each subtask of the sink that has received data for that partition. The in-progress part file will be closed and an additional
+part file will be created according to the configurable rolling policy. The policy rolls part files based on size and
+a timeout that specifies the maximum duration for which a file can be open.
 
 <table class="table table-bordered">
   <thead>
@@ -121,31 +219,33 @@ path
         <td><h5>sink.rolling-policy.file-size</h5></td>
         <td style="word-wrap: break-word;">128MB</td>
         <td>MemorySize</td>
-        <td> 滚动前,分区文件最大大小.</td>
+        <td>The maximum part file size before rolling.</td>
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.rollover-interval</h5></td>
         <td style="word-wrap: break-word;">30 min</td>
         <td>Duration</td>
-        <td> 滚动前,分区文件处于打开状态的最大时长 (默认值是30分钟,以避免产生大量小文件)。 检查该选项的频率由参数 'sink.rolling-policy.check-interval' 控制。</td>
+        <td>The maximum time duration a part file can stay open before rolling (by default 30 min, to avoid too many small files).
+        The frequency at which this is checked is controlled by the 'sink.rolling-policy.check-interval' option.</td>
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.check-interval</h5></td>
         <td style="word-wrap: break-word;">1 min</td>
         <td>Duration</td>
-        <td> 基于时间的滚动策略的检查间隔。该参数控制了基于参数 'sink.rolling-policy.rollover-interval' 检查分区文件是否该被滚动的检查频率 .</td>
+        <td>The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.</td>
     </tr>
   </tbody>
 </table>
 
-**注意:** 对于 bulk 格式 (parquet, orc, avro), 滚动策略和检查点间隔控制了分区文件的大小和个数 (未完成的文件会在下个检查点完成).
+**NOTE:** For bulk formats (parquet, orc, avro), the rolling policy in combination with the checkpoint interval (pending files
+become finished on the next checkpoint) controls the size and number of these part files.
 
-**注意:** 对于行格式 (csv, json), 如果想使得分区文件更快地在文件系统中可见,可以设置连接器参数 `sink.rolling-policy.file-size` 或 `sink.rolling-policy.rollover-interval` ,以及 flink-conf.yaml 中的 `execution.checkpointing.interval` 。 
-对于其他格式 (avro, orc), 可以只设置 flink-conf.yaml 中的 `execution.checkpointing.interval` 。
+**NOTE:** For row formats (csv, json), you can set the parameter `sink.rolling-policy.file-size` or `sink.rolling-policy.rollover-interval` in the connector properties and the parameter `execution.checkpointing.interval` in flink-conf.yaml together
+if you don't want to wait a long period before the data becomes visible in the file system. For other formats (avro, orc), you can just set the parameter `execution.checkpointing.interval` in flink-conf.yaml.
 
-### 文件合并
+### File Compaction
 
-file sink 支持文件合并,以允许应用程序可以使用较小的检查点间隔而不产生大量文件。
+The file sink supports file compaction, which allows applications to use smaller checkpoint intervals without generating a large number of files.
 
 <table class="table table-bordered">
   <thead>
@@ -161,36 +261,35 @@ file sink 支持文件合并,以允许应用程序可以使用较小的检查
         <td><h5>auto-compaction</h5></td>
         <td style="word-wrap: break-word;">false</td>
         <td>Boolean</td>
-        <td> 在流式 sink 中是否开启自动合并功能。数据首先会被写入到临时文件,在检查点完成后,该检查点产生的临时文件会被合并。这些临时文件在合并前不可见.</td>
+        <td>Whether to enable automatic compaction in streaming sink or not. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction.</td>
     </tr>
     <tr>
         <td><h5>compaction.file-size</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>MemorySize</td>
-        <td> 合并目标文件大小,默认值是滚动文件大小.</td>
+        <td>The compaction target file size; the default value is the rolling file size.</td>
     </tr>
   </tbody>
 </table>
 
-启用该参数后,文件合并功能会根据设定的目标文件大小,合并多个小文件到大文件。
-当在生产环境使用文件合并功能时,需要注意:
-- 只有检查点内部的文件才会被合并,也就是说,至少会生成跟检查点个数一样多的文件。
-- 合并前文件是可见的,所以文件的可见性是:检查点间隔 + 合并时长。
-- 如果合并花费的时间很长,会对作业产生反压,延长检查点所需时间。
+If enabled, file compaction will merge multiple small files into larger files based on the target file size.
+When running file compaction in production, please be aware that:
+- Only files within a single checkpoint are compacted, which means at least as many files as there are checkpoints will be generated.
+- Files are not visible before they are merged, so a file becomes visible after: checkpoint interval + compaction time.
+- If compaction takes too long, it will backpressure the job and extend the checkpoint duration.
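+
+As a hedged sketch, compaction is enabled purely through the two options above in the table definition (table name, columns, and path are illustrative):
+
+```sql
+CREATE TABLE fs_table (
+  user_id STRING,
+  order_amount DOUBLE
+) WITH (
+  'connector' = 'filesystem',
+  'path' = 'file:///tmp/fs_table',
+  'format' = 'parquet',
+  'auto-compaction' = 'true',
+  'compaction.file-size' = '128MB'
+);
+```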
 
-### 分区提交 
-<a id="分区提交"></a>
+### Partition Commit
 
-分区数据写完毕后,经常需要通知下游应用。比如,在 Hive metastore 中新增分区或者在目录下新增 `_SUCCESS` 文件。 分区提交策略是可定制的,具体的分区提交行为是基于 `triggers` 和 `policies` 的组合.
+After writing a partition, it is often necessary to notify downstream applications. For example, adding the partition to a Hive metastore or writing a `_SUCCESS` file in the directory. The file system sink contains a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of `triggers` and `policies`.
 
-- Trigger: 分区提交的时机,可以基于从分区中提取的时间对应的水印,或者基于处理时间。
-- Policy: 分区提交策略,内置的策略包括提交 `_SUCCESS` 文件和 hive metastore, 也可以自己定制提交策略, 比如触发 hive 生成统计信息,合并小文件等。
+- Trigger: When to commit a partition, either based on a watermark compared against the time extracted from the partition values, or based on processing time.
+- Policy: How to commit a partition. Built-in policies support writing success files and committing to the metastore; you can also implement your own policies, for example to trigger Hive's analysis to generate statistics, or to merge small files.
 
-**注意:** 分区提交只有在动态分区插入模式下才有效。
+**NOTE:** Partition commit only works with dynamic partition inserts.
 
-#### 分区提交触发器
+#### Partition commit trigger
 
-通过配置分区提交的触发策略,来配置何时提交分区:
+To define when to commit a partition, configure a partition commit trigger:
 
 <table class="table table-bordered">
   <thead>
@@ -206,53 +305,53 @@ file sink 支持文件合并,以允许应用程序可以使用较小的检查
         <td><h5>sink.partition-commit.trigger</h5></td>
         <td style="word-wrap: break-word;">process-time</td>
         <td>String</td>
-        <td>分区提交触发器类型。 
-         'process-time': 基于机器时间,既不需要分区时间提取器也不需要水印生成器,一旦 ”当前系统时间“ 超过了 “分区创建系统时间” 和 'sink.partition-commit.delay' 之和,就提交分区;
-         'partition-time': 基于从分区字段提取的时间,需要水印生成器,一旦 “水印” 超过了 ”从分区字段提取的时间“ 和 'sink.partition-commit.delay' 之和,就提交分区.</td>
+        <td>Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commits a partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time extracted from partition values, it requires watermark generation. Commits a partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.delay</h5></td>
         <td style="word-wrap: break-word;">0 s</td>
         <td>Duration</td>
-        <td>该延迟时间之前分区不会被提交。如果是按天的分区,应配置为 '1 d', 如果是按小时的分区,应配置为 '1 h'.</td>
+        <td>The partition will not be committed until this delay has passed. For a daily partition, this should be '1 d'; for an hourly partition, '1 h'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
         <td style="word-wrap: break-word;">UTC</td>
         <td>String</td>
-        <td>解析 LONG 类型的水印到 TIMESTAMP 类型时所采用的时区,解析得到的水印的 TIMESTAMP 会被用来跟分区时间进行比较以判断分区是否该被提交。
-            该参数只有在参数 `sink.partition-commit.trigger` 被设置为 'partition-time' 时才生效。
-            如果该参数设置的不正确,比如在 TIMESTAMP_LTZ 列上定义了 source rowtime, 但没有设置该参数,则用户可能在若干个小时后才看到分区的提交。
-            该参数的默认值是 'UTC', 代表水印是定义在 TIMESTAMP 列上或没有定义水印。 如果水印定义在 TIMESTAMP_LTZ 列上,则水印的时区是会话的时区。
-            该参数的可选值要么是完整的时区名比如 'America/Los_Angeles',要么是自定义的时区 id 比如 'GMT-08:00'.</td>
-    </tr>        
+        <td>The time zone used to parse the LONG watermark value to a TIMESTAMP value; the parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this option is not set, then users may see the partition committed only after a few hours. The default value is 'UTC', which means the watermark is defined on a TIMESTAMP column or not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom time zone id such as 'GMT-08:00'.</td>
+    </tr>    
   </tbody>
 </table>
 
-有两种类型的触发器:
-- 第一种是根据分区的处理时间。 该触发器不需要分区时间提取,也不需要生成水印。通过分区创建时间和当前系统时间来触发分区提交。该触发器更通用但不是很精确。比如,数据的延迟或故障转移都会导致分区的提前提交。
-- 第二种是根据从分区字段提取的时间以及水印。这需要你的作业支持生成水印,分区是根据时间来切割的,比如按小时或按天分区。
+There are two types of trigger:
+- The first is based on partition processing time. It neither requires partition time extraction nor watermark
+  generation. A partition is committed based on its creation time and the current system time. This trigger
+  is more universal, but not as precise: late data or a failover can lead to premature partition commits.
+- The second commits a partition based on the time extracted from the partition values and on watermarks.
+  This requires that your job generates watermarks and that partitioning is time based, such as
+  hourly or daily partitions.
 
-如果想让下游系统尽快感知到分区,而不管分区数据是否完整:
-- 'sink.partition-commit.trigger'='process-time' (默认值)
-- 'sink.partition-commit.delay'='0s' (默认值)
-一旦分区中有数据,分区立马就会被提交。注意:分区可能会被提交多次。
+If you want downstream applications to see a partition as soon as possible, regardless of whether its data is complete:
+- 'sink.partition-commit.trigger'='process-time' (Default value)
+- 'sink.partition-commit.delay'='0s' (Default value)
+  Once there is data in the partition, it will be committed immediately. Note: the partition may be committed multiple times.
 
-如果想让下游系统只有在分区数据完整时才感知到分区,且你的作业有水印生成的逻辑,也能从分区字段的值中提取到时间:
+If you want downstream applications to see a partition only when its data is complete, your job generates watermarks, and the time can be extracted from the partition values:
 - 'sink.partition-commit.trigger'='partition-time'
-- 'sink.partition-commit.delay'='1h' (根据分区类型指定,如果是按小时的分区可配置为 '1h')
-该方式是最精确的提交分区的方式,该方式尽力确保提交的分区包含尽量完整的数据。
+- 'sink.partition-commit.delay'='1h' ('1h' for an hourly partition; adjust to your partition granularity)
+  This is the most accurate way to commit partitions, and it tries to ensure that committed partitions contain data that is as complete as possible.
 
-如果想让下游系统只有在数据完整时才感知到分区,但是没有水印,或者无法从分区字段的值中提取时间:
-- 'sink.partition-commit.trigger'='process-time' (默认值)
-- 'sink.partition-commit.delay'='1h' (根据分区类型指定,如果是按小时的分区可配置为 '1h')
-该方式尽量精确地提交分区,但是数据延迟或故障转移会导致分区的提前提交。
+If you want downstream applications to see a partition only when its data is complete, but there is no watermark, or the time cannot be extracted from the partition values:
+- 'sink.partition-commit.trigger'='process-time' (Default value)
+- 'sink.partition-commit.delay'='1h' ('1h' for an hourly partition; adjust to your partition granularity)
+  This tries to commit partitions accurately, but late data or a failover can lead to premature partition commits.
 
-延迟数据的处理:延迟的记录会被写入到已经提交的对应分区中,且会再次触发该分区的提交。
+Late data handling: when a record belongs to a partition that has already been committed, the record is still
+written into that partition, and the commit of that partition is triggered again.
 
-#### 分区时间提取器 
+#### Partition Time Extractor
 
-时间提取器定义了如何从分区字段值中提取时间.
+Time extractors define how to extract a timestamp from partition values.
 
 <table class="table table-bordered">
   <thead>
@@ -268,27 +367,32 @@ file sink 支持文件合并,以允许应用程序可以使用较小的检查
         <td><h5>partition.time-extractor.kind</h5></td>
         <td style="word-wrap: break-word;">default</td>
         <td>String</td>
-        <td>从分区字段提取时间的时间提取器。支持默认值和定制。对于默认值,可以配置时间戳模式。对于定制,应指定提取器类.</td>
+        <td>Time extractor to extract time from partition values. Supports 'default' and 'custom'. For 'default', a timestamp pattern/formatter can be configured. For 'custom', an extractor class must be specified.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.class</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
-        <td>实现了接口 PartitionTimeExtractor 的提取器类.</td>
+        <td>The extractor class implementing the PartitionTimeExtractor interface.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
-        <td> 'default' 时间提取器允许用户从分区字段中提取合法的时间戳模式。默认支持从第一个字段按 'yyyy-MM-dd hh:mm:ss' 时间戳模式提取。
-        如果需要从一个分区字段比如 ‘dt’ 提取时间戳,可以配置为: '$dt';
-        如果需要从多个分区字段,比如 'year', 'month', 'day' 和 'hour'提取时间戳,可以配置为:'$year-$month-$day $hour:00:00';
-        如果需要从两字分区字段,比如 'dt' 和 'hour' 提取时间戳,可以配置为:'$dt $hour:00:00'.</td>
+        <td>The 'default' time extractor lets users build a legal timestamp pattern from partition fields. By default, a 'yyyy-MM-dd hh:mm:ss' timestamp is extracted from the first field. If the timestamp should be extracted from a single partition field 'dt', configure: '$dt'. If the timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', configure: '$year-$month-$day $hour:00:00'. If the timestamp should be extracted from two partition fields 'dt' and 'hour', configure: '$dt $hour:00:00'.</td>
+    </tr>
+    <tr>
+        <td><h5>partition.time-extractor.timestamp-formatter</h5></td>
+        <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
+        <td>String</td>
+        <td>The formatter that parses the partition timestamp string value (expressed by 'partition.time-extractor.timestamp-pattern') into a timestamp. For example, if the partition timestamp is extracted from multiple partition fields, say 'year', 'month' and 'day', you can configure 'partition.time-extractor.timestamp-pattern' to '$year$month$day' and `partition.time-extractor.timestamp-formatter` to 'yyyyMMdd'. By default the formatter is 'yyyy-MM-dd HH:mm:ss'.
+            <br>The timestamp-formatter is compatible with Java's <a href="https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html">DateTimeFormatter</a>
+ 				</td>
     </tr>
   </tbody>
 </table>
 
-默认的提取器是基于由分区字段组合而成的时间戳模式。你也可以指定一个实现了 `PartitionTimeExtractor` 接口的自定义的提取器。
+The default extractor is based on a timestamp pattern composed of your partition fields. You can also specify an implementation for fully custom partition extraction based on the `PartitionTimeExtractor` interface.
 
 ```java
 
@@ -303,12 +407,12 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
 
 ```
 
-#### 分区提交策略 
+#### Partition Commit Policy
 
-分区提交策略指定了提交分区时的具体操作.
+The partition commit policy defines what action is taken when partitions are committed.
 
-- 第一种是 metastore, 只有 hive 表支持该策略, 该策略下文件系统通过目录层次结构来管理分区.
-- 第二种是 success 文件, 该策略下会在分区对应的目录下写入一个名为 `_SUCCESS` 的空文件. 
+- The first is the metastore policy. Only Hive tables support it; a plain file system manages partitions through its directory structure.
+- The second is the success-file policy, which writes an empty file into the directory corresponding to the partition.
 
 <table class="table table-bordered">
   <thead>
@@ -324,28 +428,24 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
         <td><h5>sink.partition-commit.policy.kind</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
-        <td>分区提交策略用来通知下游应用系统某个分区已经写完毕可以被读取了。
-            metastore: 向 metastore 中增加分区,只有 hive 支持 metastore 策略,文件系统通过目录结构管理分区;
-            success-file: 向目录下增加 '_success' 文件; 
-            custom: 使用指定的类来创建提交策略;
-            支持同时指定多个提交策略,如:'metastore,success-file'.</td>
+        <td>The policy to commit a partition notifies the downstream application that the partition has finished writing and is ready to be read. metastore: add the partition to the metastore. Only Hive tables support the metastore policy; a file system manages partitions through its directory structure. success-file: add a '_success' file to the directory. custom: use a policy class to create a commit policy. Multiple policies can be configured at the same time, e.g. 'metastore,success-file'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.policy.class</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
-        <td> 实现了 PartitionCommitPolicy 接口的分区提交策略。只有在 custom 提交策略下适用。</td>
+        <td>The partition commit policy class implementing the PartitionCommitPolicy interface. Only used with the custom commit policy.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.success-file.name</h5></td>
         <td style="word-wrap: break-word;">_SUCCESS</td>
         <td>String</td>
-        <td> 使用 success-file 分区提交策略时的文件名,默认值是 '_SUCCESS'.</td>
+        <td>The file name for the success-file partition commit policy. The default is '_SUCCESS'.</td>
     </tr>
   </tbody>
 </table>
 
-你也可以实现自己的提交策略,如:
+You can also extend the commit policy with your own implementation, for example:
 
 ```java
 
@@ -374,9 +474,9 @@ public class AnalysisCommitPolicy implements PartitionCommitPolicy {
 
 ```
 
-## Sink 并行度
+## Sink Parallelism
 
-向外部文件系统(包括 hive) 写文件时的并行度,在流处理模式和批处理模式下,都可以通过对应的 table 选项指定。默认情况下,该并行度跟上一个上游的 chained operator 的并行度一样。当配置了跟上一个上游的 chained operator 不一样的并行度时,写文件的算子和合并文件的算子(如果使用了的话)会使用指定的并行度。
+The parallelism of writing files into the external file system (including Hive) can be configured by the corresponding table option, both in streaming mode and in batch mode. By default, the parallelism is the same as the parallelism of the last upstream chained operator. When a parallelism different from that of the upstream operator is configured, the operator writing files and the operator compacting files (if used) will apply the configured parallelism.
 
 
 <table class="table table-bordered">
@@ -393,17 +493,17 @@ public class AnalysisCommitPolicy implements PartitionCommitPolicy {
         <td><h5>sink.parallelism</h5></td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>Integer</td>
-        <td> 向外部文件系统写文件时的并行度。必须大于 0,否则会抛出异常.</td>
+        <td>Parallelism of writing files into the external file system. The value should be greater than zero, otherwise an exception will be thrown.</td>
     </tr>
-    
+
   </tbody>
 </table>
 
-**注意:** 当前,只有在上游的 changelog 模式是 **INSERT-ONLY** 时,才支持设置 sink 的并行度。否则的话,会抛出异常。
+**NOTE:** Currently, configuring the sink parallelism is supported if and only if the changelog mode of the upstream is **INSERT-ONLY**. Otherwise, an exception will be thrown.
 
-## 完整示例
+## Full Example
 
-如下示例演示了如何使用文件系统连接器编写流查询语句查询 kafka 中的数据并写入到文件系统中,以及通过批查询把结果数据读取出来.
+The examples below show how the file system connector can be used: a streaming query writes data from Kafka into a file system, and a batch query reads that data back out.
 
 ```sql
 
@@ -411,7 +511,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列上定义水印
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
 ) WITH (...);
 
 CREATE TABLE fs_table (
@@ -427,7 +527,7 @@ CREATE TABLE fs_table (
   'sink.partition-commit.policy.kind'='success-file'
 );
 
--- streaming sql, 插入数据到文件系统表中
+-- streaming sql, insert into file system table
 INSERT INTO fs_table 
 SELECT 
     user_id, 
@@ -436,19 +536,19 @@ SELECT
     DATE_FORMAT(log_ts, 'HH') 
 FROM kafka_table;
 
--- batch sql, 分区裁剪查询
+-- batch sql, select with partition pruning
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
-如果水印是定义在 TIMESTAMP_LTZ 列上,且使用了 `partition-time` 来提交分区, 则参数 `sink.partition-commit.watermark-time-zone` 需要被设置为会话的时区,否则分区会在若干小时后才会被提交。  
+If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, then `sink.partition-commit.watermark-time-zone` must be set to the session time zone, otherwise partitions may be committed a few hours later than expected.
 ```sql
 
 CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
-  ts BIGINT, -- epoch 毫秒时间
+  ts BIGINT, -- time in epoch milliseconds
   ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义水印
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
 ) WITH (...);
 
 CREATE TABLE fs_table (
@@ -463,11 +563,11 @@ CREATE TABLE fs_table (
   'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
   'sink.partition-commit.delay'='1 h',
   'sink.partition-commit.trigger'='partition-time',
-  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假定用户配置的时区是 'Asia/Shanghai'
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user configured time zone is 'Asia/Shanghai'
   'sink.partition-commit.policy.kind'='success-file'
 );
 
--- streaming sql, 插入数据到文件系统表中
+-- streaming sql, insert into file system table
 INSERT INTO fs_table 
 SELECT 
     user_id, 
@@ -476,7 +576,7 @@ SELECT
     DATE_FORMAT(ts_ltz, 'HH') 
 FROM kafka_table;
 
--- batch sql, 分区裁剪查询
+-- batch sql, select with partition pruning
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
diff --git a/docs/content.zh/docs/deployment/filesystems/s3.md b/docs/content.zh/docs/deployment/filesystems/s3.md
index ee04ab6..bf41d97 100644
--- a/docs/content.zh/docs/deployment/filesystems/s3.md
+++ b/docs/content.zh/docs/deployment/filesystems/s3.md
@@ -70,10 +70,10 @@ Flink 提供两种文件系统用来与 S3 交互:`flink-s3-fs-presto` 和 `fl
   
      例如,Hadoop 有 `fs.s3a.connection.maximum` 的配置选项。 如果你想在 Flink 程序中改变该配置的值,你需要将配置 `s3.connection.maximum: xyz` 添加到 `flink-conf.yaml` 文件中。Flink 会内部将其转换成配置 `fs.s3a.connection.maximum`。 而无需通过 Hadoop 的 XML 配置文件来传递参数。
   
-    另外,它是唯一支持 [StreamingFileSink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) 和 [FileSink]({{< ref "docs/connectors/datastream/file_sink" >}}) 的 S3 文件系统。
+    另外,它是唯一支持 [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) 的 S3 文件系统。
   
 `flink-s3-fs-hadoop` 和 `flink-s3-fs-presto` 都为 *s3://* scheme 注册了默认的文件系统包装器,`flink-s3-fs-hadoop` 另外注册了 *s3a://*,`flink-s3-fs-presto` 注册了 *s3p://*,因此二者可以同时使用。
-例如某作业使用了 [StreamingFileSink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}),它仅支持 Hadoop,但建立 checkpoint 使用 Presto。在这种情况下,建议明确地使用 *s3a://* 作为 sink (Hadoop) 的 scheme,checkpoint (Presto) 使用 *s3p://*。这一点对于 [FileSink]({{< ref "docs/connectors/datastream/file_sink" >}}) 同样成立。
+例如某作业使用了 [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}),它仅支持 Hadoop,但建立 checkpoint 使用 Presto。在这种情况下,建议明确地使用 *s3a://* 作为 sink (Hadoop) 的 scheme,checkpoint (Presto) 使用 *s3p://*。这一点对于 [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) 同样成立。
 
 在启动 Flink 之前,将对应的 JAR 文件从 `opt` 复制到 Flink 发行版的 `plugins` 目录下,以使用 `flink-s3-fs-hadoop` 或 `flink-s3-fs-presto`。
 
diff --git a/docs/content.zh/docs/dev/datastream/execution_mode.md b/docs/content.zh/docs/dev/datastream/execution_mode.md
index 5934f1d..e0655b7 100644
--- a/docs/content.zh/docs/dev/datastream/execution_mode.md
+++ b/docs/content.zh/docs/dev/datastream/execution_mode.md
@@ -214,7 +214,7 @@ Checkpointing 用于故障恢复的特点之一是,在发生故障时,Flink
 
 如[上文所述](#故障恢复),批处理程序的故障恢复不使用检查点。
 
-重要的是要记住,因为没有 checkpoints,某些功能如 ({{< javadoc file="org/apache/flink/api/common/state/CheckpointListener.html" name="CheckpointListener">}}),以及因此,Kafka 的 [精确一次(EXACTLY_ONCE)]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) 模式或 `StreamingFileSink` 的 [OnCheckpointRollingPolicy]({{< ref "docs/connectors/datastream/streamfile_sink" >}}#rolling-policy) 将无法工作。
+重要的是要记住,因为没有 checkpoints,某些功能如 ({{< javadoc file="org/apache/flink/api/common/state/CheckpointListener.html" name="CheckpointListener">}}),以及因此,Kafka 的 [精确一次(EXACTLY_ONCE)]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) 模式或 `File Sink` 的 [OnCheckpointRollingPolicy]({{< ref "docs/connectors/datastream/filesystem" >}}#rolling-policy) 将无法工作。
 如果你需要一个在`批`模式下工作的事务型 sink,请确保它使用 [FLIP-143](https://cwiki.apache.org/confluence/x/KEJ4CQ) 中提出的统一 Sink API。
 
 你仍然可以使用所有的 [状态原语(state primitives)]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#working-with-state),只是用于故障恢复的机制会有所不同。
diff --git a/docs/content/docs/connectors/datastream/file_sink.md b/docs/content/docs/connectors/datastream/filesystem.md
similarity index 74%
rename from docs/content/docs/connectors/datastream/file_sink.md
rename to docs/content/docs/connectors/datastream/filesystem.md
index 4930ba8..6fcbc9a 100644
--- a/docs/content/docs/connectors/datastream/file_sink.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -1,10 +1,12 @@
 ---
-title: File Sink
+title: FileSystem
 weight: 6
 type: docs
 aliases:
   - /dev/connectors/file_sink.html
   - /apis/streaming/connectors/filesystem_sink.html
+  - /docs/connectors/datastream/streamfile_sink/
+  - /docs/connectors/datastream/file_sink/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` that reads or writes (partitioned) files to file systems
 supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it is an evolution of the 
-existing [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which was designed for providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is designed to provide exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any (distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) (e.g., Avro, CSV, Parquet),
+and produces a stream of records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref "docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: `SplitEnumerator` and `SourceReader`. 
+
+* `SplitEnumerator` is responsible for discovering and identifying the files to read and assigns them to the `SourceReader`.
+* `SourceReader` requests the files it needs to process and reads the file from the filesystem. 
+
+You will need to combine the File Source with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}), which allows you to
+parse CSV, decode AVRO, or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator - a recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for periodic file discovery.
+In this case, the `SplitEnumerator` will enumerate like the bounded case but, after a certain interval, repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously detected files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You can start building a File Source via one of the following API calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all the properties of the File Source.
+
+For the bounded/batch case, the File Source processes all files under the given path(s).
+For the continuous/streaming case, the source periodically checks the paths for new files and will start reading those.
+
+When you start creating a File Source (via the `FileSource.FileSourceBuilder` created through one of the above-mentioned methods),
+the source is in bounded/batch mode by default. You can call `AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)`
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are multiple classes that the source supports.
+The interfaces are a tradeoff between simplicity of implementation and flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest flexibility to optimize the implementation.
+
+#### TextLine Format
+
+A `StreamFormat` reader that reads text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, it will re-read
+and discard the number of lines that were processed before the last checkpoint. This is due to
+the fact that the offsets of lines in the file cannot be tracked through the charset decoders
+with their internal buffering of stream input and charset decoder state.
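+
+As a minimal sketch (assuming an existing `StreamExecutionEnvironment` named `env` and an illustrative input path), text lines can be read like this:
+
+```java
+// build a bounded source that reads the file line by line using the default charset
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/path/to/input"))
+                .build();
+
+final DataStream<String> lines =
+        env.fromSource(source, WatermarkStrategy.noWatermarks(), "text-file-source");
+```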
+
+#### SimpleStreamFormat Abstract Class
+
+This is a simple version of `StreamFormat` for formats that are not splittable.
+Custom reads from an array or a file can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be initialized like this:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on the fields of the `SomePojo` class using the `Jackson` library. (Note: you might need to add the `@JsonPropertyOrder({field1, field2, ...})` annotation to your class definition, with the field order exactly matching that of the CSV file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing options, use the more low-level `forSchema` static factory method of `CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
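+
+For instance, a hedged sketch that customizes the column separator for the same hypothetical `SomePojo` class (the local file path is illustrative):
+
+```java
+// derive the schema from the POJO, but use ';' as the column separator
+CsvMapper mapper = new CsvMapper();
+CsvSchema schema = mapper.schemaFor(SomePojo.class).withColumnSeparator(';');
+
+CsvReaderFormat<SomePojo> csvFormat =
+        CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(SomePojo.class));
+FileSource<SomePojo> source =
+        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(new File("/path/to/data.csv"))).build();
+```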
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk reader is
+created based on a checkpoint during checkpointed streaming execution, then the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a `StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
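+
+The wrapped format from the snippet above can then be passed to the bulk factory method shown earlier (a hedged sketch; the directory path is illustrative):
+
+```java
+FileSource<SomePojo> source =
+        FileSource.forBulkFileFormat(bulkFormat, Path.fromLocalFile(new File("/path/to/dir")))
+                .build();
+```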
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking does not work very well for large backlogs of files. This is because watermarks eagerly advance within a file, and the next file might contain data later than the watermark.
+
+For Unbounded File Sources, the enumerator currently remembers paths of all already processed files, which is a state that can, in some cases, grow rather large.
+There are plans to add a compressed form of tracking already processed files in the future (for example, by keeping modification timestamps below boundaries).
+
+### Behind the Scenes
+{{< hint info >}}
+If you are interested in how File Source works through the new data source API design, you may
+want to read this part as a reference. For details about the new data source API, check out the
+[documentation on data sources]({{< ref "docs/dev/datastream/sources.md" >}}) and
+<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+for more descriptive discussions.
+{{< /hint >}}
+
+## File Sink
 
 The file sink writes incoming data into buckets. Given that the incoming streams can be unbounded,
 data in each bucket is organized into part files of finite size. The bucketing behaviour is fully configurable
@@ -54,7 +271,7 @@ in the `in-progress` or the `pending` state, and cannot be safely read by downst
 
  {{< img src="/fig/streamfilesink_bucketing.png"  width="100%" >}}
 
-## File Formats
+### Format Types
 
 The `FileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](http://parquet.apache.org).
 These two variants come with their respective builders that can be created with the following static methods:
@@ -68,7 +285,7 @@ stored and the encoding logic for our data.
 Please check out the JavaDoc for {{< javadoc file="org/apache/flink/connector/file/sink/FileSink.html" name="FileSink">}}
 for all the configuration options and more documentation about the implementation of the different data formats.
 
-### Row-encoded Formats
+#### Row-encoded Formats
 
 Row-encoded formats need to specify an `Encoder`
 that is used for serializing individual rows to the `OutputStream` of the in-progress part files.
@@ -137,7 +354,7 @@ a rolling policy that rolls the in-progress part file on any of the following 3
  - It hasn't received new records for the last 5 minutes
  - The file size has reached 1 GB (after writing the last record)
 
-### Bulk-encoded Formats
+#### Bulk-encoded Formats
 
 Bulk-encoded sinks are created similarly to the row-encoded ones, but instead of
 specifying an `Encoder`, we have to specify a {{< javadoc file="org/apache/flink/api/common/serialization/BulkWriter.Factory.html" name="BulkWriter.Factory">}}.
@@ -157,7 +374,7 @@ Flink comes with four built-in BulkWriter factories:
 The latter rolls on every checkpoint. A policy can roll additionally based on size or processing time.
 {{< /hint >}}
 
-#### Parquet format
+##### Parquet format
 
 Flink contains built in convenience methods for creating Parquet writer factories for Avro data. These methods
 and their associated documentation can be found in the ParquetAvroWriters class.
@@ -245,7 +462,7 @@ input.sinkTo(sink)
 {{< /tab >}}
 {{< /tabs >}}
 
-#### Avro format
+##### Avro format
 
 Flink also provides built-in support for writing data into Avro files. A list of convenience methods to create
 Avro writer factories and their associated documentation can be found in the 
@@ -339,7 +556,7 @@ stream.sinkTo(FileSink.forBulkFormat(
 {{< /tab >}}
 {{< /tabs >}}
 
-#### ORC Format
+##### ORC Format
  
 To enable the data to be bulk encoded in ORC format, Flink offers `OrcBulkWriterFactory`
 which takes a concrete implementation of Vectorizer.
@@ -533,7 +750,7 @@ class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
 {{< /tab >}}
 {{< /tabs >}}
 
-#### Hadoop SequenceFile format
+##### Hadoop SequenceFile format
 
 To use the `SequenceFile` bulk encoder in your application you need to add the following dependency:
 
@@ -589,7 +806,7 @@ input.sinkTo(sink)
 
 The `SequenceFileWriterFactory` supports additional constructor parameters to specify compression settings.
 
-## Bucket Assignment
+### Bucket Assignment
 
 The bucketing logic defines how the data will be structured into subdirectories inside the base output directory.
 
@@ -605,7 +822,7 @@ Flink comes with two built-in BucketAssigners:
  - `DateTimeBucketAssigner` : Default time based assigner
  - `BasePathBucketAssigner` : Assigner that stores all part files in the base path (single global bucket)
 
-## Rolling Policy
+### Rolling Policy
 
 The `RollingPolicy` defines when a given in-progress part file will be closed and moved to the pending and later to finished state.
 Part files in the "finished" state are the ones that are ready for viewing and are guaranteed to contain valid data that will not be reverted in case of failure.
@@ -618,7 +835,7 @@ Flink comes with two built-in RollingPolicies:
  - `DefaultRollingPolicy`
  - `OnCheckpointRollingPolicy`
 
-## Part file lifecycle
+### Part file lifecycle
 
 In order to use the output of the `FileSink` in downstream systems, we need to understand the naming and lifecycle of the output files produced.
 
@@ -672,7 +889,7 @@ New buckets are created as dictated by the bucketing policy, and this doesn't af
 
 Old buckets can still receive new records as the bucketing policy is evaluated on a per-record basis.
 
-### Part file configuration
+#### Part file configuration
 
 Finished files can be distinguished from the in-progress ones by their naming scheme only.
 
@@ -735,9 +952,9 @@ val sink = FileSink
 {{< /tab >}}
 {{< /tabs >}}
 
-## Important Considerations
+### Important Considerations
 
-### General
+#### General
 
 <span class="label label-danger">Important Note 1</span>: When using Hadoop < 2.7, please use
 the `OnCheckpointRollingPolicy` which rolls part files on every checkpoint. The reason is that if part files "traverse"
@@ -757,7 +974,7 @@ in-progress file.
 <span class="label label-danger">Important Note 4</span>: Currently, the `FileSink` only supports three filesystems: 
 HDFS, S3, and Local. Flink will throw an exception when using an unsupported filesystem at runtime.
 
-### BATCH-specific
+#### BATCH-specific
 
 <span class="label label-danger">Important Note 1</span>: Although the `Writer` is executed with the user-specified
 parallelism, the `Committer` is executed with parallelism equal to 1.
@@ -770,7 +987,7 @@ failure happens while the `Committers` are committing, then we may have duplicat
 future Flink versions 
 (see progress in [FLIP-147](https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished)).
 
-### S3-specific
+#### S3-specific
 
 <span class="label label-danger">Important Note 1</span>: For S3, the `FileSink`
 supports only the [Hadoop-based](https://hadoop.apache.org/) FileSystem implementation, not
@@ -791,3 +1008,4 @@ before the job is restarted. This will result in your job not being able to rest
 pending part-files are no longer there and Flink will fail with an exception as it tries to fetch them and fails.
 
 {{< top >}}
+
diff --git a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
index d8d1359..df3e291 100644
--- a/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
+++ b/docs/content/docs/connectors/datastream/formats/azure_table_storage.md
@@ -1,5 +1,5 @@
 ---
-title:  "Microsoft Azure table"
+title:  "Azure Table storage"
 weight: 4
 type: docs
 aliases:
@@ -25,7 +25,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Microsoft Azure Table Storage format
+# Azure Table Storage
 
 This example is using the `HadoopInputFormat` wrapper to use an existing Hadoop input format implementation for accessing [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
 
diff --git a/docs/content/docs/connectors/datastream/formats/overview.md b/docs/content/docs/connectors/datastream/formats/overview.md
new file mode 100644
index 0000000..74d9226
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/formats/overview.md
@@ -0,0 +1,38 @@
+---
+title: Overview
+weight: 1
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# DataStream Formats
+
+## Available Formats
+
+Formats define how information is encoded for storage. Currently, these formats are supported:
+
+ * [Avro]({{< ref "docs/connectors/datastream/formats/avro" >}})
+ * [Azure Table]({{< ref "docs/connectors/datastream/formats/azure_table_storage" >}})
+ * [Hadoop]({{< ref "docs/connectors/datastream/formats/hadoop" >}})
+ * [Parquet]({{< ref "docs/connectors/datastream/formats/parquet" >}})
+ * [Text files]({{< ref "docs/connectors/datastream/formats/text_files" >}})
+ 
+{{< top >}}
+
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content/docs/connectors/datastream/formats/parquet.md
index f58eeeb..98d76a6 100644
--- a/docs/content/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -1,5 +1,5 @@
 ---
-title:  "Parquet files"
+title:  "Parquet"
 weight: 4
 type: docs
 aliases:
@@ -28,12 +28,16 @@ under the License.
 
 # Parquet format
 
-Flink supports reading [parquet](https://parquet.apache.org/) files and producing [Flink rows](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html).
+Flink supports reading [Parquet](https://parquet.apache.org/) files and producing [Flink rows](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html).
 To use the format you need to add the Flink Parquet dependency to your project:
 
 ```xml
-{{< artifact flink-parquet >}}
-```  
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-parquet</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
+```
  
 This format is compatible with the new Source that can be used in both batch and streaming modes.
 Thus, you can use this format in two ways:
@@ -69,7 +73,7 @@ final DataStream<RowData> stream =
 
 **Continuous read example**:
 
-In this example we create a DataStream containing parquet records as Flink Rows that will 
+In this example we create a DataStream containing Parquet records as Flink Rows that will 
 infinitely grow as new files are added to the directory. We monitor for new files each second.
 We project the schema to read only certain fields ("f7", "f4" and "f99").  
 We read records in batches of 500 records. The first boolean parameter specifies if timestamp columns need to be interpreted as UTC.
diff --git a/docs/content/docs/connectors/datastream/formats/text_files.md b/docs/content/docs/connectors/datastream/formats/text_files.md
index 7e9fcf6..864c880 100644
--- a/docs/content/docs/connectors/datastream/formats/text_files.md
+++ b/docs/content/docs/connectors/datastream/formats/text_files.md
@@ -32,7 +32,11 @@ Flink supports reading from text lines from a file using `TextLineFormat`. This
 To use the format you need to add the Flink Parquet dependency to your project:
 
 ```xml
-{{< artifact flink-connector-files >}}
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-connector-files</artifactId>
+	<version>{{< version >}}</version>
+</dependency>
 ```
 
 This format is compatible with the new Source that can be used in both batch and streaming modes.
diff --git a/docs/content/docs/connectors/datastream/overview.md b/docs/content/docs/connectors/datastream/overview.md
index 116888c..57be57b 100644
--- a/docs/content/docs/connectors/datastream/overview.md
+++ b/docs/content/docs/connectors/datastream/overview.md
@@ -42,9 +42,7 @@ Connectors provide code for interfacing with various third-party systems. Curren
  * [Apache Cassandra]({{< ref "docs/connectors/datastream/cassandra" >}}) (sink)
  * [Amazon Kinesis Streams]({{< ref "docs/connectors/datastream/kinesis" >}}) (source/sink)
  * [Elasticsearch]({{< ref "docs/connectors/datastream/elasticsearch" >}}) (sink)
- * [FileSystem (Hadoop included) - Streaming only sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) (sink)
- * [FileSystem (Hadoop included) - Streaming and Batch sink]({{< ref "docs/connectors/datastream/file_sink" >}}) (sink)
- * [FileSystem (Hadoop included) - Batch source] ({{< ref "docs/connectors/datastream/formats" >}}) (source)
+ * [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) (source/sink)
  * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink)
  * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink)
  * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source)
diff --git a/docs/content/docs/connectors/datastream/streamfile_sink.md b/docs/content/docs/connectors/datastream/streamfile_sink.md
deleted file mode 100644
index c008898..0000000
--- a/docs/content/docs/connectors/datastream/streamfile_sink.md
+++ /dev/null
@@ -1,776 +0,0 @@
----
-title: Streaming File Sink
-weight: 6
-type: docs
-aliases:
-  - /dev/connectors/streamfile_sink.html
-bookHidden: true
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Streaming File Sink
-
-This connector provides a Sink that writes partitioned files to filesystems
-supported by the [Flink `FileSystem` abstraction]({{< ref "docs/deployment/filesystems/overview" >}}).
-
-{{< hint warning >}}
-This Streaming File Sink is in the process of being phased out. Please use the unified [File Sink]({{< ref "docs/connectors/datastream/file_sink" >}}) as a drop-in replacement.
-{{< /hint >}}
-
-The streaming file sink writes incoming data into buckets. Given that the incoming streams can be unbounded,
-data in each bucket are organized into part files of finite size. The bucketing behaviour is fully configurable
-with a default time-based bucketing where we start writing a new bucket every hour. This means that each resulting
-bucket will contain files with records received during 1 hour intervals from the stream.
-
-Data within the bucket directories are split into part files. Each bucket will contain at least one part file for
-each subtask of the sink that has received data for that bucket. Additional part files will be created according to the configurable
-rolling policy. The default policy rolls part files based on size, a timeout that specifies the maximum duration for which a file can be open, and a maximum inactivity timeout after which the file is closed.
-
-{{< hint info >}}
-Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay in the `in-progress` or the `pending` state,
-and cannot be safely read by downstream systems.
-{{< /hint >}}
-
- {{< img src="/fig/streamfilesink_bucketing.png" >}}
-
-
-## File Formats
-
-The `StreamingFileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](http://parquet.apache.org).
-These two variants come with their respective builders that can be created with the following static methods:
-
- - Row-encoded sink: `StreamingFileSink.forRowFormat(basePath, rowEncoder)`
- - Bulk-encoded sink: `StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)`
-
-When creating either a row or a bulk encoded sink we have to specify the base path where the buckets will be
-stored and the encoding logic for our data.
-
-Please check out the JavaDoc for `StreamingFileSink`
-and more documentation about the implementation of the different data formats.
-
-### Row-encoded Formats
-
-Row-encoded formats need to specify an `Encoder` that is used for serializing individual rows to the `OutputStream` of the in-progress part files.
-
-In addition to the bucket assigner, the `RowFormatBuilder` allows the user to specify:
-
- - Custom `RollingPolicy` : Rolling policy to override the DefaultRollingPolicy
- - bucketCheckInterval (default = 1 min) : Millisecond interval for checking time based rolling policies
-
-Basic usage for writing String elements thus looks like this:
-
-
-{{< tabs "39aba608-5b82-457c-b3fe-cd390eb3ed3d" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-
-DataStream<String> input = ...;
-
-final StreamingFileSink<String> sink = StreamingFileSink
-    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
-            .build())
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-
-val input: DataStream[String] = ...
-
-val sink: StreamingFileSink[String] = StreamingFileSink
-    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
-            .build())
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-This example creates a simple sink that assigns records to the default one hour time buckets. It also specifies
-a rolling policy that rolls the in-progress part file on any of the following 3 conditions:
-
- - It contains at least 15 minutes worth of data
- - It hasn't received new records for the last 5 minutes
- - The file size has reached 1 GB (after writing the last record)
-
-### Bulk-encoded Formats
-
-Bulk-encoded sinks are created similarly to the row-encoded ones, but instead of
-specifying an `Encoder`, we have to specify a `BulkWriter.Factory`.
-The `BulkWriter` logic defines how new elements are added and flushed, and how a batch of records
-is finalized for further encoding purposes.
-
-Flink comes with four built-in BulkWriter factories:
-
- - `ParquetWriterFactory`
- - `AvroWriterFactory`
- - `SequenceFileWriterFactory`
- - `CompressWriterFactory`
- - `OrcBulkWriterFactory`
-
-{{< hint info >}} 
-Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls (ONLY) on every checkpoint.
-{{< /hint >}}
-
-#### Parquet format
-
-Flink contains built in convenience methods for creating Parquet writer factories for Avro data. These methods
-and their associated documentation can be found in the `ParquetAvroWriters` class.
-
-For writing to other Parquet compatible data formats, users need to create the ParquetWriterFactory with a custom implementation of the `ParquetBuilder` interface.
-
-To use the Parquet bulk encoder in your application you need to add the following dependency:
-
-{{< artifact flink-parquet withScalaVersion >}}
-
-A StreamingFileSink that writes Avro data to Parquet format can be created like this:
-
-{{< tabs "ed00c260-2293-4a52-8761-6d2c25fa4e8c" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
-import org.apache.avro.Schema;
-
-
-Schema schema = ...;
-DataStream<GenericRecord> input = ...;
-
-final StreamingFileSink<GenericRecord> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
-import org.apache.avro.Schema
-
-val schema: Schema = ...
-val input: DataStream[GenericRecord] = ...
-
-val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
-    .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-Similarly, a StreamingFileSink that writes Protobuf data to Parquet format can be created like this:
-
-{{< tabs "22fad830-b0ba-4fd2-85d2-2df6f67d1034" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
-
-// ProtoRecord is a generated protobuf Message class.
-DataStream<ProtoRecord> input = ...;
-
-final StreamingFileSink<ProtoRecord> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
-
-// ProtoRecord is a generated protobuf Message class.
-val input: DataStream[ProtoRecord] = ...
-
-val sink: StreamingFileSink[ProtoRecord] = StreamingFileSink
-    .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Avro format
-
-Flink also provides built-in support for writing data into Avro files. A list of convenience methods to create
-Avro writer factories and their associated documentation can be found in the 
-`AvroWriters` class.
-
-To use the Avro writers in your application you need to add the following dependency:
-
-{{< artifact flink-avro >}}
-
-A StreamingFileSink that writes data to Avro files can be created like this:
-
-{{< tabs "a01417d6-8a1c-477f-8c46-3c3e66c96c18" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.formats.avro.AvroWriters;
-import org.apache.avro.Schema;
-
-
-Schema schema = ...;
-DataStream<GenericRecord> input = ...;
-
-final StreamingFileSink<GenericRecord> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.formats.avro.AvroWriters
-import org.apache.avro.Schema
-
-val schema: Schema = ...
-val input: DataStream[GenericRecord] = ...
-
-val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
-    .forBulkFormat(outputBasePath, AvroWriters.forGenericRecord(schema))
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-For creating customized Avro writers, e.g. enabling compression, users need to create the `AvroWriterFactory`
-with a custom implementation of the `AvroBuilder` interface:
-
-{{< tabs "455694f1-f5c9-4a8b-af73-e01e42965df4" >}}
-{{< tab "Java" >}}
-```java
-AvroWriterFactory<?> factory = new AvroWriterFactory<>((AvroBuilder<Address>) out -> {
-	Schema schema = ReflectData.get().getSchema(Address.class);
-	DatumWriter<Address> datumWriter = new ReflectDatumWriter<>(schema);
-
-	DataFileWriter<Address> dataFileWriter = new DataFileWriter<>(datumWriter);
-	dataFileWriter.setCodec(CodecFactory.snappyCodec());
-	dataFileWriter.create(schema, out);
-	return dataFileWriter;
-});
-
-DataStream<Address> stream = ...
-stream.addSink(StreamingFileSink.forBulkFormat(
-	outputBasePath,
-	factory).build());
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val factory = new AvroWriterFactory[Address](new AvroBuilder[Address]() {
-    override def createWriter(out: OutputStream): DataFileWriter[Address] = {
-        val schema = ReflectData.get.getSchema(classOf[Address])
-        val datumWriter = new ReflectDatumWriter[Address](schema)
-
-        val dataFileWriter = new DataFileWriter[Address](datumWriter)
-        dataFileWriter.setCodec(CodecFactory.snappyCodec)
-        dataFileWriter.create(schema, out)
-        dataFileWriter
-    }
-})
-
-val stream: DataStream[Address] = ...
-stream.addSink(StreamingFileSink.forBulkFormat(
-    outputBasePath,
-    factory).build());
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### ORC Format
- 
-To enable the data to be bulk encoded in ORC format, Flink offers `OrcBulkWriterFactory`
-which takes a concrete implementation of `Vectorizer`.
-
-Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses 
-ORC's `VectorizedRowBatch` to achieve this. 
-
-Since the input element has to be transformed into a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer` 
-class and override the `vectorize(T element, VectorizedRowBatch batch)` method. The method hands over a ready-to-use 
-`VectorizedRowBatch` instance, so users only have to write the logic that transforms the input `element` into 
-`ColumnVector`s and sets them on the provided `VectorizedRowBatch` instance.
-
-For example, if the input element is of type `Person` which looks like: 
-
-{{< tabs "4be71488-d7ca-4c89-87f5-ac5045dfcf82" >}}
-{{< tab "Java" >}}
-```java
-
-class Person {
-    private final String name;
-    private final int age;
-    ...
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-A child implementation that converts an element of type `Person` and sets its fields in the `VectorizedRowBatch` can then look like this: 
-
-{{< tabs "4b79d495-062c-4dbd-a7b2-9804bd7a74aa" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-public class PersonVectorizer extends Vectorizer<Person> implements Serializable {	
-	public PersonVectorizer(String schema) {
-		super(schema);
-	}
-	@Override
-	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
-		BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
-		LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
-		int row = batch.size++;
-		nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
-		ageColVector.vector[row] = element.getAge();
-	}
-}
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import java.nio.charset.StandardCharsets
-import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}
-
-class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
-
-  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
-    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
-    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
-    val row = batch.size
-    batch.size += 1
-    nameColVector.setVal(row, element.getName.getBytes(StandardCharsets.UTF_8))
-    ageColVector.vector(row) = element.getAge
-  }
-
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-To use the ORC bulk encoder in an application, users need to add the following dependency:
-
-{{< artifact flink-orc withScalaVersion >}}
-
-And then a `StreamingFileSink` that writes data in ORC format can be created like this:
-
-{{< tabs "fa716810-f0f5-49cd-a177-8bf43d2d437c" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.orc.writer.OrcBulkWriterFactory;
-
-String schema = "struct<_col0:string,_col1:int>";
-DataStream<Person> input = ...;
-
-final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));
-
-final StreamingFileSink<Person> sink = StreamingFileSink
-	.forBulkFormat(outputBasePath, writerFactory)
-	.build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.orc.writer.OrcBulkWriterFactory
-
-val schema: String = "struct<_col0:string,_col1:int>"
-val input: DataStream[Person] = ...
-val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));
-
-val sink: StreamingFileSink[Person] = StreamingFileSink
-    .forBulkFormat(outputBasePath, writerFactory)
-    .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-`OrcBulkWriterFactory` can also take a Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC 
-writer properties can be provided.
-
-{{< tabs "a6d92f60-677b-4d09-b15b-fb3c567e3f2c" >}}
-{{< tab "Java" >}}
-```java
-String schema = ...;
-Configuration conf = ...;
-Properties writerProperties = new Properties();
-
-writerProperties.setProperty("orc.compress", "LZ4");
-// Other ORC supported properties can also be set similarly.
-
-final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
-    new PersonVectorizer(schema), writerProperties, conf);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val schema: String = ...
-val conf: Configuration = ...
-val writerProperties: Properties = new Properties()
-
-writerProperties.setProperty("orc.compress", "LZ4")
-// Other ORC supported properties can also be set similarly.
-
-val writerFactory = new OrcBulkWriterFactory(
-    new PersonVectorizer(schema), writerProperties, conf)
-```
-{{< /tab >}}
-{{< /tabs >}} 
-
-The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html).
-
-Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside their 
-`vectorize(...)` override.
-
-{{< tabs "b92d3d09-ce3e-4aad-93fb-ad8c28f026cb" >}}
-{{< tab "Java" >}}
-```java
-
-public class PersonVectorizer extends Vectorizer<Person> implements Serializable {	
-	@Override
-	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
-		...
-		String metadataKey = ...;
-		ByteBuffer metadataValue = ...;
-		this.addUserMetadata(metadataKey, metadataValue);
-	}
-}
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-
-class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
-
-  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
-    ...
-    val metadataKey: String = ...
-    val metadataValue: ByteBuffer = ...
-    addUserMetadata(metadataKey, metadataValue)
-  }
-
-}
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Hadoop SequenceFile format
-
-To use the SequenceFile bulk encoder in your application you need to add the following dependency:
-
-{{< artifact flink-sequence-file >}}
-
-A simple SequenceFile writer can be created like this:
-
-{{< tabs "581d9302-6ef3-4ff9-8182-f86d00d497be" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-
-
-DataStream<Tuple2<LongWritable, Text>> input = ...;
-Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
-final StreamingFileSink<Tuple2<LongWritable, Text>> sink = StreamingFileSink
-  .forBulkFormat(
-    outputBasePath,
-    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, Text.class))
-  .build();
-
-input.addSink(sink);
-
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.SequenceFile
-import org.apache.hadoop.io.Text
-
-val input: DataStream[(LongWritable, Text)] = ...
-val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
-val sink: StreamingFileSink[(LongWritable, Text)] = StreamingFileSink
-  .forBulkFormat(
-    outputBasePath,
-    new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
-  .build()
-
-input.addSink(sink)
-
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-The `SequenceFileWriterFactory` supports additional constructor parameters to specify compression settings.
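-
-For example, a block-compressed writer could be created as in the following sketch, which reuses `hadoopConf` and
-`outputBasePath` from the example above. The extended constructor shown here (codec name plus
-`SequenceFile.CompressionType`) and the `"BZip2"` codec are assumptions; adjust them to the constructor variants and
-codecs available in your setup.
-
-{{< tabs "seqfile-compression-sketch" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-
-// Sketch only: "BZip2" and BLOCK compression are illustrative choices.
-final StreamingFileSink<Tuple2<LongWritable, Text>> compressedSink = StreamingFileSink
-  .forBulkFormat(
-    outputBasePath,
-    new SequenceFileWriterFactory<>(
-      hadoopConf, LongWritable.class, Text.class, "BZip2", SequenceFile.CompressionType.BLOCK))
-  .build();
-```
-{{< /tab >}}
-{{< /tabs >}}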
-
-## Bucket Assignment
-
-The bucketing logic defines how the data will be structured into subdirectories inside the base output directory.
-
-Both row and bulk formats (see [File Formats](#file-formats)) use the `DateTimeBucketAssigner` as the default assigner.
-By default, the `DateTimeBucketAssigner` creates hourly buckets based on the system default timezone
-with the following format: `yyyy-MM-dd--HH`. Both the date format (*i.e.* bucket size) and timezone can be
-configured manually.
-
-We can specify a custom `BucketAssigner` by calling `.withBucketAssigner(assigner)` on the format builders, as shown
-in the sketch after the list below.
-
-Flink comes with two built-in `BucketAssigner`s:
-
- - `DateTimeBucketAssigner`: Default time-based assigner
- - `BasePathBucketAssigner`: Assigner that stores all part files in the base path (single global bucket)
-
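-A minimal sketch of wiring an assigner into a format builder is shown below. It configures the default
-`DateTimeBucketAssigner` with a custom date format (minute-sized buckets) and an explicit timezone instead of the
-system default; `outputBasePath` is assumed to be defined as in the examples above.
-
-{{< tabs "bucket-assigner-sketch" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
-
-import java.time.ZoneId;
-
-// Minute-sized buckets, resolved in UTC rather than the system default timezone.
-final StreamingFileSink<String> sink = StreamingFileSink
-    .forRowFormat(outputBasePath, new SimpleStringEncoder<String>("UTF-8"))
-    .withBucketAssigner(new DateTimeBucketAssigner<String>("yyyy-MM-dd--HH-mm", ZoneId.of("UTC")))
-    .build();
-```
-{{< /tab >}}
-{{< /tabs >}}
-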
-## Rolling Policy
-
-The `RollingPolicy` defines when a given in-progress part file is closed and moved to the pending and later to the finished state.
-Part files in the "finished" state are the ones that are ready for viewing and are guaranteed to contain valid data that will not be reverted in case of failure.
-The rolling policy, in combination with the checkpointing interval (pending files become finished on the next checkpoint), controls how quickly
-part files become available for downstream readers as well as the size and number of these parts; a configuration sketch follows the list of built-in policies below.
-
-Flink comes with two built-in RollingPolicies:
-
- - `DefaultRollingPolicy`
- - `OnCheckpointRollingPolicy`
-
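-A minimal sketch of configuring the `DefaultRollingPolicy` on a row-encoded format builder is shown below; the
-thresholds are illustrative, and `outputBasePath` is assumed to be defined as in the examples above. Note that
-bulk-encoded formats can only roll on checkpoints and therefore use the `OnCheckpointRollingPolicy`.
-
-{{< tabs "rolling-policy-sketch" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-
-import java.util.concurrent.TimeUnit;
-
-// Roll a part file every 15 minutes, after 5 minutes without new records, or once it reaches 1 GB,
-// whichever happens first.
-final StreamingFileSink<String> sink = StreamingFileSink
-    .forRowFormat(outputBasePath, new SimpleStringEncoder<String>("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
-            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
-            .withMaxPartSize(1024 * 1024 * 1024)
-            .build())
-    .build();
-```
-{{< /tab >}}
-{{< /tabs >}}
-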
-## Part file lifecycle
-
-In order to use the output of the `StreamingFileSink` in downstream systems, we need to understand the naming and lifecycle of the output files produced.
-
-Part files can be in one of three states:
- 1. **In-progress**: The part file that is currently being written to is in-progress
- 2. **Pending**: Closed (due to the specified rolling policy) in-progress files that are waiting to be committed
- 3. **Finished**: On successful checkpoints, pending files transition to "Finished"
-
-Only finished files are safe to read by downstream systems as those are guaranteed to not be modified later.
-
-{{< hint info >}}
-Part file indexes are strictly increasing for any given subtask (in the order they were created). However, these indexes are not always sequential. When the job restarts, the next part index for all subtasks will be the `max part index + 1`,
-where `max` is computed across all subtasks.
-{{< /hint >}}
-
-Each writer subtask will have a single in-progress part file at any given time for every active bucket, but there can be several pending and finished files.
-
-**Part file example**
-
-To better understand the lifecycle of these files let's look at a simple example with 2 sink subtasks:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    └── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
-```
-
-When the part file `part-1-0` is rolled (let's say it becomes too large), it becomes pending but it is not renamed. The sink then opens a new part file: `part-1-1`:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
-    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
-As `part-1-0` is now pending completion, after the next successful checkpoint, it is finalized:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-1-0
-    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
-New buckets are created as dictated by the bucketing policy, and this doesn't affect currently in-progress files:
-
-```
-└── 2019-08-25--12
-    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── part-1-0
-    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-└── 2019-08-25--13
-    └── part-0-2.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1
-```
-
-Old buckets can still receive new records as the bucketing policy is evaluated on a per-record basis.
-
-### Part file configuration
-
-Finished files can be distinguished from the in-progress ones by their naming scheme only.
-
-By default, the file naming strategy is as follows:
- - **In-progress / Pending**: `part-<subtaskIndex>-<partFileIndex>.inprogress.uid`
- - **Finished:** `part-<subtaskIndex>-<partFileIndex>`
-
-Flink allows the user to specify a prefix and/or a suffix for their part files. 
-This can be done using an `OutputFileConfig`. 
-For example, for a prefix "prefix" and a suffix ".ext", the sink will create the following files:
-
-```
-└── 2019-08-25--12
-    ├── prefix-0-0.ext
-    ├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
-    ├── prefix-1-0.ext
-    └── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
-```
-
-The user can specify an `OutputFileConfig` in the following way:
-
-{{< tabs "ba840863-618b-4caf-996d-c7ea345a20fc" >}}
-{{< tab "Java" >}}
-```java
-
-OutputFileConfig config = OutputFileConfig
- .builder()
- .withPartPrefix("prefix")
- .withPartSuffix(".ext")
- .build();
-            
-StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
- .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
- .withBucketAssigner(new KeyBucketAssigner())
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
- .withOutputFileConfig(config)
- .build();
-			
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-
-val config = OutputFileConfig
- .builder()
- .withPartPrefix("prefix")
- .withPartSuffix(".ext")
- .build()
-            
-val sink = StreamingFileSink
- .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
- .withBucketAssigner(new KeyBucketAssigner())
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
- .withOutputFileConfig(config)
- .build()
-			
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-## Important Considerations
-
-### General
-
-<span class="label label-danger">Important Note 1</span>: When using Hadoop < 2.7, please use
-the `OnCheckpointRollingPolicy` which rolls part files on every checkpoint. The reason is that if part files "traverse"
-the checkpoint interval, then, upon recovery from a failure the `StreamingFileSink` may use the `truncate()` method of the 
-filesystem to discard uncommitted data from the in-progress file. This method is not supported by pre-2.7 Hadoop versions 
-and Flink will throw an exception.
-
-<span class="label label-danger">Important Note 2</span>: Given that Flink sinks and UDFs in general do not differentiate between
-normal job termination (*e.g.* finite input stream) and termination due to failure, upon normal termination of a job, the last 
-in-progress files will not be transitioned to the "finished" state.
-
-<span class="label label-danger">Important Note 3</span>: Flink and the `StreamingFileSink` never overwrite committed data.
-Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file that was already committed
-by subsequent successful checkpoints, the `StreamingFileSink` will refuse to resume and will throw an exception, as it cannot locate the 
-in-progress file.
-
-<span class="label label-danger">Important Note 4</span>: Currently, the `StreamingFileSink` only supports three filesystems: 
-HDFS, S3, and Local. Flink will throw an exception when using an unsupported filesystem at runtime.
-
-### S3-specific
-
-<span class="label label-danger">Important Note 1</span>: For S3, the `StreamingFileSink`
-supports only the [Hadoop-based](https://hadoop.apache.org/) FileSystem implementation, not
-the implementation based on [Presto](https://prestodb.io/). In case your job uses the
-`StreamingFileSink` to write to S3 but you want to use the Presto-based one for checkpointing,
-it is advised to explicitly use *"s3a://"* (for Hadoop) as the scheme for the target path of
-the sink and *"s3p://"* for checkpointing (for Presto). Using *"s3://"* for both the sink
-and checkpointing may lead to unpredictable behavior, as both implementations "listen" to that scheme.
-
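-A minimal sketch of such a split setup is shown below; the bucket names are placeholders, and checkpointing is assumed
-to be enabled so that the sink can commit files.
-
-{{< tabs "s3-scheme-split-sketch" >}}
-{{< tab "Java" >}}
-```java
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
-
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(60_000);
-
-// Checkpoints go through the Presto-based S3 file system ...
-env.getCheckpointConfig().setCheckpointStorage("s3p://my-checkpoint-bucket/checkpoints");
-
-// ... while the sink writes through the Hadoop-based S3 file system.
-final StreamingFileSink<String> sink = StreamingFileSink
-    .forRowFormat(new Path("s3a://my-output-bucket/data"), new SimpleStringEncoder<String>("UTF-8"))
-    .build();
-```
-{{< /tab >}}
-{{< /tabs >}}
-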
-<span class="label label-danger">Important Note 2</span>: To guarantee exactly-once semantics while
-being efficient, the `StreamingFileSink` uses the [Multi-part Upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html)
-feature of S3 (MPU from now on). This feature allows uploading files in independent chunks (thus the "multi-part")
-which can be combined into the original file when all the parts of the MPU are successfully uploaded.
-For inactive MPUs, S3 supports a bucket lifecycle rule that the user can use to abort multipart uploads
-that don't complete within a specified number of days after being initiated. This implies that if you set this rule
-aggressively and take a savepoint with some part files not yet fully uploaded, their associated MPUs may time out
-before the job is restarted. This will result in your job not being able to restore from that savepoint, because the
-pending part files are no longer there and Flink will fail with an exception when it tries to fetch them.
-
-{{< top >}}
diff --git a/docs/content/docs/connectors/table/filesystem.md b/docs/content/docs/connectors/table/filesystem.md
index 3f2b455..c3b0eaf 100644
--- a/docs/content/docs/connectors/table/filesystem.md
+++ b/docs/content/docs/connectors/table/filesystem.md
@@ -110,8 +110,8 @@ When using a directory as the source path, there is **no defined order of ingest
 
 ## Streaming Sink
 
-The file system connector supports streaming writes, based on Flink's [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}})
-to write records to file. Row-encoded Formats are csv and json. Bulk-encoded Formats are parquet, orc and avro.
+The file system connector supports streaming writes, based on Flink's [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}),
+to write records to files. Row-encoded formats are CSV and JSON. Bulk-encoded formats are Parquet, ORC and Avro.
 
 You can write SQL directly, insert the stream data into the non-partitioned table.
 If it is a partitioned table, you can configure partition related operations. See [Partition Commit](filesystem.html#partition-commit) for details.
diff --git a/docs/content/docs/deployment/filesystems/s3.md b/docs/content/docs/deployment/filesystems/s3.md
index ac5211f..a159331 100644
--- a/docs/content/docs/deployment/filesystems/s3.md
+++ b/docs/content/docs/deployment/filesystems/s3.md
@@ -70,16 +70,15 @@ Both implementations are self-contained with no dependency footprint, so there i
   
      For example, Hadoop has a `fs.s3a.connection.maximum` configuration key. If you want to change it, you need to put `s3.connection.maximum: xyz` to the `flink-conf.yaml`. Flink will internally translate this back to `fs.s3a.connection.maximum`. There is no need to pass configuration parameters using Hadoop's XML configuration files.
   
-    It is the only S3 file system with support for the [StreamingFileSink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) and the [FileSink]({{< ref "docs/connectors/datastream/file_sink" >}}).
+    It is the only S3 file system with support for the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}).
   
 
 Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
 wrappers for URIs with the *s3://* scheme, `flink-s3-fs-hadoop` also registers
 for *s3a://* and `flink-s3-fs-presto` also registers for *s3p://*, so you can
 use this to use both at the same time.
-For example, the job uses the [StreamingFileSink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) which only supports Hadoop, but uses Presto for checkpointing.
-In this case, it is advised to explicitly use *s3a://* as a scheme for the sink (Hadoop) and *s3p://* for checkpointing (Presto). The same holds for the 
-[FileSink]({{< ref "docs/connectors/datastream/file_sink" >}}).
+For example, a job might use the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) sink, which only supports Hadoop, but use Presto for checkpointing.
+In this case, you should explicitly use *s3a://* as a scheme for the sink (Hadoop) and *s3p://* for checkpointing (Presto).
 
 To use `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the `opt` directory to the `plugins` directory of your Flink distribution before starting Flink, e.g.
 
diff --git a/docs/content/docs/dev/datastream/execution_mode.md b/docs/content/docs/dev/datastream/execution_mode.md
index 28b6c94..7837558 100644
--- a/docs/content/docs/dev/datastream/execution_mode.md
+++ b/docs/content/docs/dev/datastream/execution_mode.md
@@ -365,8 +365,8 @@ does not use checkpointing.
 
 It is important to remember that because there are no checkpoints, certain
 features such as {{< javadoc file="org/apache/flink/api/common/state/CheckpointListener.html" name="CheckpointListener">}}
-and, as a result,  Kafka's [EXACTLY_ONCE]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) mode or `StreamingFileSink`'s
-[OnCheckpointRollingPolicy]({{< ref "docs/connectors/datastream/streamfile_sink" >}}#rolling-policy)
+and, as a result, Kafka's [EXACTLY_ONCE]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) mode or the `FileSink`'s
+[OnCheckpointRollingPolicy]({{< ref "docs/connectors/datastream/filesystem" >}}#rolling-policy)
 won't work. If you need a transactional sink that works in
 `BATCH` mode make sure it uses the Unified Sink API as proposed in
 [FLIP-143](https://cwiki.apache.org/confluence/x/KEJ4CQ).