Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/28 03:25:54 UTC

[GitHub] [flink] lirui-apache commented on a change in pull request #12283: [FLINK-16975][documentation] Add docs for FileSystem connector

lirui-apache commented on a change in pull request #12283:
URL: https://github.com/apache/flink/pull/12283#discussion_r431551933



##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an additional dependency.
+A corresponding format needs to be specified for reading and writing rows from and to a file system.
+
+The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify to connector type

Review comment:
```suggestion
  'connector' = 'filesystem',           -- required: specify the connector type
```

##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an additional dependency.
+A corresponding format needs to be specified for reading and writing rows from and to a file system.
+
+The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify to connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: file system connector requires to specify a format,
+                                        -- Please refer to Table Formats
+                                        -- section for more details.s
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
+                                        -- column value is null/empty string.
+  
+  -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
+  -- reduce the number of file for filesystem sink but may lead data skew, the default value is disabled.

Review comment:
Should this read "the default value is **false**"?

##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an additional dependency.
+A corresponding format needs to be specified for reading and writing rows from and to a file system.
+
+The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify to connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: file system connector requires to specify a format,
+                                        -- Please refer to Table Formats
+                                        -- section for more details.s
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
+                                        -- column value is null/empty string.
+  
+  -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
+  -- reduce the number of file for filesystem sink but may lead data skew, the default value is disabled.
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/ops/filesystems/index.html).
+
+<span class="label label-danger">Attention</span> File system sources for streaming is still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring.
+
+## Partition Files
+
+Flink's file system partition support uses the standard hive format. However, it does not require partitions to be pre-registered with a table catalog. Partitions are discovered and inferred based on directory structure. For example, a table partitioned based on the directory below would be inferred to contain `datetime` and `hour` partitions.
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports both partition inserting and overwrite inserting. See [INSERT Statement]({{ site.baseurl }}/dev/table/sql/insert.html). When you insert overwrite to a partitioned table, only the corresponding partition will be overwritten, not the entire table.
+
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
+ - JSON: Note JSON format for file system connector is not a typical JSON file but uncompressed [newline delimited JSON](http://jsonlines.org/).
+ - Avro: [Apache Avro](http://avro.apache.org). Support compression by configuring `avro.codec`.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's [Streaming File Sink]({{ site.baseurl }}/dev/connectors/streamfile_sink.html)
+to write records to file. Row-encoded Formats are csv and json. Bulk-encoded Formats are parquet, orc and avro.
+
+### Rolling Policy
+
+Data within the partition directories are split into part files. Each partition will contain at least one part file for
+each subtask of the sink that has received data for that partition. The in-progress part file will be closed and additional
+part file will be created according to the configurable rolling policy. The policy rolls part files based on size,
+a timeout that specifies the maximum duration for which a file can be open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.rolling-policy.file-size</h5></td>
+        <td style="word-wrap: break-word;">128MB</td>
+        <td>MemorySize</td>
+        <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+        <td><h5>sink.rolling-policy.time-interval</h5></td>
+        <td style="word-wrap: break-word;">30 m</td>
+        <td>Duration</td>
+        <td>The maximum time duration a part file can stay open before rolling (by default 30 min to avoid to many small files).</td>
+    </tr>
+  </tbody>
+</table>
+
+**NOTE:** For bulk formats (parquet, orc, avro), the rolling policy in combination with the checkpoint interval(pending files
+become finished on the next checkpoint) control the size and number of these parts.
+
+**NOTE:** For row formats (csv, json), you can reduce the time interval appropriately to avoid too long delay.
+
+### Partition Commit
+
+After writing a partition, it is often necessary to notify downstream applications. For example, add the partition to a Hive metastore or writing a `_SUCCESS` file in the directory. The file system sink contains a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of `triggers` and `policies`. 
+
+- Trigger: The timing of the commit of the partition can be determined by the watermark with the time extracted from the partition, or by processing time.
+- Policy: How to commit a partition, built-in policies support for the commit of success files and metastore, you can also implement your own policies, such as triggering hive's analysis to generate statistics, or merging small files, etc.
+
+#### Partition commit trigger
+
+To define when to commit a partition, providing partition commit trigger:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.partition-commit.trigger</h5></td>
+        <td style="word-wrap: break-word;">process-time</td>
+        <td>String</td>
+        <td>Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.</td>
+    </tr>
+    <tr>
+        <td><h5>sink.partition-commit.delay</h5></td>
+        <td style="word-wrap: break-word;">0 s</td>
+        <td>Duration</td>
+        <td>The partition will not commit until the delay time. If it is a daily partition, should be '1 d', if it is a hourly partition, should be '1 h'.</td>
+    </tr>
+  </tbody>
+</table>
+
+There are two types of trigger:
+- The first is partition processing time. It neither requires partition time extraction nor watermark
+generation. The trigger of partition commit according to partition creation time and current system time. This trigger
+is more universal, but not so precise. For example, data delay or failover will lead to premature partition commit.
+- The second is the trigger of partition commit according to the time that extracted from partition values and watermark.
+This requires that your job has watermark generation, and the partition is divided according to time, such as
+hourly partition or daily partition.
+
+If you want to let downstream see the partition as soon as possible, no matter whether its data is complete or not:
+- 'sink.partition-commit.trigger'='process-time' (Default value)
+- 'sink.partition-commit.delay'='0s' (Default value)
+Once there is data in the partition, it will immediately commit. Note: the partition may be committed multiple times.
+
+If you want to let downstream see the partition only when its data is complete, and your job has watermark generation, and you can extract the time from partition values:
+- 'sink.partition-commit.trigger'='partition-time'
+- 'sink.partition-commit.delay'='1h' ('1h' if your partition is hourly partition, depends on your partition type)
+This is the most accurate way to commit partition, and it will try to ensure that the committed partitions are as data complete as possible.
+
+If you want to let downstream see the partition only when its data is complete, but there is no watermark, or the time cannot be extracted from partition values:
+- 'sink.partition-commit.trigger'='process-time' (Default value)
+- 'sink.partition-commit.delay'='1h' ('1h' if your partition is hourly partition, depends on your partition type)
+Try to commit partition accurately, but data delay or failover will lead to premature partition commit.
+
+#### Partition Time Extractor
+
+Time extractors define extracting time from partition values.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>partition.time-extractor.kind</h5></td>
+        <td style="word-wrap: break-word;">default</td>
+        <td>String</td>
+        <td>Time extractor to extract time from partition values. Support default and custom. For default, can configure timestamp pattern. For custom, should configure extractor class.</td>
+    </tr>
+    <tr>
+        <td><h5>partition.time-extractor.class</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>The extractor class for implement PartitionTimeExtractor interface.</td>
+    </tr>
+    <tr>
+        <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-mm-dd hh:mm:ss' from first field. If timestamp in partition is single field 'dt', can configure: '$dt'. If timestamp in partition is year, month, day, hour, can configure: '$year-$month-$day $hour:00:00'. If timestamp in partition is dt and hour, can configure: '$dt $hour:00:00'.</td>

Review comment:
```suggestion
        <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-mm-dd hh:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can configure: '$dt $hour:00:00'.</td>
```
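
To make the substitution concrete, here is a minimal Python sketch of how a pattern such as `$dt $hour:00:00` could be resolved against partition values. It is an illustration only, not Flink's implementation: `resolve_pattern` and `extract_time` are hypothetical helpers, and the `%Y-%m-%d %H:%M:%S` format is assumed to correspond to the 'yyyy-mm-dd hh:mm:ss' default mentioned in the description.

```python
import re
from datetime import datetime


def resolve_pattern(pattern: str, partition_values: dict) -> str:
    """Replace each $field placeholder with that partition field's value."""
    def lookup(match):
        field = match.group(1)
        if field not in partition_values:
            raise KeyError(f"no partition field named {field!r}")
        return str(partition_values[field])

    # \w+ stops at '-', ':' and spaces, so '$year-$month' yields two fields.
    return re.sub(r"\$(\w+)", lookup, pattern)


def extract_time(pattern: str, partition_values: dict) -> datetime:
    """Resolve the pattern, then parse it as 'yyyy-mm-dd hh:mm:ss' (assumed format)."""
    resolved = resolve_pattern(pattern, partition_values)
    return datetime.strptime(resolved, "%Y-%m-%d %H:%M:%S")


# Two partition fields, 'dt' and 'hour'.
two_fields = extract_time("$dt $hour:00:00", {"dt": "2019-08-25", "hour": "11"})

# Four partition fields, 'year', 'month', 'day' and 'hour'.
four_fields = extract_time(
    "$year-$month-$day $hour:00:00",
    {"year": "2019", "month": "08", "day": "25", "hour": "11"},
)
```

Both calls resolve to the same timestamp string before parsing, which is why the wording in terms of "partition fields the timestamp is extracted from" seems clearer to me.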

##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an additional dependency.
+A corresponding format needs to be specified for reading and writing rows from and to a file system.
+
+The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:

Review comment:
Should this read "A filesystem **table** can be defined as"?

##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an additional dependency.
+A corresponding format needs to be specified for reading and writing rows from and to a file system.
+
+The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify to connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: file system connector requires to specify a format,
+                                        -- Please refer to Table Formats
+                                        -- section for more details.s
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
+                                        -- column value is null/empty string.
+  
+  -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
+  -- reduce the number of file for filesystem sink but may lead data skew, the default value is disabled.
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/ops/filesystems/index.html).
+
+<span class="label label-danger">Attention</span> File system sources for streaming is still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring.
+
+## Partition Files
+
+Flink's file system partition support uses the standard hive format. However, it does not require partitions to be pre-registered with a table catalog. Partitions are discovered and inferred based on directory structure. For example, a table partitioned based on the directory below would be inferred to contain `datetime` and `hour` partitions.
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports both partition inserting and overwrite inserting. See [INSERT Statement]({{ site.baseurl }}/dev/table/sql/insert.html). When you insert overwrite to a partitioned table, only the corresponding partition will be overwritten, not the entire table.
+
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
+ - JSON: Note JSON format for file system connector is not a typical JSON file but uncompressed [newline delimited JSON](http://jsonlines.org/).
+ - Avro: [Apache Avro](http://avro.apache.org). Support compression by configuring `avro.codec`.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's [Streaming File Sink]({{ site.baseurl }}/dev/connectors/streamfile_sink.html)
+to write records to file. Row-encoded Formats are csv and json. Bulk-encoded Formats are parquet, orc and avro.
+
+### Rolling Policy
+
+Data within the partition directories are split into part files. Each partition will contain at least one part file for
+each subtask of the sink that has received data for that partition. The in-progress part file will be closed and additional
+part file will be created according to the configurable rolling policy. The policy rolls part files based on size,
+a timeout that specifies the maximum duration for which a file can be open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.rolling-policy.file-size</h5></td>
+        <td style="word-wrap: break-word;">128MB</td>
+        <td>MemorySize</td>
+        <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+        <td><h5>sink.rolling-policy.time-interval</h5></td>
+        <td style="word-wrap: break-word;">30 m</td>
+        <td>Duration</td>
+        <td>The maximum time duration a part file can stay open before rolling (by default 30 min to avoid to many small files).</td>
+    </tr>
+  </tbody>
+</table>
+
+**NOTE:** For bulk formats (parquet, orc, avro), the rolling policy in combination with the checkpoint interval(pending files
+become finished on the next checkpoint) control the size and number of these parts.
+
+**NOTE:** For row formats (csv, json), you can reduce the time interval appropriately to avoid too long delay.
+
+### Partition Commit
+
+After writing a partition, it is often necessary to notify downstream applications. For example, add the partition to a Hive metastore or writing a `_SUCCESS` file in the directory. The file system sink contains a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of `triggers` and `policies`. 
+
+- Trigger: The timing of the commit of the partition can be determined by the watermark with the time extracted from the partition, or by processing time.
+- Policy: How to commit a partition, built-in policies support for the commit of success files and metastore, you can also implement your own policies, such as triggering hive's analysis to generate statistics, or merging small files, etc.
+
+#### Partition commit trigger
+
+To define when to commit a partition, providing partition commit trigger:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.partition-commit.trigger</h5></td>
+        <td style="word-wrap: break-word;">process-time</td>
+        <td>String</td>
+        <td>Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.</td>
+    </tr>
+    <tr>
+        <td><h5>sink.partition-commit.delay</h5></td>
+        <td style="word-wrap: break-word;">0 s</td>
+        <td>Duration</td>
+        <td>The partition will not commit until the delay time. If it is a daily partition, should be '1 d', if it is a hourly partition, should be '1 h'.</td>
+    </tr>
+  </tbody>
+</table>
+
+There are two types of trigger:
+- The first is partition processing time. It neither requires partition time extraction nor watermark
+generation. The trigger of partition commit according to partition creation time and current system time. This trigger
+is more universal, but not so precise. For example, data delay or failover will lead to premature partition commit.
+- The second is the trigger of partition commit according to the time that extracted from partition values and watermark.
+This requires that your job has watermark generation, and the partition is divided according to time, such as
+hourly partition or daily partition.
+
+If you want to let downstream see the partition as soon as possible, no matter whether its data is complete or not:
+- 'sink.partition-commit.trigger'='process-time' (Default value)
+- 'sink.partition-commit.delay'='0s' (Default value)
+Once there is data in the partition, it will immediately commit. Note: the partition may be committed multiple times.
+
+If you want to let downstream see the partition only when its data is complete, and your job has watermark generation, and you can extract the time from partition values:
+- 'sink.partition-commit.trigger'='partition-time'
+- 'sink.partition-commit.delay'='1h' ('1h' if your partition is hourly partition, depends on your partition type)
+This is the most accurate way to commit partition, and it will try to ensure that the committed partitions are as data complete as possible.

Review comment:
       What happens if late data arrives after a partition is committed? Is it appended to the partition or discarded?
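
For reference, the 'partition-time' condition in the quoted option table (commit once the watermark passes the time extracted from partition values plus the delay) can be sketched in Python as follows. This is only an illustration, not Flink code: `should_commit` is a hypothetical helper, and treating "passes" as `>=` is an assumption. It does not say what happens to records that arrive after the commit.

```python
from datetime import datetime, timedelta


def should_commit(watermark: datetime, partition_time: datetime, delay: timedelta) -> bool:
    """True once the watermark has reached partition_time + delay (">=" is assumed)."""
    return watermark >= partition_time + delay


# Hourly partition datetime=2019-08-25/hour=11 with 'sink.partition-commit.delay'='1h'.
hour_11 = datetime(2019, 8, 25, 11)
one_hour = timedelta(hours=1)

before = should_commit(datetime(2019, 8, 25, 11, 59), hour_11, one_hour)
after = should_commit(datetime(2019, 8, 25, 12, 0), hour_11, one_hour)
```

A record belonging to `hour=11` that shows up once `after` is true is exactly the late-data case the docs should describe.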

##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+---
+title: "FileSystem Connector"
+nav-title: FileSystem Connector
+nav-parent_id: connectors-table
+nav-pos: 1
+---
+
+This connector provides access to partitioned files in filesystems
+supported by the [Flink FileSystem abstraction]({{ site.baseurl}}/ops/filesystems/index.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+The file system connector itself is included in Flink and does not require an additional dependency.
+A corresponding format needs to be specified for reading and writing rows from and to a file system.
+
+The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem can be defined as:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  column_name1 INT,
+  column_name2 STRING,
+  ...
+  part_name1 INT,
+  part_name2 STRING
+) PARTITIONED BY (part_name1, part_name2) WITH (
+  'connector' = 'filesystem',           -- required: specify to connector type
+  'path' = 'file:///path/to/whatever',  -- required: path to a directory
+  'format' = '...',                     -- required: file system connector requires to specify a format,
+                                        -- Please refer to Table Formats
+                                        -- section for more details.s
+  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
+                                        -- column value is null/empty string.
+  
+  -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
+  -- reduce the number of file for filesystem sink but may lead data skew, the default value is disabled.
+  'sink.shuffle-by-partition.enable' = '...',
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/ops/filesystems/index.html).
+
+<span class="label label-danger">Attention</span> File system sources for streaming is still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring.
+
+## Partition Files
+
+Flink's file system partition support uses the standard hive format. However, it does not require partitions to be pre-registered with a table catalog. Partitions are discovered and inferred based on directory structure. For example, a table partitioned based on the directory below would be inferred to contain `datetime` and `hour` partitions.
+
+```
+path
+└── datetime=2019-08-25
+    └── hour=11
+        ├── part-0.parquet
+        ├── part-1.parquet
+    └── hour=12
+        ├── part-0.parquet
+└── datetime=2019-08-26
+    └── hour=6
+        ├── part-0.parquet
+```
+
+The file system table supports both partition inserting and overwrite inserting. See [INSERT Statement]({{ site.baseurl }}/dev/table/sql/insert.html). When you insert overwrite to a partitioned table, only the corresponding partition will be overwritten, not the entire table.
+
+## File Formats
+
+The file system connector supports multiple formats:
+
+ - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed.
+ - JSON: Note JSON format for file system connector is not a typical JSON file but uncompressed [newline delimited JSON](http://jsonlines.org/).
+ - Avro: [Apache Avro](http://avro.apache.org). Support compression by configuring `avro.codec`.
+ - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive.
+ - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive.
+
+## Streaming Sink
+
+The file system connector supports streaming writes, based on Flink's [Streaming File Sink]({{ site.baseurl }}/dev/connectors/streamfile_sink.html)
+to write records to file. Row-encoded Formats are csv and json. Bulk-encoded Formats are parquet, orc and avro.
+
+### Rolling Policy
+
+Data within the partition directories are split into part files. Each partition will contain at least one part file for
+each subtask of the sink that has received data for that partition. The in-progress part file will be closed and additional
+part file will be created according to the configurable rolling policy. The policy rolls part files based on size,
+a timeout that specifies the maximum duration for which a file can be open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.rolling-policy.file-size</h5></td>
+        <td style="word-wrap: break-word;">128MB</td>
+        <td>MemorySize</td>

Review comment:
       Why is this type called `MemorySize`? And what's the difference between this memory size and the file size that a user can see in the file system?

##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+### Rolling Policy
+
+Data within the partition directories are split into part files. Each partition will contain at least one part file for
+each subtask of the sink that has received data for that partition. The in-progress part file will be closed and additional
+part file will be created according to the configurable rolling policy. The policy rolls part files based on size,
+a timeout that specifies the maximum duration for which a file can be open.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>sink.rolling-policy.file-size</h5></td>
+        <td style="word-wrap: break-word;">128MB</td>
+        <td>MemorySize</td>
+        <td>The maximum part file size before rolling.</td>
+    </tr>
+    <tr>
+        <td><h5>sink.rolling-policy.time-interval</h5></td>
+        <td style="word-wrap: break-word;">30 m</td>
+        <td>Duration</td>
+        <td>The maximum time duration a part file can stay open before rolling (by default 30 min to avoid to many small files).</td>
+    </tr>
+  </tbody>
+</table>
+
+**NOTE:** For bulk formats (parquet, orc, avro), the rolling policy in combination with the checkpoint interval(pending files

Review comment:
       Does this mean only bulk formats can work with checkpoints?

##########
File path: docs/dev/table/connectors/filesystem.md
##########
@@ -0,0 +1,355 @@
+### Rolling Policy
+
+Data within the partition directories are split into part files. Each partition will contain at least one part file for
+each subtask of the sink that has received data for that partition. The in-progress part file will be closed and additional
+part file will be created according to the configurable rolling policy. The policy rolls part files based on size,
+a timeout that specifies the maximum duration for which a file can be open.

Review comment:
       So a user can roll a part file by size, by time, or both?
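
To make the question concrete, here is a minimal sketch of the case I have in mind, where both rolling options are set on the same table. The table name, columns, and path are hypothetical placeholders; the option keys and values are the ones listed in the table in this doc. Whether a part file then rolls as soon as either limit is reached is my assumption, and the paragraph should state it explicitly.

```sql
-- Hypothetical table: name, columns, and path are placeholders for illustration only.
CREATE TABLE RollingExample (
  user_id INT,
  message STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/rolling_example',
  'format' = 'csv',
  -- Both rolling options set together, using the documented defaults as values.
  -- Assumption: a part file rolls when either limit is reached.
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.time-interval' = '30 m'
);
```

If only one of the two keys is meant to take effect at a time, the doc should say which one wins.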



