Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/11 11:50:50 UTC

[flink] branch release-1.11 updated: [FLINK-17733][FLINK-16448][hive][doc] Adjust Hive doc & Add documentation for real-time hive

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 59d8844  [FLINK-17733][FLINK-16448][hive][doc] Adjust Hive doc & Add documentation for real-time hive
59d8844 is described below

commit 59d8844296984b414212916022ad0e23ec493d0e
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jun 11 19:49:50 2020 +0800

    [FLINK-17733][FLINK-16448][hive][doc] Adjust Hive doc & Add documentation for real-time hive
    
    This closes #12537
---
 docs/dev/table/connectors/filesystem.md            |   1 -
 docs/dev/table/connectors/filesystem.zh.md         |   1 -
 docs/dev/table/hive/hive_catalog.md                |   4 +
 docs/dev/table/hive/hive_catalog.zh.md             |   4 +
 .../{read_write_hive.md => hive_read_write.md}     |  34 ++++-
 ...read_write_hive.zh.md => hive_read_write.zh.md} |  34 ++++-
 docs/dev/table/hive/hive_streaming.md              | 166 +++++++++++++++++++++
 docs/dev/table/hive/hive_streaming.zh.md           | 166 +++++++++++++++++++++
 docs/dev/table/hive/scala_shell_hive.md            |  45 ------
 docs/dev/table/hive/scala_shell_hive.zh.md         |  45 ------
 .../flink/table/filesystem/FileSystemOptions.java  |  13 +-
 11 files changed, 413 insertions(+), 100 deletions(-)

diff --git a/docs/dev/table/connectors/filesystem.md b/docs/dev/table/connectors/filesystem.md
index 5b1b286..5b98041 100644
--- a/docs/dev/table/connectors/filesystem.md
+++ b/docs/dev/table/connectors/filesystem.md
@@ -352,7 +352,6 @@ CREATE TABLE fs_table (
   'connector'='filesystem',
   'path'='...',
   'format'='parquet',
-  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
   'sink.partition-commit.delay'='1 h',
   'sink.partition-commit.policy.kind'='success-file'
 );
diff --git a/docs/dev/table/connectors/filesystem.zh.md b/docs/dev/table/connectors/filesystem.zh.md
index 5b1b286..5b98041 100644
--- a/docs/dev/table/connectors/filesystem.zh.md
+++ b/docs/dev/table/connectors/filesystem.zh.md
@@ -352,7 +352,6 @@ CREATE TABLE fs_table (
   'connector'='filesystem',
   'path'='...',
   'format'='parquet',
-  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
   'sink.partition-commit.delay'='1 h',
   'sink.partition-commit.policy.kind'='success-file'
 );
diff --git a/docs/dev/table/hive/hive_catalog.md b/docs/dev/table/hive/hive_catalog.md
index d907703..7b59d9a 100644
--- a/docs/dev/table/hive/hive_catalog.md
+++ b/docs/dev/table/hive/hive_catalog.md
@@ -387,3 +387,7 @@ Something to note about the type mapping:
 * Hive's `TIMESTAMP` always has precision 9 and doesn't support other precisions. Hive UDFs, on the other hand, can process `TIMESTAMP` values with a precision <= 9.
 * Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET`
 * Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type yet
+
+## Scala Shell
+
+NOTE: since the blink planner is not well supported in the Scala Shell at the moment, it is **NOT** recommended to use the Hive connector in the Scala Shell.
diff --git a/docs/dev/table/hive/hive_catalog.zh.md b/docs/dev/table/hive/hive_catalog.zh.md
index d907703..05479b9 100644
--- a/docs/dev/table/hive/hive_catalog.zh.md
+++ b/docs/dev/table/hive/hive_catalog.zh.md
@@ -387,3 +387,7 @@ Something to note about the type mapping:
 * Hive's `TIMESTAMP` always has precision 9 and doesn't support other precisions. Hive UDFs, on the other hand, can process `TIMESTAMP` values with a precision <= 9.
 * Hive doesn't support Flink's `TIMESTAMP_WITH_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`, and `MULTISET`
 * Flink's `INTERVAL` type cannot be mapped to Hive `INTERVAL` type yet
+
+## Scala Shell
+
+NOTE: since the blink planner is not well supported in the Scala Shell at the moment, it is **NOT** recommended to use the Hive connector in the Scala Shell.
diff --git a/docs/dev/table/hive/read_write_hive.md b/docs/dev/table/hive/hive_read_write.md
similarity index 82%
rename from docs/dev/table/hive/read_write_hive.md
rename to docs/dev/table/hive/hive_read_write.md
index 8aa83b9..5185cc7 100644
--- a/docs/dev/table/hive/read_write_hive.md
+++ b/docs/dev/table/hive/hive_read_write.md
@@ -1,5 +1,5 @@
 ---
-title: "Reading & Writing Hive Tables"
+title: "Hive Read & Write"
 nav-parent_id: hive_tableapi
 nav-pos: 2
 ---
@@ -184,6 +184,38 @@ This feature is turned on by default. If there is a problem, you can use this co
 table.exec.hive.fallback-mapred-reader=true
 {% endhighlight %}
 
+### Source Parallelism Inference
+
+By default, Flink infers the Hive source parallelism from the number of splits, and the number of
+splits is determined by the number of files and the number of blocks in those files.
+
+Flink allows you to flexibly configure the parallelism inference policy. You can set the
+following parameters in `TableConfig` (note that these parameters affect all sources of the job):
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.infer-source-parallelism</h5></td>
+        <td style="word-wrap: break-word;">true</td>
+        <td>Boolean</td>
+        <td>If it is true, the source parallelism is inferred from the number of splits. If it is false, the source parallelism is set by configuration.</td>
+    </tr>
+    <tr>
+        <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
+        <td style="word-wrap: break-word;">1000</td>
+        <td>Integer</td>
+        <td>Sets the maximum inferred parallelism for the source operator.</td>
+    </tr>
+  </tbody>
+</table>
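+
+For illustration, here is a minimal sketch of setting these options programmatically in `TableConfig`;
+the surrounding environment setup, the class name, and the chosen values are assumptions, not part of
+the connector itself:
+
+{% highlight java %}
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class HiveSourceParallelismExample {
+    public static void main(String[] args) {
+        // Batch environment with the blink planner, which the Hive connector requires.
+        EnvironmentSettings settings = EnvironmentSettings.newInstance()
+                .useBlinkPlanner()
+                .inBatchMode()
+                .build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        // Cap the inferred parallelism of Hive sources (100 is just an example value).
+        tEnv.getConfig().getConfiguration()
+                .setInteger("table.exec.hive.infer-source-parallelism.max", 100);
+
+        // Alternatively, disable inference and rely on the configured parallelism instead:
+        // tEnv.getConfig().getConfiguration()
+        //         .setBoolean("table.exec.hive.infer-source-parallelism", false);
+    }
+}
+{% endhighlight %}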
 
 ## Roadmap
 
diff --git a/docs/dev/table/hive/read_write_hive.zh.md b/docs/dev/table/hive/hive_read_write.zh.md
similarity index 82%
rename from docs/dev/table/hive/read_write_hive.zh.md
rename to docs/dev/table/hive/hive_read_write.zh.md
index 8aa83b9..5185cc7 100644
--- a/docs/dev/table/hive/read_write_hive.zh.md
+++ b/docs/dev/table/hive/hive_read_write.zh.md
@@ -1,5 +1,5 @@
 ---
-title: "Reading & Writing Hive Tables"
+title: "Hive Read & Write"
 nav-parent_id: hive_tableapi
 nav-pos: 2
 ---
@@ -184,6 +184,38 @@ This feature is turned on by default. If there is a problem, you can use this co
 table.exec.hive.fallback-mapred-reader=true
 {% endhighlight %}
 
+### Source Parallelism Inference
+
+By default, Flink infers the Hive source parallelism from the number of splits, and the number of
+splits is determined by the number of files and the number of blocks in those files.
+
+Flink allows you to flexibly configure the parallelism inference policy. You can set the
+following parameters in `TableConfig` (note that these parameters affect all sources of the job):
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.infer-source-parallelism</h5></td>
+        <td style="word-wrap: break-word;">true</td>
+        <td>Boolean</td>
+        <td>If it is true, the source parallelism is inferred from the number of splits. If it is false, the source parallelism is set by configuration.</td>
+    </tr>
+    <tr>
+        <td><h5>table.exec.hive.infer-source-parallelism.max</h5></td>
+        <td style="word-wrap: break-word;">1000</td>
+        <td>Integer</td>
+        <td>Sets the maximum inferred parallelism for the source operator.</td>
+    </tr>
+  </tbody>
+</table>
 
 ## Roadmap
 
diff --git a/docs/dev/table/hive/hive_streaming.md b/docs/dev/table/hive/hive_streaming.md
new file mode 100644
index 0000000..7b9f268
--- /dev/null
+++ b/docs/dev/table/hive/hive_streaming.md
@@ -0,0 +1,166 @@
+---
+title: "Hive Streaming"
+nav-parent_id: hive_tableapi
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+A typical Hive job is executed on a periodic schedule, so there is a large delay before its results become available.
+
+Flink supports writing to, reading from, and joining Hive tables in a streaming fashion.
+
+* This will be replaced by the TOC
+{:toc}
+
+There are three types of streaming:
+
+- Writing streaming data into a Hive table.
+- Reading a Hive table incrementally as a stream.
+- Joining a streaming table with a Hive table using a [Temporal Table]({{ site.baseurl }}/dev/table/streaming/temporal_tables.html#temporal-table).
+
+## Streaming Writing
+
+Hive tables support streaming writes, based on the [Filesystem Streaming Sink]({{ site.baseurl }}/dev/table/connectors/filesystem.html#streaming-sink).
+
+The Hive streaming sink reuses the filesystem streaming sink to integrate Hadoop OutputFormat/RecordWriter into streaming writing.
+Hadoop RecordWriters are bulk-encoded formats, and bulk formats roll files on every checkpoint.
+
+By default, only a renaming committer is available, which means the S3 filesystem cannot support exactly-once semantics.
+If you want to use the Hive streaming sink on an S3 filesystem, you can set the following parameter to
+false in `TableConfig` to use Flink's native writers, which only work for Parquet and ORC (note that this
+parameter affects all sinks of the job):
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.fallback-mapred-writer</h5></td>
+        <td style="word-wrap: break-word;">true</td>
+        <td>Boolean</td>
+        <td>If it is false, Flink's native writer is used to write Parquet and ORC files; if it is true, the Hadoop mapred record writer is used to write Parquet and ORC files.</td>
+    </tr>
+  </tbody>
+</table>
+
+The example below shows how the streaming sink can be used to write a streaming query that moves data from Kafka into a Hive table with partition commit,
+and then runs a batch query to read that data back out.
+
+{% highlight sql %}
+
+SET table.sql-dialect=hive;
+CREATE TABLE hive_table (
+  user_id STRING,
+  order_amount DOUBLE
+) PARTITIONED BY (dt STRING, hour STRING) STORED AS parquet TBLPROPERTIES (
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.policy.kind'='metastore,success-file'
+);
+
+SET table.sql-dialect=default;
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  log_ts TIMESTAMP(3),
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+) WITH (...);
+
+-- streaming sql, insert into hive table
+INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM hive_table WHERE dt='2020-05-20' and hour='12';
+
+{% endhighlight %}
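+
+If the pipeline is driven from a Table API program, the `table.exec.hive.fallback-mapred-writer`
+option above can be set through `TableConfig`. The sketch below assumes the `hive_table` and
+`kafka_table` definitions above are already registered in the current catalog; the class name is a
+placeholder:
+
+{% highlight java %}
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class HiveStreamingWriteExample {
+    public static void main(String[] args) {
+        // Streaming environment with the blink planner.
+        EnvironmentSettings settings = EnvironmentSettings.newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        // Use Flink's native Parquet/ORC writers instead of the Hadoop mapred record writer,
+        // e.g. when writing to S3, where the renaming committer cannot provide exactly-once.
+        tEnv.getConfig().getConfiguration()
+                .setBoolean("table.exec.hive.fallback-mapred-writer", false);
+
+        // Submit the same streaming insert as in the SQL example above.
+        tEnv.executeSql(
+                "INSERT INTO hive_table " +
+                "SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') " +
+                "FROM kafka_table");
+    }
+}
+{% endhighlight %}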
+
+## Streaming Reading
+
+To improve the real-time capability of Hive reads, Flink supports streaming reads of Hive tables:
+
+- For a partitioned table, Flink monitors the generation of new partitions and reads each new partition incrementally.
+- For a non-partitioned table, Flink monitors the generation of new files in the folder and reads new files incrementally.
+
+You can even combine a 10-minute partitioning strategy with Flink's Hive streaming reading and
+Hive streaming writing to bring a Hive data warehouse down to quasi-real-time, minute-level latency.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>streaming-source.enable</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Enable streaming source or not. NOTE: Please make sure that each partition/file is written atomically, otherwise the reader may get incomplete data.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.monitor-interval</h5></td>
+        <td style="word-wrap: break-word;">1 m</td>
+        <td>Duration</td>
+        <td>Time interval for continuously monitoring partitions/files.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-order</h5></td>
+        <td style="word-wrap: break-word;">create-time</td>
+        <td>String</td>
+        <td>The consume order of the streaming source; supports create-time and partition-time. create-time compares the partition/file creation time; this is not the partition creation time in the Hive metastore, but the folder/file modification time in the filesystem (if the partition folder somehow gets updated, e.g. a new file is added into the folder, it can affect how the data is consumed). partition-time compares the time represented by the partition name. For non-partitioned tables, this value should always be 'create-time'.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-start-offset</h5></td>
+        <td style="word-wrap: break-word;">1970-00-00</td>
+        <td>String</td>
+        <td>Start offset for streaming consumption. How offsets are parsed and compared depends on the consume order. For create-time and partition-time, the value should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For partition-time, the partition time extractor is used to extract the time from the partition.</td>
+    </tr>
+  </tbody>
+</table>
+
+Note:
+
+- The monitoring strategy is currently to scan all directories/files under the location path. If there are too many partitions, there will be performance problems.
+- Streaming reading of non-partitioned tables requires that each file be put atomically into the target directory.
+- Streaming reading of partitioned tables requires that each partition be added atomically from the point of view of the Hive metastore. This means that new data added to an existing partition won't be consumed.
+- Streaming reading does not support the watermark grammar in Flink DDL, so it cannot be used for window operators.
+
+The example below shows how to read a Hive table incrementally.
+
+{% highlight sql %}
+
+SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
+
+{% endhighlight %}
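+
+For a Table API program, a minimal sketch of the same incremental read might look as follows; the
+catalog name, Hive conf directory, Hive version, and the explicit enabling of dynamic table options
+are assumptions for illustration only:
+
+{% highlight java %}
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+
+public class HiveStreamingReadExample {
+    public static void main(String[] args) {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+
+        // Register a HiveCatalog so that hive_table can be resolved.
+        HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "<path-to-hive-conf-dir>", "2.3.4");
+        tEnv.registerCatalog("hive", hiveCatalog);
+        tEnv.useCatalog("hive");
+
+        // OPTIONS hints are dynamic table options and may need to be enabled explicitly.
+        tEnv.getConfig().getConfiguration()
+                .setBoolean("table.dynamic-table-options.enabled", true);
+
+        // Incrementally read the table, using the same hint as in the SQL example above.
+        tEnv.executeSql(
+                "SELECT * FROM hive_table " +
+                "/*+ OPTIONS('streaming-source.enable'='true', " +
+                "'streaming-source.consume-start-offset'='2020-05-20') */")
+            .print();
+    }
+}
+{% endhighlight %}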
+
+## Hive Table As Temporal Tables
+
+TODO
diff --git a/docs/dev/table/hive/hive_streaming.zh.md b/docs/dev/table/hive/hive_streaming.zh.md
new file mode 100644
index 0000000..7b9f268
--- /dev/null
+++ b/docs/dev/table/hive/hive_streaming.zh.md
@@ -0,0 +1,166 @@
+---
+title: "Hive Streaming"
+nav-parent_id: hive_tableapi
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+A typical Hive job is executed on a periodic schedule, so there is a large delay before its results become available.
+
+Flink supports writing to, reading from, and joining Hive tables in a streaming fashion.
+
+* This will be replaced by the TOC
+{:toc}
+
+There are three types of streaming:
+
+- Writing streaming data into a Hive table.
+- Reading a Hive table incrementally as a stream.
+- Joining a streaming table with a Hive table using a [Temporal Table]({{ site.baseurl }}/dev/table/streaming/temporal_tables.html#temporal-table).
+
+## Streaming Writing
+
+Hive tables support streaming writes, based on the [Filesystem Streaming Sink]({{ site.baseurl }}/dev/table/connectors/filesystem.html#streaming-sink).
+
+The Hive streaming sink reuses the filesystem streaming sink to integrate Hadoop OutputFormat/RecordWriter into streaming writing.
+Hadoop RecordWriters are bulk-encoded formats, and bulk formats roll files on every checkpoint.
+
+By default, only a renaming committer is available, which means the S3 filesystem cannot support exactly-once semantics.
+If you want to use the Hive streaming sink on an S3 filesystem, you can set the following parameter to
+false in `TableConfig` to use Flink's native writers, which only work for Parquet and ORC (note that this
+parameter affects all sinks of the job):
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>table.exec.hive.fallback-mapred-writer</h5></td>
+        <td style="word-wrap: break-word;">true</td>
+        <td>Boolean</td>
+        <td>If it is false, Flink's native writer is used to write Parquet and ORC files; if it is true, the Hadoop mapred record writer is used to write Parquet and ORC files.</td>
+    </tr>
+  </tbody>
+</table>
+
+The example below shows how the streaming sink can be used to write a streaming query that moves data from Kafka into a Hive table with partition commit,
+and then runs a batch query to read that data back out.
+
+{% highlight sql %}
+
+SET table.sql-dialect=hive;
+CREATE TABLE hive_table (
+  user_id STRING,
+  order_amount DOUBLE
+) PARTITIONED BY (dt STRING, hour STRING) STORED AS parquet TBLPROPERTIES (
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.policy.kind'='metastore,success-file'
+);
+
+SET table.sql-dialect=default;
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  log_ts TIMESTAMP(3),
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+) WITH (...);
+
+-- streaming sql, insert into hive table
+INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM hive_table WHERE dt='2020-05-20' and hour='12';
+
+{% endhighlight %}
+
+## Streaming Reading
+
+To improve the real-time capability of Hive reads, Flink supports streaming reads of Hive tables:
+
+- For a partitioned table, Flink monitors the generation of new partitions and reads each new partition incrementally.
+- For a non-partitioned table, Flink monitors the generation of new files in the folder and reads new files incrementally.
+
+You can even combine a 10-minute partitioning strategy with Flink's Hive streaming reading and
+Hive streaming writing to bring a Hive data warehouse down to quasi-real-time, minute-level latency.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><h5>streaming-source.enable</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Enable streaming source or not. NOTE: Please make sure that each partition/file is written atomically, otherwise the reader may get incomplete data.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.monitor-interval</h5></td>
+        <td style="word-wrap: break-word;">1 m</td>
+        <td>Duration</td>
+        <td>Time interval for continuously monitoring partitions/files.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-order</h5></td>
+        <td style="word-wrap: break-word;">create-time</td>
+        <td>String</td>
+        <td>The consume order of the streaming source; supports create-time and partition-time. create-time compares the partition/file creation time; this is not the partition creation time in the Hive metastore, but the folder/file modification time in the filesystem (if the partition folder somehow gets updated, e.g. a new file is added into the folder, it can affect how the data is consumed). partition-time compares the time represented by the partition name. For non-partitioned tables, this value should always be 'create-time'.</td>
+    </tr>
+    <tr>
+        <td><h5>streaming-source.consume-start-offset</h5></td>
+        <td style="word-wrap: break-word;">1970-00-00</td>
+        <td>String</td>
+        <td>Start offset for streaming consumption. How offsets are parsed and compared depends on the consume order. For create-time and partition-time, the value should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For partition-time, the partition time extractor is used to extract the time from the partition.</td>
+    </tr>
+  </tbody>
+</table>
+
+Note:
+
+- The monitoring strategy is currently to scan all directories/files under the location path. If there are too many partitions, there will be performance problems.
+- Streaming reading of non-partitioned tables requires that each file be put atomically into the target directory.
+- Streaming reading of partitioned tables requires that each partition be added atomically from the point of view of the Hive metastore. This means that new data added to an existing partition won't be consumed.
+- Streaming reading does not support the watermark grammar in Flink DDL, so it cannot be used for window operators.
+
+The example below shows how to read a Hive table incrementally.
+
+{% highlight sql %}
+
+SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
+
+{% endhighlight %}
+
+## Hive Table As Temporal Tables
+
+TODO
diff --git a/docs/dev/table/hive/scala_shell_hive.md b/docs/dev/table/hive/scala_shell_hive.md
deleted file mode 100644
index d3fbd91..0000000
--- a/docs/dev/table/hive/scala_shell_hive.md
+++ /dev/null
@@ -1,45 +0,0 @@
----
-title: "Use Hive connector in Scala Shell"
-nav-parent_id: hive_tableapi
-nav-pos: 3
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-NOTE: since blink planner is not well supported in Scala Shell at the moment, it's **NOT** recommended to use Hive connector in Scala Shell.
-
-[Flink Scala Shell]({{ site.baseurl }}/ops/scala_shell.html) is a convenient quick way to try flink. 
-You can use hive in Scala Shell as well instead of specifying hive dependencies in pom file, packaging your program and submitting it via flink run command.
-In order to use hive connector in Scala Shell, you need to put the following [hive connector dependencies]({{ site.baseurl }}/dev/table/hive/#depedencies) under lib folder of flink dist .
-
-* flink-connector-hive_{scala_version}-{flink.version}.jar
-* flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
-* flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
-* hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
-
-Then you can use hive connector in Scala Shell like following:
-
-{% highlight scala %}
-Scala-Flink> import org.apache.flink.table.catalog.hive.HiveCatalog
-Scala-Flink> val hiveCatalog = new HiveCatalog("hive", "default", "<Replace it with HIVE_CONF_DIR>", "2.3.4");
-Scala-Flink> btenv.registerCatalog("hive", hiveCatalog)
-Scala-Flink> btenv.useCatalog("hive")
-Scala-Flink> btenv.listTables
-Scala-Flink> btenv.sqlQuery("<sql query>").toDataSet[Row].print()
-{% endhighlight %}
diff --git a/docs/dev/table/hive/scala_shell_hive.zh.md b/docs/dev/table/hive/scala_shell_hive.zh.md
deleted file mode 100644
index b9cefc6..0000000
--- a/docs/dev/table/hive/scala_shell_hive.zh.md
+++ /dev/null
@@ -1,45 +0,0 @@
----
-title: "在 Scala Shell 中使用 Hive 连接器"
-nav-parent_id: hive_tableapi
-nav-pos: 3
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-注意:目前 blink planner 还不能很好的支持 Scala Shell,因此 **不** 建议在 Scala Shell 中使用 Hive 连接器。
-
-[Flink Scala Shell]({{ site.baseurl }}/zh/ops/scala_shell.html) 是快速上手 Flink 的好方法。
-你可以在 Scala Shell 中直接使用 Hive 连接器,而不需要在 pom 中引入 Hive 相关依赖,并打包提交作业。
-想要在 Scala Shell 中使用 Hive 连接器,你需要把 [Hive 连接器依赖项]({{ site.baseurl }}/zh/dev/table/hive/#depedencies) 放在 Flink dist 包中的 lib 文件夹下。
-
-* flink-connector-hive_{scala_version}-{flink.version}.jar
-* flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
-* flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
-* hive-exec-2.x.jar (对于 Hive 1.x 版本,你需要复制 hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
-
-然后你就可以在 Scala Shell 中使用 Hive 连接器,如下所示:
-
-{% highlight scala %}
-Scala-Flink> import org.apache.flink.table.catalog.hive.HiveCatalog
-Scala-Flink> val hiveCatalog = new HiveCatalog("hive", "default", "<Replace it with HIVE_CONF_DIR>", "2.3.4");
-Scala-Flink> btenv.registerCatalog("hive", hiveCatalog)
-Scala-Flink> btenv.useCatalog("hive")
-Scala-Flink> btenv.listTables
-Scala-Flink> btenv.sqlQuery("<sql query>").toDataSet[Row].print()
-{% endhighlight %}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
index e4bd12d..665081b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
@@ -64,9 +64,8 @@ public class FileSystemOptions {
 					.booleanType()
 					.defaultValue(false)
 					.withDescription("Enable streaming source or not.\n" +
-							"NOTES: For non-partition table, please make sure that " +
-							"each file should be put atomically into the target directory, " +
-							"otherwise the reader may get incomplete data.");
+							" NOTES: Please make sure that each partition/file should be written" +
+							" atomically, otherwise the reader may get incomplete data.");
 
 	public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL =
 			key("streaming-source.monitor-interval")
@@ -81,8 +80,9 @@ public class FileSystemOptions {
 					.withDescription("The consume order of streaming source," +
 							" support create-time and partition-time." +
 							" create-time compare partition/file creation time, this is not the" +
-							" partition create time in Hive metaStore, but the folder/file create" +
-							" time in filesystem;" +
+							" partition create time in Hive metaStore, but the folder/file modification" +
+							" time in filesystem, if the partition folder somehow gets updated," +
+							" e.g. add new file into folder, it can affect how the data is consumed." +
 							" partition-time compare time represented by partition name.\n" +
 							"For non-partition table, this value should always be 'create-time'.");
 
@@ -92,7 +92,8 @@ public class FileSystemOptions {
 					.defaultValue("1970-00-00")
 					.withDescription("Start offset for streaming consuming." +
 							" How to parse and compare offsets depends on your order." +
-							" For create-time and partition-time, should be a timestamp string." +
+							" For create-time and partition-time, should be a timestamp" +
+							" string (yyyy-[m]m-[d]d [hh:mm:ss])." +
 							" For partition-time, will use partition time extractor to" +
 							" extract time from partition.");