Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/19 07:46:22 UTC

[flink] 02/02: [docs-sync] Synchronize the latest documentation changes into Chinese documents

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d40abbf0309f414a6acf8a090c448ba397a08d9c
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue May 19 15:41:32 2020 +0800

    [docs-sync] Synchronize the latest documentation changes into Chinese documents
---
 docs/concepts/flink-architecture.zh.md             |   2 +-
 docs/concepts/glossary.zh.md                       |  33 ++-
 docs/dev/batch/hadoop_compatibility.zh.md          |   2 +-
 docs/dev/batch/index.zh.md                         | 259 +++++++++++++++++++--
 docs/dev/connectors/cassandra.zh.md                |   2 +-
 docs/dev/connectors/filesystem_sink.zh.md          |   2 +-
 docs/dev/connectors/kafka.zh.md                    |   2 +-
 docs/dev/connectors/kinesis.zh.md                  |  23 +-
 docs/dev/connectors/nifi.zh.md                     |   2 +-
 docs/dev/connectors/pubsub.zh.md                   |   2 +-
 docs/dev/connectors/rabbitmq.zh.md                 |   2 +-
 docs/dev/connectors/streamfile_sink.zh.md          | 201 +++++++++++++++-
 docs/dev/connectors/twitter.zh.md                  |   2 +-
 docs/dev/datastream_api.zh.md                      | 218 ++++++++++++++++-
 docs/dev/java_lambdas.zh.md                        |   4 +-
 docs/dev/libs/cep.zh.md                            |   4 +-
 docs/dev/parallel.zh.md                            |   2 +-
 docs/dev/stream/operators/index.zh.md              |   4 +-
 docs/dev/stream/operators/windows.zh.md            |   2 +-
 docs/dev/stream/state/checkpointing.zh.md          |   2 +-
 docs/dev/stream/state/index.zh.md                  |  21 +-
 docs/dev/stream/state/queryable_state.zh.md        |   2 +-
 docs/dev/stream/state/state.zh.md                  | 130 ++++++++---
 docs/dev/table/common.zh.md                        |   4 +-
 docs/dev/table/connect.zh.md                       |  42 +++-
 docs/dev/table/index.zh.md                         |   2 +-
 docs/dev/table/sqlClient.zh.md                     |   4 +-
 docs/dev/types_serialization.zh.md                 | 200 ++++++++++++++++
 docs/dev/user_defined_functions.zh.md              |   2 +-
 docs/getting-started/index.zh.md                   |  49 +++-
 .../walkthroughs/python_table_api.zh.md            |  27 +++
 docs/index.zh.md                                   |   1 -
 docs/monitoring/metrics.zh.md                      |   6 +-
 33 files changed, 1127 insertions(+), 133 deletions(-)

diff --git a/docs/concepts/flink-architecture.zh.md b/docs/concepts/flink-architecture.zh.md
index 8414943..00a0924 100644
--- a/docs/concepts/flink-architecture.zh.md
+++ b/docs/concepts/flink-architecture.zh.md
@@ -1,5 +1,5 @@
 ---
-title: Flink Architecture
+title: Flink 架构
 nav-id: flink-architecture
 nav-pos: 4
 nav-title: Flink Architecture
diff --git a/docs/concepts/glossary.zh.md b/docs/concepts/glossary.zh.md
index 8efd2f1..3f88e82 100644
--- a/docs/concepts/glossary.zh.md
+++ b/docs/concepts/glossary.zh.md
@@ -25,7 +25,16 @@ under the License.
 
 #### Flink Application Cluster
 
-Flink Application Cluster 是一个专用的 [Flink Cluster](#flink-cluster),它仅用于执行单个 [Flink Job](#flink-job)。[Flink Cluster](#flink-cluster)的生命周期与 [Flink Job](#flink-job)的生命周期绑定在一起。以前,Flink Application Cluster 也称为*job mode*的 Flink Cluster。和 [Flink Session Cluster](#flink-session-cluster) 作对比。
+A Flink Application Cluster is a dedicated [Flink Cluster](#flink-cluster) that
+only executes [Flink Jobs](#flink-job) from one [Flink
+Application](#flink-application). The lifetime of the [Flink
+Cluster](#flink-cluster) is bound to the lifetime of the Flink Application.
+
+#### Flink Job Cluster
+
+A Flink Job Cluster is a dedicated [Flink Cluster](#flink-cluster) that only
+executes a single [Flink Job](#flink-job). The lifetime of the
+[Flink Cluster](#flink-cluster) is bound to the lifetime of the Flink Job.
 
 #### Flink Cluster
 
@@ -47,9 +56,22 @@ Function 是由用户实现的,并封装了 Flink 程序的应用程序逻辑
 
 Instance 常用于描述运行时的特定类型(通常是 [Operator](#operator) 或者 [Function](#function))的一个具体实例。由于 Apache Flink 主要是用 Java 编写的,所以,这与 Java 中的 *Instance* 或 *Object* 的定义相对应。在 Apache Flink 的上下文中,*parallel instance* 也常用于强调同一 [Operator](#operator) 或者 [Function](#function) 的多个 instance 以并行的方式运行。
 
+#### Flink Application
+
+A Flink application is a Java application that submits one or multiple [Flink
+Jobs](#flink-job) from the `main()` method (or by some other means). Submitting
+jobs is usually done by calling `execute()` on an execution environment.
+
+The jobs of an application can either be submitted to a long running [Flink
+Session Cluster](#flink-session-cluster), to a dedicated [Flink Application
+Cluster](#flink-application-cluster), or to a [Flink Job
+Cluster](#flink-job-cluster).
+
 #### Flink Job
 
-Flink Job 代表运行时的 Flink 程序。Flink Job 可以提交到长时间运行的 [Flink Session Cluster](#flink-session-cluster),也可以作为独立的 [Flink Application Cluster](#flink-application-cluster) 启动。
+A Flink Job is the runtime representation of a [logical graph](#logical-graph)
+(also often called dataflow graph) that is created and submitted by calling
+`execute()` in a [Flink Application](#flink-application).
 
 #### JobGraph
 
@@ -61,7 +83,12 @@ JobManager 是在 [Flink Master](#flink-master) 运行中的组件之一。JobMa
 
 #### Logical Graph
 
-Logical Graph 是一种描述流处理程序的高阶逻辑有向图。节点是[Operator](#operator),边代表输入/输出关系、数据流和数据集中的之一。
+A logical graph is a directed graph where the nodes are [Operators](#operator)
+and the edges define input/output-relationships of the operators and correspond
+to data streams or data sets. A logical graph is created by submitting jobs
+from a [Flink Application](#flink-application).
+
+Logical graphs are also often referred to as *dataflow graphs*.
 
 #### Managed State
 
diff --git a/docs/dev/batch/hadoop_compatibility.zh.md b/docs/dev/batch/hadoop_compatibility.zh.md
index 381ae2f..1f03adb 100644
--- a/docs/dev/batch/hadoop_compatibility.zh.md
+++ b/docs/dev/batch/hadoop_compatibility.zh.md
@@ -28,7 +28,7 @@ reusing code that was implemented for Hadoop MapReduce.
 
 You can:
 
-- use Hadoop's `Writable` [data types]({{ site.baseurl }}/dev/api_concepts.html#supported-data-types) in Flink programs.
+- use Hadoop's `Writable` [data types]({% link dev/types_serialization.md %}#supported-data-types) in Flink programs.
 - use any Hadoop `InputFormat` as a [DataSource](index.html#data-sources).
 - use any Hadoop `OutputFormat` as a [DataSink](index.html#data-sinks).
 - use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
diff --git a/docs/dev/batch/index.zh.md b/docs/dev/batch/index.zh.md
index 55a3be0..66fd0cf 100644
--- a/docs/dev/batch/index.zh.md
+++ b/docs/dev/batch/index.zh.md
@@ -32,11 +32,12 @@ example write the data to (distributed) files, or to standard output (for exampl
 terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
 The execution can happen in a local JVM, or on clusters of many machines.
 
-Please see [basic concepts]({{ site.baseurl }}/dev/api_concepts.html) for an introduction
-to the basic concepts of the Flink API.
+Please refer to the [DataStream API overview]({% link dev/datastream_api.md %})
+for an introduction to the basic concepts of the Flink API. That overview is
+for the DataStream API but the basic concepts of the two APIs are the same.
 
 In order to create your own Flink DataSet program, we encourage you to start with the
-[anatomy of a Flink Program]({{ site.baseurl }}/dev/api_concepts.html#anatomy-of-a-flink-program)
+[anatomy of a Flink Program]({% link dev/datastream_api.md %}#anatomy-of-a-flink-program)
 and gradually add your own
 [transformations](#dataset-transformations). The remaining sections act as references for additional
 operations and advanced features.
@@ -278,7 +279,7 @@ data.distinct();
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
         FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
-        elements. See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define join keys.
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight java %}
 result = input1.join(input2)
                .where(0)       // key of the first input (tuple field 0)
@@ -304,7 +305,7 @@ result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
     <tr>
       <td><strong>OuterJoin</strong></td>
       <td>
-        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single eleme [...]
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single eleme [...]
 {% highlight java %}
 input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
       .where(0)              // key of the first input (tuple field 0)
@@ -326,7 +327,7 @@ input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or ful
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight java %}
 data1.coGroup(data2)
      .where(0)
@@ -600,7 +601,7 @@ data.distinct()
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
         FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
-        elements. See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define join keys.
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight scala %}
 // In this case tuple fields are used as keys. "0" is the join field on the first tuple
 // "1" is the join field on the second tuple.
@@ -626,7 +627,7 @@ val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
     <tr>
       <td><strong>OuterJoin</strong></td>
       <td>
-        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a [...]
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a [...]
 {% highlight scala %}
 val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
    (left, right) =>
@@ -642,7 +643,7 @@ val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight scala %}
 data1.coGroup(data2).where(0).equalTo(1)
 {% endhighlight %}
@@ -796,6 +797,230 @@ possible for [Data Sources](#data-sources) and [Data Sinks](#data-sinks).
 
 {% top %}
 
+Specifying Keys
+---------------
+
+Some transformations (join, coGroup, groupBy) require that a key be defined on
+a collection of elements. Other transformations (Reduce, GroupReduce,
+Aggregate) allow the data to be grouped on a key before they are
+applied.
+
+A DataSet is grouped as follows:
+{% highlight java %}
+DataSet<...> input = // [...]
+DataSet<...> reduced = input
+  .groupBy(/*define key here*/)
+  .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping Tuples on one or more
+fields of the Tuple:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0)
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val keyed = input.groupBy(0)
+{% endhighlight %}
+</div>
+</div>
+
+The tuples are grouped on the first field (the one of
+Integer type).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+UnsortedGrouping<Tuple3<Integer,String,Long>,Tuple> keyed = input.groupBy(0,1)
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input.groupBy(0,1)
+{% endhighlight %}
+</div>
+</div>
+
+Here, we group the tuples on a composite key consisting of the first and the
+second field.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
+{% highlight java %}
+DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
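+
+For example, to group only on the `Integer` inside the nested `Tuple2`, a field
+expression such as the following could be used (a small sketch; field
+expressions are explained in the next section):
+
+{% highlight java %}
+DataSet<Tuple3<Tuple2<Integer, Float>, String, Long>> ds = // [...]
+// "f0.f0" navigates into the nested Tuple2 and selects its Integer field
+ds.groupBy("f0.f0");
+{% endhighlight %}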
+
+### Define keys using Field Expressions
+{:.no_toc}
+
+You can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping.
+
+Field expressions make it very easy to select fields in (nested) composite types such as [Tuple](#tuples-and-case-classes) and [POJO](#pojos) types.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
+{% highlight java %}
+// some ordinary POJO (Plain old Java Object)
+public class WC {
+  public String word;
+  public int count;
+}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words.groupBy("word")
+{% endhighlight %}
+
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
+
+- Select Tuple fields by their field name or 0-offset field index. For example `"f0"` and `"5"` refer to the first and sixth field of a Java Tuple type, respectively.
+
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"f1.user.zip"` or `"user.f3.1.zip"`.
+
+- You can select the full type using the `"*"` wildcard expression. This also works for types which are not Tuple or POJO types.
+
+**Field Expression Example**:
+
+{% highlight java %}
+public static class WC {
+  public ComplexNestedClass complex; //nested POJO
+  private int count;
+  // getter / setter for private field (count)
+  public int getCount() {
+    return count;
+  }
+  public void setCount(int c) {
+    this.count = c;
+  }
+}
+public static class ComplexNestedClass {
+  public Integer someNumber;
+  public float someFloat;
+  public Tuple3<Long, Long, String> word;
+  public IntWritable hadoopCitizen;
+}
+{% endhighlight %}
+
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
+
+- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`.
+
+- `"complex.word.f2"`: Selects the last field of the nested `Tuple3`.
+
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
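+Such field expressions can be passed anywhere a key is expected, for example to
+`groupBy()` (a small sketch; the variable name is only illustrative):
+
+{% highlight java %}
+DataSet<WC> wcs = // [...]
+// group on the String field of the nested Tuple3, i.e. on "complex.word.f2"
+wcs.groupBy("complex.word.f2")
+   .reduceGroup(/*do something*/);
+{% endhighlight %}
+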
+</div>
+<div data-lang="scala" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
+{% highlight scala %}
+// some ordinary POJO (Plain old Java Object)
+class WC(var word: String, var count: Int) {
+  def this() { this("", 0) }
+}
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word")
+
+// or, as a case class, which is less typing
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word")
+{% endhighlight %}
+
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
+
+- Select Tuple fields by their 1-offset field name or 0-offset field index. For example `"_1"` and `"5"` refer to the first and sixth field of a Scala Tuple type, respectively.
+
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"_2.user.zip"` or `"user._4.1.zip"`.
+
+- You can select the full type using the `"_"` wildcard expression. This also works for types which are not Tuple or POJO types.
+
+**Field Expression Example**:
+
+{% highlight scala %}
+class WC(var complex: ComplexNestedClass, var count: Int) {
+  def this() { this(null, 0) }
+}
+
+class ComplexNestedClass(
+    var someNumber: Int,
+    someFloat: Float,
+    word: (Long, Long, String),
+    hadoopCitizen: IntWritable) {
+  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
+}
+{% endhighlight %}
+
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
+
+- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`.
+
+- `"complex.word._3"`: Selects the last field of the nested `Tuple3`.
+
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
+</div>
+</div>
+
+### Define keys using Key Selector Functions
+{:.no_toc}
+
+An additional way to define keys is to use "key selector" functions. A key selector function
+takes a single element as input and returns the key for the element. The key can be of any type and be derived from deterministic computations.
+
+The following example shows a key selector function that simply returns the field of an object:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// some ordinary POJO
+public class WC {public String word; public int count;}
+DataSet<WC> words = // [...]
+UnsortedGrouping<WC> keyed = words
+  .groupBy(new KeySelector<WC, String>() {
+     public String getKey(WC wc) { return wc.word; }
+   });
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// some ordinary case class
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val keyed = words.groupBy( _.word )
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Data Sources
 ------------
 
@@ -1199,7 +1424,7 @@ myResult.output(
 
 #### Locally Sorted Output
 
-The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-for-tuples) or [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). This works for every output format.
+The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions](#define-keys-for-tuples) or [field expressions](#define-keys-using-field-expressions). This works for every output format.
 
 The following examples show how to use this feature:
 
@@ -1282,7 +1507,7 @@ values map { tuple => tuple._1 + " - " + tuple._2 }
 
 #### Locally Sorted Output
 
-The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-for-tuples) or [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). This works for every output format.
+The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions](#define-keys-for-tuples) or [field expressions](#define-keys-using-field-expressions). This works for every output format.
 
 The following examples show how to use this feature:
 
@@ -1771,7 +1996,7 @@ This information is used by the optimizer to infer whether a data property such
 partitioning is preserved by a function.
 For functions that operate on groups of input elements such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition`, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function's input group.
 
-Field forward information is specified using [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions).
+Field forward information is specified using [field expressions](#define-keys-using-field-expressions).
 Fields that are forwarded to the same position in the output can be specified by their position.
 The specified position must be valid for the input and output data type and have the same type.
 For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple.
@@ -1840,7 +2065,7 @@ Non-forwarded field information for group-wise operators such as `GroupReduce`,
 **IMPORTANT**: The specification of non-forwarded fields information is optional. However if used,
 **ALL!** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.
 
-Non-forwarded fields are specified as a list of [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
+Non-forwarded fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
 For example both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple
 are not preserved in place and all other fields are preserved in place.
 Non-forwarded field information can only be specified for functions which have identical input and output types.
@@ -1891,7 +2116,7 @@ Fields which are only unmodified forwarded to the output without evaluating thei
 **IMPORTANT**: The specification of read fields information is optional. However if used,
 **ALL!** read fields must be specified. It is safe to declare a non-read field as read.
 
-Read fields are specified as a list of [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
+Read fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
 For example both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple are read and evaluated by the function.
 
 Read field information is specified as function class annotations using the following annotations:
@@ -2045,7 +2270,7 @@ DataSet<Integer> result = input.map(new MyMapper());
 env.execute();
 {% endhighlight %}
 
-Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) class because it needs access to the `RuntimeContext`.
+Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({% link dev/user_defined_functions.md %}#rich-functions) class because it needs access to the `RuntimeContext`.
 
 {% highlight java %}
 
@@ -2091,7 +2316,7 @@ val result: DataSet[Integer] = input.map(new MyMapper())
 env.execute()
 {% endhighlight %}
 
-Access the cached file in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) class because it needs access to the `RuntimeContext`.
+Access the cached file in a user function (here a `MapFunction`). The function must extend a [RichFunction]({% link dev/user_defined_functions.md %}#rich-functions) class because it needs access to the `RuntimeContext`.
 
 {% highlight scala %}
 
@@ -2164,7 +2389,7 @@ class MyFilter(limit: Int) extends FilterFunction[Int] {
 
 #### Via `withParameters(Configuration)`
 
-This method takes a Configuration object as an argument, which will be passed to the [rich function]({{ site.baseurl }}/dev/api_concepts.html#rich-functions)'s `open()`
+This method takes a Configuration object as an argument, which will be passed to the [rich function]({% link dev/user_defined_functions.md %}#rich-functions)'s `open()`
 method. The Configuration object is a Map from String keys to different value types.
 
 <div class="codetabs" markdown="1">
diff --git a/docs/dev/connectors/cassandra.zh.md b/docs/dev/connectors/cassandra.zh.md
index 9a51387..002d388 100644
--- a/docs/dev/connectors/cassandra.zh.md
+++ b/docs/dev/connectors/cassandra.zh.md
@@ -111,7 +111,7 @@ More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpoin
 
 ## Examples
 
-The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java [...]
+The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({% link dev/types_serialization.md %}#supported-data-types). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWi [...]
 
 In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
 
diff --git a/docs/dev/connectors/filesystem_sink.zh.md b/docs/dev/connectors/filesystem_sink.zh.md
index 1e570e6..adb9f8b 100644
--- a/docs/dev/connectors/filesystem_sink.zh.md
+++ b/docs/dev/connectors/filesystem_sink.zh.md
@@ -39,7 +39,7 @@ under the License.
 </dependency>
 {% endhighlight %}
 
-注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
+注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 [这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html)。
 
 #### 分桶文件 Sink
 
diff --git a/docs/dev/connectors/kafka.zh.md b/docs/dev/connectors/kafka.zh.md
index b2cdc54..bf2ebbe 100644
--- a/docs/dev/connectors/kafka.zh.md
+++ b/docs/dev/connectors/kafka.zh.md
@@ -84,7 +84,7 @@ Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入
 {% endhighlight %}
 
 请注意:目前流连接器还不是二进制分发的一部分。
-[在此处]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html)可以了解到如何链接它们以实现在集群中执行。
+[在此处]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html)可以了解到如何链接它们以实现在集群中执行。
 
 ## 安装 Apache Kafka
 
diff --git a/docs/dev/connectors/kinesis.zh.md b/docs/dev/connectors/kinesis.zh.md
index 4e9009d..9601af2 100644
--- a/docs/dev/connectors/kinesis.zh.md
+++ b/docs/dev/connectors/kinesis.zh.md
@@ -45,7 +45,25 @@ Due to the licensing issue, the `flink-connector-kinesis{{ site.scala_version_su
 
 ## Using the Amazon Kinesis Streams Service
 Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
+to set up Kinesis streams.
+
+## Configuring Access to Kinesis with IAM
+Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+
+Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
+
+A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
+
+Supported Credential Providers are:
+* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
+* `BASIC` - Using access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Using the Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use an AWS credentials profile file to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
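+
+For example, a specific provider can be selected on the consumer configuration
+like this (a small sketch; `consumerConfig` stands for the `Properties`
+instance that is passed to the consumer, as shown in the example below):
+
+{% highlight java %}
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+
+// explicitly choose the BASIC provider; it additionally needs the two key settings
+consumerConfig.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+{% endhighlight %}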
 
 ## Kinesis Consumer
 
@@ -91,8 +109,7 @@ The above is a simple example of using the consumer. Configuration for the consu
 instance, the configuration keys for which can be found in `AWSConfigConstants` (AWS-specific parameters) and
 `ConsumerConfigConstants` (Kinesis consumer parameters). The example
 demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
-the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
-`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
+the AWS access key ID and secret access key are directly supplied in the configuration. Also, data is being consumed
 from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
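+
+For example, the consumer can be switched to start from the earliest available
+record like this (a small sketch; `consumerConfig` again stands for the
+`Properties` instance from the example above):
+
+{% highlight java %}
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
+{% endhighlight %}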
 
diff --git a/docs/dev/connectors/nifi.zh.md b/docs/dev/connectors/nifi.zh.md
index 114092f..36ac3f3 100644
--- a/docs/dev/connectors/nifi.zh.md
+++ b/docs/dev/connectors/nifi.zh.md
@@ -34,7 +34,7 @@ under the License.
 </dependency>
 {% endhighlight %}
 
-注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
+注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 [这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html)。
 
 #### 安装 Apache NiFi
 
diff --git a/docs/dev/connectors/pubsub.zh.md b/docs/dev/connectors/pubsub.zh.md
index eaf5f58..93cfa96 100644
--- a/docs/dev/connectors/pubsub.zh.md
+++ b/docs/dev/connectors/pubsub.zh.md
@@ -37,7 +37,7 @@ under the License.
 <b>注意</b>:此连接器最近才加到 Flink 里,还未接受广泛测试。
 </p>
 
-注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考[这里]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html)
+注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考[这里]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html)
 
 ## Consuming or Producing PubSubMessages
 
diff --git a/docs/dev/connectors/rabbitmq.zh.md b/docs/dev/connectors/rabbitmq.zh.md
index e213d3f..26b16ba 100644
--- a/docs/dev/connectors/rabbitmq.zh.md
+++ b/docs/dev/connectors/rabbitmq.zh.md
@@ -43,7 +43,7 @@ Flink 自身既没有复用 "RabbitMQ AMQP Java Client" 的代码,也没有将
 </dependency>
 {% endhighlight %}
 
-注意连接器现在没有包含在二进制发行版中。集群执行的相关信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html).
+注意连接器现在没有包含在二进制发行版中。集群执行的相关信息请参考 [这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html).
 
 ### 安装 RabbitMQ
 安装 RabbitMQ 请参考 [RabbitMQ 下载页面](http://www.rabbitmq.com/download.html)。安装完成之后,服务会自动拉起,应用程序就可以尝试连接到 RabbitMQ 了。
diff --git a/docs/dev/connectors/streamfile_sink.zh.md b/docs/dev/connectors/streamfile_sink.zh.md
index bd74bef..9f027a7 100644
--- a/docs/dev/connectors/streamfile_sink.zh.md
+++ b/docs/dev/connectors/streamfile_sink.zh.md
@@ -122,11 +122,12 @@ input.addSink(sink)
 批量编码 Sink 的创建与行编码 Sink 相似,不过在这里我们不是指定编码器  `Encoder` 而是指定 [BulkWriter.Factory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/BulkWriter.Factory.html) 。
 `BulkWriter` 定义了如何添加、刷新元素,以及如何批量编码。
 
-Flink 有三个内置的 BulkWriter Factory :
+Flink 有四个内置的 BulkWriter Factory :
 
  - [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
  - [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
  - [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
+ - [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
 
 <div class="alert alert-info">
      <b>重要:</b> 批量编码模式仅支持 OnCheckpointRollingPolicy 策略, 在每次 checkpoint 的时候切割文件。
@@ -188,6 +189,204 @@ input.addSink(sink)
 </div>
 </div>
 
+#### ORC Format
+
+To enable the data to be bulk encoded in ORC format, Flink offers [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
+which takes a concrete implementation of [Vectorizer]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/vector/Vectorizer.html).
+
+Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses
+ORC's `VectorizedRowBatch` to achieve this.
+
+Since the input element has to be transformed into a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer`
+class and override the `vectorize(T element, VectorizedRowBatch batch)` method. The method provides an instance of
+`VectorizedRowBatch` directly, so users only have to write the logic that transforms the input `element` into
+`ColumnVectors` and sets them in the provided `VectorizedRowBatch` instance.
+
+For example, if the input element is of type `Person` which looks like:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class Person {
+    private final String name;
+    private final int age;
+    ...
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+Then a child implementation that converts elements of type `Person` and sets them in the `VectorizedRowBatch` could look like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
+	public PersonVectorizer(String schema) {
+		super(schema);
+	}
+	@Override
+	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
+		BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
+		LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
+		int row = batch.size++;
+		nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
+		ageColVector.vector[row] = element.getAge();
+	}
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.nio.charset.StandardCharsets
+import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
+    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
+    val row = batch.size            // write into the next free row, as in the Java example
+    batch.size += 1
+    nameColVector.setVal(row, element.getName.getBytes(StandardCharsets.UTF_8))
+    ageColVector.vector(row) = element.getAge
+  }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+To use the ORC bulk encoder in an application, users need to add the following dependency:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-orc{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+And then a `StreamingFileSink` that writes data in ORC format can be created like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
+
+String schema = "struct<_col0:string,_col1:int>";
+DataStream<Person> input = ...;
+
+final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));
+
+final StreamingFileSink<Person> sink = StreamingFileSink
+	.forBulkFormat(outputBasePath, writerFactory)
+	.build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.orc.writer.OrcBulkWriterFactory
+
+val schema: String = "struct<_col0:string,_col1:int>"
+val input: DataStream[Person] = ...
+val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));
+
+val sink: StreamingFileSink[Person] = StreamingFileSink
+    .forBulkFormat(outputBasePath, writerFactory)
+    .build()
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+`OrcBulkWriterFactory` can also take Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC
+writer properties can be provided.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+String schema = ...;
+Configuration conf = ...;
+Properties writerProperties = new Properties();
+
+writerProperties.setProperty("orc.compress", "LZ4");
+// Other ORC supported properties can also be set similarly.
+
+final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
+    new PersonVectorizer(schema), writerProperties, conf);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val schema: String = ...
+val conf: Configuration = ...
+val writerProperties: Properties = new Properties()
+
+writerProperties.setProperty("orc.compress", "LZ4")
+// Other ORC supported properties can also be set similarly.
+
+val writerFactory = new OrcBulkWriterFactory(
+    new PersonVectorizer(schema), writerProperties, conf)
+{% endhighlight %}
+</div>
+</div>
+
+The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html).
+
+Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overridden
+`vectorize(...)` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
+	@Override
+	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
+		...
+		String metadataKey = ...;
+		ByteBuffer metadataValue = ...;
+		this.addUserMetadata(metadataKey, metadataValue);
+	}
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+    ...
+    val metadataKey: String = ...
+    val metadataValue: ByteBuffer = ...
+    addUserMetadata(metadataKey, metadataValue)
+  }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
 #### Hadoop SequenceFile 格式
 
 在应用中使用 SequenceFile 批量编码器,你需要添加以下依赖:
diff --git a/docs/dev/connectors/twitter.zh.md b/docs/dev/connectors/twitter.zh.md
index f4d110d..afd3ba8 100644
--- a/docs/dev/connectors/twitter.zh.md
+++ b/docs/dev/connectors/twitter.zh.md
@@ -35,7 +35,7 @@ Flink Streaming 通过一个内置的 `TwitterSource` 类来创建到 tweets 流
 </dependency>
 {% endhighlight %}
 
-注意:当前的二进制发行版还没有这些连接器。集群执行请参考[这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html).
+注意:当前的二进制发行版还没有这些连接器。集群执行请参考[这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html).
 
 #### 认证
 使用 Twitter 流,用户需要先注册自己的程序,获取认证相关的必要信息。过程如下:
diff --git a/docs/dev/datastream_api.zh.md b/docs/dev/datastream_api.zh.md
index c95a910..8300827 100644
--- a/docs/dev/datastream_api.zh.md
+++ b/docs/dev/datastream_api.zh.md
@@ -32,19 +32,221 @@ example write the data to files, or to standard output (for example the command
 terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
 The execution can happen in a local JVM, or on clusters of many machines.
 
-Please see [basic concepts]({{ site.baseurl }}/dev/api_concepts.html) for an introduction
-to the basic concepts of the Flink API.
-
-In order to create your own Flink DataStream program, we encourage you to start with
-[anatomy of a Flink Program]({{ site.baseurl }}/dev/api_concepts.html#anatomy-of-a-flink-program)
-and gradually add your own
-[stream transformations]({{ site.baseurl }}/dev/stream/operators/index.html). The remaining sections act as references for additional
-operations and advanced features.
+In order to create your own Flink DataStream program, we encourage you to start
+with [anatomy of a Flink Program](#anatomy-of-a-flink-program) and gradually
+add your own [stream transformations]({{ site.baseurl
+}}/dev/stream/operators/index.html). The remaining sections act as references
+for additional operations and advanced features.
 
 
 * This will be replaced by the TOC
 {:toc}
 
+What is a DataStream?
+----------------------
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+them as immutable collections of data that can contain duplicates. This data
+can either be finite or unbounded; the API that you use to work on them is the
+same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. They are immutable, meaning that once they
+are created you cannot add or remove elements. You also cannot simply inspect
+the elements inside; you can only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `map`, `filter`, and so on.
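+
+As a minimal sketch (the names are only illustrative), deriving and combining
+streams looks like this:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// initial stream created from a source
+DataStream<String> lines = env.fromElements("1", "2", "3");
+
+// new streams derived via transformations
+DataStream<Integer> parsed = lines.map(value -> Integer.parseInt(value));
+DataStream<Integer> evens = parsed.filter(value -> value % 2 == 0);
+{% endhighlight %}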
+
+Anatomy of a Flink Program
+--------------------------
+
+Flink programs look like regular programs that transform `DataStreams`.  Each
+program consists of the same basic parts:
+
+1. Obtain an `execution environment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations,
+5. Trigger the program execution
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+
+We will now give an overview of each of those steps; please refer to the
+respective sections for more details. Note that all core classes of the Java
+DataStream API can be found in {% gh_link
+/flink-streaming-java/src/main/java/org/apache/flink/streaming/api
+"org.apache.flink.streaming.api" %}.
+
+The `StreamExecutionEnvironment` is the basis for all Flink programs. You can
+obtain one using these static methods on `StreamExecutionEnvironment`:
+
+{% highlight java %}
+getExecutionEnvironment()
+
+createLocalEnvironment()
+
+createRemoteEnvironment(String host, int port, String... jarFiles)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this will do
+the right thing depending on the context: if you are executing your program
+inside an IDE or as a regular Java program it will create a local environment
+that will execute your program on your local machine. If you created a JAR file
+from your program, and invoke it through the [command line]({{ site.baseurl
+}}/ops/cli.html), the Flink cluster manager will execute your main method and
+`getExecutionEnvironment()` will return an execution environment for executing
+your program on a cluster.
+
+For specifying data sources, the execution environment has several methods to
+read from files: you can just read them line by line, as CSV files, or using
+any of the other provided sources. To just read a text file as a sequence of
+lines, you can use:
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> text = env.readTextFile("file:///path/to/file");
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations to create new
+derived DataStreams.
+
+You apply transformations by calling methods on DataStream with a
+transformation function. For example, a map transformation looks like this:
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return Integer.parseInt(value);
+    }
+});
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+collection to an Integer.
+
+Once you have a DataStream containing your final results, you can write it to
+an outside system by creating a sink. These are just some example methods for
+creating a sink:
+
+{% highlight java %}
+writeAsText(String path)
+
+print()
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+We will now give an overview of each of those steps; please refer to the
+respective sections for more details. Note that all core classes of the Scala
+DataStream API can be found in {% gh_link
+/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala
+"org.apache.flink.streaming.api.scala" %}.
+
+The `StreamExecutionEnvironment` is the basis for all Flink programs. You can
+obtain one using these static methods on `StreamExecutionEnvironment`:
+
+{% highlight scala %}
+getExecutionEnvironment()
+
+createLocalEnvironment()
+
+createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this will do
+the right thing depending on the context: if you are executing your program
+inside an IDE or as a regular Java program it will create a local environment
+that will execute your program on your local machine. If you created a JAR file
+from your program, and invoke it through the [command line]({{ site.baseurl
+}}/ops/cli.html), the Flink cluster manager will execute your main method and
+`getExecutionEnvironment()` will return an execution environment for executing
+your program on a cluster.
+
+For specifying data sources, the execution environment has several methods to
+read from files: you can just read them line by line, as CSV files, or using
+any of the other provided sources. To just read a text file as a sequence of
+lines, you can use:
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val text: DataStream[String] = env.readTextFile("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations to
+create new derived DataStreams.
+
+You apply transformations by calling methods on DataStream with a
+transformation function. For example, a map transformation looks like this:
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val mapped = input.map { x => x.toInt }
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+collection to an Integer.
+
+Once you have a DataStream containing your final results, you can write it to
+an outside system by creating a sink. These are just some example methods for
+creating a sink:
+
+{% highlight scala %}
+writeAsText(path: String)
+
+print()
+{% endhighlight %}
+
+</div>
+</div>
+
+Once you have specified the complete program, you need to **trigger the program
+execution** by calling `execute()` on the `StreamExecutionEnvironment`.
+Depending on the type of the `ExecutionEnvironment` the execution will be
+triggered on your local machine or submit your program for execution on a
+cluster.
+
+The `execute()` method will wait for the job to finish and then return a
+`JobExecutionResult`, which contains execution times and accumulator results.
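+
+For example (a minimal sketch; the job name is only illustrative):
+
+{% highlight java %}
+final JobExecutionResult result = env.execute("My Flink Job");
+
+System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
+{% endhighlight %}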
+
+If you don't want to wait for the job to finish, you can trigger asynchronous
+job execution by calling `executeAsync()` on the `StreamExecutionEnvironment`.
+It will return a `JobClient` that you can use to communicate with the job you
+just submitted. For instance, here is how to implement the semantics of
+`execute()` by using `executeAsync()`.
+
+{% highlight java %}
+final JobClient jobClient = env.executeAsync();
+
+final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
+{% endhighlight %}
+
+That last part about program execution is crucial to understanding when and how
+Flink operations are executed. All Flink programs are executed lazily: When the
+program's main method is executed, the data loading and transformations do not
+happen directly. Rather, each operation is created and added to a dataflow
+graph. The operations are actually executed when the execution is explicitly
+triggered by an `execute()` call on the execution environment. Whether the
+program is executed locally or on a cluster depends on the type of execution
+environment.
+
+The lazy evaluation lets you construct sophisticated programs that Flink
+executes as one holistically planned unit.
+
+{% top %}
 
 Example Program
 ---------------
diff --git a/docs/dev/java_lambdas.zh.md b/docs/dev/java_lambdas.zh.md
index 410a66a..8fbc6bc 100644
--- a/docs/dev/java_lambdas.zh.md
+++ b/docs/dev/java_lambdas.zh.md
@@ -26,7 +26,9 @@ Java 8 引入了几种新的语言特性,旨在实现更快、更清晰的编
 
 <span class="label label-danger">注意</span> Flink 支持对 Java API 的所有算子使用 Lambda 表达式,但是,当 Lambda 表达式使用 Java 泛型时,你需要 *显式* 声明类型信息。
 
-本文档介绍了如何使用 Lambda 表达式并描述了其在当前应用中的限制。有关 Flink API 的通用介绍, 请参阅[编程指南]({{ site.baseurl }}/zh/dev/api_concepts.html)。
+This document shows how to use lambda expressions and describes current
+limitations. For a general introduction to the Flink API, please refer to the
+[DataStream API overview]({{ site.baseurl }}{% link dev/datastream_api.zh.md %}).
 
 ### 示例和限制
 
diff --git a/docs/dev/libs/cep.zh.md b/docs/dev/libs/cep.zh.md
index 3805282..4a2a06c 100644
--- a/docs/dev/libs/cep.zh.md
+++ b/docs/dev/libs/cep.zh.md
@@ -35,7 +35,7 @@ FlinkCEP是在Flink上层实现的复杂事件处理库。
 
 ## 开始
 
-如果你想现在开始尝试,[创建一个Flink程序]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html),
+如果你想现在开始尝试,[创建一个Flink程序]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html),
 添加FlinkCEP的依赖到项目的`pom.xml`文件中。
 
 <div class="codetabs" markdown="1">
@@ -60,7 +60,7 @@ FlinkCEP是在Flink上层实现的复杂事件处理库。
 </div>
 </div>
 
-{% info 提示 %} FlinkCEP不是二进制发布包的一部分。在集群上执行如何链接它可以看[这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
+{% info 提示 %} FlinkCEP不是二进制发布包的一部分。在集群上执行如何链接它可以看[这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html)。
 
 现在可以开始使用Pattern API写你的第一个CEP程序了。
 
diff --git a/docs/dev/parallel.zh.md b/docs/dev/parallel.zh.md
index c1b6038..041136d 100644
--- a/docs/dev/parallel.zh.md
+++ b/docs/dev/parallel.zh.md
@@ -73,7 +73,7 @@ env.execute("Word Count Example")
 
 ### 执行环境层次
 
-如[此节]({{ site.baseurl }}/zh/dev/api_concepts.html#anatomy-of-a-flink-program)所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
+如[此节]({{ site.baseurl }}{% link dev/datastream_api.zh.md %}#anatomy-of-a-flink-program)所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
 
 可以通过调用 `setParallelism()` 方法指定执行环境的默认并行度。如果想以并行度`3`来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:
 
diff --git a/docs/dev/stream/operators/index.zh.md b/docs/dev/stream/operators/index.zh.md
index 2f54335..d226b01 100644
--- a/docs/dev/stream/operators/index.zh.md
+++ b/docs/dev/stream/operators/index.zh.md
@@ -100,7 +100,7 @@ dataStream.filter(new FilterFunction<Integer>() {
         <tr>
           <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
           <td>
-            <p>Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, <em>keyBy()</em> is implemented with hash partitioning. There are different ways to <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">specify keys</a>.</p>
+            <p>Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, <em>keyBy()</em> is implemented with hash partitioning. There are different ways to <a href="{% link dev/stream/state/state.md %}#keyed-datastream">specify keys</a>.</p>
             <p>
             This transformation returns a <em>KeyedStream</em>, which is, among other things, required to use <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keyed state</a>. </p>
 {% highlight java %}
@@ -500,7 +500,7 @@ dataStream.filter { _ != 0 }
           <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
           <td>
             <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how to specify keys.
+            Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keys</a> on how to specify keys.
             This transformation returns a KeyedStream.</p>
 {% highlight scala %}
 dataStream.keyBy("someKey") // Key by field "someKey"
diff --git a/docs/dev/stream/operators/windows.zh.md b/docs/dev/stream/operators/windows.zh.md
index 72269a4..517f7b3 100644
--- a/docs/dev/stream/operators/windows.zh.md
+++ b/docs/dev/stream/operators/windows.zh.md
@@ -94,7 +94,7 @@ Using the `keyBy(...)` will split your infinite stream into logical keyed stream
 stream is not keyed.
 
 In the case of keyed streams, any attribute of your incoming events can be used as a key
-(more details [here]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)). Having a keyed stream will
+(more details [here]({% link dev/stream/state/state.zh.md %}#keyed-datastream)). Having a keyed stream will
 allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed
 independently from the rest. All elements referring to the same key will be sent to the same parallel task.
 
diff --git a/docs/dev/stream/state/checkpointing.zh.md b/docs/dev/stream/state/checkpointing.zh.md
index c940ada..6f22d62 100644
--- a/docs/dev/stream/state/checkpointing.zh.md
+++ b/docs/dev/stream/state/checkpointing.zh.md
@@ -184,7 +184,7 @@ Flink 现在为没有迭代(iterations)的作业提供一致性的处理保
 
 ## 重启策略
 
-Flink 支持不同的重启策略,来控制 job 万一故障时该如何重启。更多信息请阅读 [重启策略]({{ site.baseurl }}/zh/dev/restart_strategies.html)。
+Flink 支持不同的重启策略,来控制 job 万一故障时该如何重启。更多信息请阅读 [重启策略]({{ site.baseurl }}/zh/dev/task_failure_recovery.html)。
 
 {% top %}
 
diff --git a/docs/dev/stream/state/index.zh.md b/docs/dev/stream/state/index.zh.md
index 1b54448..ab8d9ea 100644
--- a/docs/dev/stream/state/index.zh.md
+++ b/docs/dev/stream/state/index.zh.md
@@ -25,23 +25,10 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for
-any type of more elaborate operation.
-
-For example:
-
-  - When an application searches for certain event patterns, the state will store the sequence of events encountered so far.
-  - When aggregating events per minute/hour/day, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.
-  - When historic data needs to be managed, the state allows efficient access to events that occurred in the past.
-
-Flink needs to be aware of the state in order to make state fault tolerant using [checkpoints](checkpointing.html) and to allow [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) of streaming applications.
-
-Knowledge about the state also allows for rescaling Flink applications, meaning that Flink takes care of redistributing state across parallel instances.
-
-The [queryable state](queryable_state.html) feature of Flink allows you to access state from outside of Flink during runtime.
-
-When working with state, it might also be useful to read about [Flink's state backends]({{ site.baseurl }}/ops/state/state_backends.html). Flink provides different state backends that specify how and where state is stored. State can be located on Java's heap or off-heap. Depending on your state backend, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large sta [...]
+In this section you will learn about the APIs that Flink provides for writing
+stateful programs. Please take a look at [Stateful Stream
+Processing]({% link concepts/stateful-stream-processing.zh.md %})
+to learn about the concepts behind stateful stream processing.
 
 {% top %}
 
diff --git a/docs/dev/stream/state/queryable_state.zh.md b/docs/dev/stream/state/queryable_state.zh.md
index 1d62efd..6066de0 100644
--- a/docs/dev/stream/state/queryable_state.zh.md
+++ b/docs/dev/stream/state/queryable_state.zh.md
@@ -149,7 +149,7 @@ descriptor.setQueryable("query-name"); // queryable state name
 {% endhighlight %}
 </div>
 
-关于依赖的更多信息, 可以参考如何 [配置 Flink 项目]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html).
+关于依赖的更多信息, 可以参考如何 [配置 Flink 项目]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html).
 
 `QueryableStateClient` 将提交你的请求到内部代理,代理会处理请求并返回结果。客户端的初始化只需要提供一个有效的 `TaskManager` 主机名
 (每个 task manager 上都运行着一个 queryable state 代理),以及代理监听的端口号。关于如何配置代理以及端口号可以参考 [Configuration Section](#configuration).
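+
+A minimal sketch of creating a client and issuing a query (the host name, port,
+job id and key are placeholders; the state proxy listens on port 9069 by default):
+
+{% highlight java %}
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
+
+CompletableFuture<ValueState<Long>> resultFuture = client.getKvState(
+    jobId,                                    // JobID of the running job
+    "query-name",                             // the name passed to setQueryable(...)
+    key,                                      // the key to look up
+    BasicTypeInfo.LONG_TYPE_INFO,             // type information of the key
+    new ValueStateDescriptor<>("state", Long.class));
+{% endhighlight %}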
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index dad89a3..bda020a 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -22,51 +22,75 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-本文档主要介绍如何在 Flink 作业中使用状态
+In this section you will learn about the APIs that Flink provides for writing
+stateful programs. Please take a look at [Stateful Stream
+Processing]({% link concepts/stateful-stream-processing.md %})
+to learn about the concepts behind stateful stream processing.
+
 * 目录
 {:toc}
 
-## Keyed State 与 Operator State
-
-Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`。
-
-### Keyed State
-
-*Keyed State* 通常和 key 相关,仅可使用在 `KeyedStream` 的方法和算子中。
+## Keyed DataStream
 
-你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个 key 仅出现在一个分区内。
-逻辑上每个 keyed-state 和唯一元组 <算子并发实例, key> 绑定,由于每个 key 仅"属于"
-算子的一个并发,因此简化为 <算子, key>。
+If you want to use keyed state, you first need to specify a key on a
+`DataStream` that should be used to partition the state (and also the records
+in the stream themselves). You can specify a key using `keyBy(KeySelector)` on
+a `DataStream`. This will yield a `KeyedStream`, which then allows
+operations that use keyed state.
 
-Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
-Key Group 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
+A key selector function takes a single record as input and returns the key for
+that record. The key can be of any type and **must** be derived from
+deterministic computations.
 
-### Operator State
+The data model of Flink is not based on key-value pairs. Therefore, you do not
+need to physically pack the data set types into keys and values. Keys are
+"virtual": they are defined as functions over the actual data to guide the
+grouping operator.
 
-对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
-[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。
-每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
+The following example shows a key selector function that simply returns the
+field of an object:
 
-Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
-
-## Raw State 与 Managed State
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
 
-*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
+  public String getWord() { return word; }
+}
+DataStream<WC> words = // [...]
+KeyedStream<WC, String> keyed = words
+  .keyBy(WC::getWord);
+{% endhighlight %}
 
-*Managed State* 由 Flink 运行时控制的数据结构表示,比如内部的 hash table 或者 RocksDB。
-比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// some ordinary case class
+case class WC(word: String, count: Int)
+val words: DataStream[WC] = // [...]
+val keyed = words.keyBy( _.word )
+{% endhighlight %}
+</div>
+</div>
 
-*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。
+### Tuple Keys and Expression Keys
+{:.no_toc}
 
-所有 datastream 的 function 都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。
-由于 Flink 可以在修改并发时更好的分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
+Flink also has two alternative ways of defining keys: tuple keys and expression
+keys. With these you can specify keys using tuple field indices or expressions
+for selecting fields of objects. We don't recommend using these today, but you
+can refer to the Javadoc of `DataStream` to learn about them. Using a `KeySelector`
+function is strictly superior: with Java lambdas it is easy to use and it
+potentially has less overhead at runtime (see the sketch below).
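+
+The following sketch (assuming a `Tuple2<String, Integer>` stream) contrasts the
+legacy position and expression keys with the recommended `KeySelector`:
+
+{% highlight java %}
+DataStream<Tuple2<String, Integer>> tuples = // [...]
+
+// legacy alternatives (not recommended)
+tuples.keyBy(0);     // key by the first tuple field, by position
+tuples.keyBy("f0");  // the same key, addressed by field expression
+
+// recommended: an explicit KeySelector
+tuples.keyBy(value -> value.f0);
+{% endhighlight %}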
 
-<span class="label label-danger">注意</span> 如果你的 managed state 需要定制化的序列化逻辑,
-为了后续的兼容性请参考 [相应指南](custom_serialization.html),Flink 的默认序列化器不需要用户做特殊的处理。
+{% top %}
 
-## 使用 Managed Keyed State
+## 使用 Keyed State
 
-managed keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 `KeyedStream`
+keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 `KeyedStream`
 上使用,可以通过 `stream.keyBy(...)` 得到 `KeyedStream`.
 
 接下来,我们会介绍不同类型的状态,然后介绍如何使用他们。所有支持的状态类型如下所示:
@@ -101,7 +125,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态
 状态所持有值的类型,并且可能包含用户指定的函数,例如`ReduceFunction`。 根据不同的状态类型,可以创建`ValueStateDescriptor`,`ListStateDescriptor`,
 `ReducingStateDescriptor`,`FoldingStateDescriptor` 或 `MapStateDescriptor`。
 
-状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({{site.baseurl}}/zh/dev/api_concepts.html#rich-functions)获取相关信息,
+状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({% link dev/user_defined_functions.zh.md %}#rich-functions)获取相关信息,
 但是我们很快也会看到一个例子。`RichFunction` 中 `RuntimeContext` 提供如下方法:
 
 * `ValueState<T> getState(ValueStateDescriptor<T>)`
@@ -219,7 +243,7 @@ object ExampleCountWindowAverage extends App {
     .print()
   // the printed output will be (1,4) and (1,5)
 
-  env.execute("ExampleManagedState")
+  env.execute("ExampleKeyedState")
 }
 {% endhighlight %}
 </div>
@@ -470,9 +494,44 @@ val counts: DataStream[(String, Int)] = stream
     })
 {% endhighlight %}
 
-## 使用 Managed Operator State
+## Operator State
+
+*Operator State* (or *non-keyed state*) is state that is bound to one
+parallel operator instance. The [Kafka Connector]({% link
+dev/connectors/kafka.md %}) is a good motivating example for the use of
+Operator State in Flink. Each parallel instance of the Kafka consumer maintains
+a map of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among parallel
+operator instances when the parallelism is changed. There are different schemes
+for doing this redistribution.
+
+In a typical stateful Flink application you don't need operator state. It is
+mostly a special type of state that is used in source/sink implementations and
+in scenarios where you don't have a key by which state can be partitioned.
+
+## Broadcast State
+
+*Broadcast State* is a special type of *Operator State*. It was introduced to
+support use cases where records of one stream need to be broadcast to all
+downstream tasks, where they are used to maintain the same state among all
+subtasks. This state can then be accessed while processing records of a second
+stream. As an example where broadcast state can emerge as a natural fit, one
+can imagine a low-throughput stream containing a set of rules that we want to
+evaluate against all elements coming from another stream (a minimal sketch of
+this pattern follows the list below). Having this type of use case in mind,
+broadcast state differs from the rest of operator state in that:
+
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a
+    *broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different names.
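+
+The following sketch shows the rules example above; the `Rule` and `Item` types
+and their accessors are hypothetical, and the dedicated broadcast state
+documentation covers the pattern in full:
+
+{% highlight java %}
+// descriptor for the broadcast state: a map from rule name to rule
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
+    "RulesBroadcastState",
+    BasicTypeInfo.STRING_TYPE_INFO,
+    TypeInformation.of(new TypeHint<Rule>() {}));
+
+// broadcast the low-throughput rule stream to all downstream tasks
+BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
+
+DataStream<String> matches = itemStream
+    .keyBy(item -> item.getColor())
+    .connect(ruleBroadcastStream)
+    .process(new KeyedBroadcastProcessFunction<String, Item, Rule, String>() {
+
+        @Override
+        public void processElement(Item item, ReadOnlyContext ctx, Collector<String> out) throws Exception {
+            // read-only access to the broadcast state on the non-broadcast side
+            for (Map.Entry<String, Rule> entry :
+                    ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
+                // evaluate item against entry.getValue() and emit matches ...
+            }
+        }
+
+        @Override
+        public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
+            // update the broadcast state when a new rule arrives
+            ctx.getBroadcastState(ruleStateDescriptor).put(rule.getName(), rule);
+        }
+    });
+{% endhighlight %}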
+
+{% top %}
+
+## 使用 Operator State
 
-用户可以通过实现 `CheckpointedFunction` 或 `ListCheckpointed<T extends Serializable>` 接口来使用 managed operator state。
+用户可以通过实现 `CheckpointedFunction` 接口来使用 operator state。
 
 #### CheckpointedFunction
 
@@ -487,13 +546,14 @@ void initializeState(FunctionInitializationContext context) throws Exception;
 进行 checkpoint 时会调用 `snapshotState()`。 用户自定义函数初始化时会调用 `initializeState()`,初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。
 因此 `initializeState()` 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。
 
-当前,managed operator state 以 list 的形式存在。这些状态是一个 *可序列化* 对象的集合 `List`,彼此独立,方便在改变并发后进行状态的重新分派。
+当前 operator state 以 list 的形式存在。这些状态是一个 *可序列化* 对象的集合 `List`,彼此独立,方便在改变并发后进行状态的重新分派。
 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
 
   - **Even-split redistribution:** 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。
     比如说,算子 A 的并发度为 1,包含两个元素 `element1` 和 `element2`,当并发度增加为 2 时,`element1` 会被分到并发 0 上,`element2` 则会被分到并发 1 上。
 
   - **Union redistribution:** 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。
+    Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC frame size or out-of-memory errors.
 
 下面的例子中的 `SinkFunction` 在 `CheckpointedFunction` 中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 even-split redistribution。 
 
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index c10f2d3..91aab89 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -419,7 +419,7 @@ tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
 用户可以指定一个 catalog 和数据库作为 "当前catalog" 和"当前数据库"。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定,
 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。
 
-标识符遵循 SQL 标准,因此使用时需要用反引号(`` ` ``)进行转义。此外,所有 SQL 保留关键字都必须转义。
+标识符遵循 SQL 标准,因此使用时需要用反引号(`` ` ``)进行转义。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1273,7 +1273,7 @@ val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as
 
 #### POJO 类型 (Java 和 Scala)
 
-Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录在[这里]({{ site.baseurl }}/zh/dev/api_concepts.html#pojos).
+Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录在[这里]({{ site.baseurl }}{% link dev/types_serialization.md %}#pojos).
 
 在不指定字段名称的情况下将 POJO 类型的 `DataStream` 或 `DataSet` 转换成 `Table` 时,将使用原始 POJO 类型字段的名称。名称映射需要原始名称,并且不能按位置进行。字段可以使用别名(带有 `as` 关键字)来重命名,重新排序和投影。
 
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index ba6a5f7..71b490c 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -59,6 +59,8 @@ The following tables list all available connectors and formats. Their mutual com
 | CSV (for Kafka)            | `flink-csv`                  | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/{{site.version}}/flink-csv-{{site.version}}-sql-jar.jar) |
 | JSON                       | `flink-json`                 | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
 | Apache Avro                | `flink-avro`                 | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
+| Apache ORC                 | `flink-orc`                  | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-orc{{site.scala_version_suffix}}/{{site.version}}/flink-orc{{site.scala_version_suffix}}-{{site.version}}-jar-with-dependencies.jar) |
+| Apache Parquet             | `flink-parquet`              | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-parquet{{site.scala_version_suffix}}/{{site.version}}/flink-parquet{{site.scala_version_suffix}}-{{site.version}}-jar-with-dependencies.jar) |
 
 {% else %}
 
@@ -929,15 +931,15 @@ CREATE TABLE MyUserTable (
   'connector.hosts' = 'http://host_name:9092;http://host_name:9093',  -- required: one or more Elasticsearch hosts to connect to
 
   'connector.index' = 'myusers',       -- required: Elasticsearch index. Flink supports both static index and dynamic index.
-                                       -- If you want to have a static index, this option value should be a plain string, 
+                                       -- If you want to have a static index, this option value should be a plain string,
                                        -- e.g. 'myusers', all the records will be consistently written into "myusers" index.
                                        -- If you want to have a dynamic index, you can use '{field_name}' to reference a field
-                                       -- value in the record to dynamically generate a target index. You can also use 
+                                       -- value in the record to dynamically generate a target index. You can also use
                                        -- '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type
-                                       -- into the format specified by date_format_string. The date_format_string is 
+                                       -- into the format specified by date_format_string. The date_format_string is
                                        -- compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html).
-                                       -- For example, if the option value is 'myusers-{log_ts|yyyy-MM-dd}', then a 
-                                       -- record with log_ts field value 2020-03-27 12:25:55 will be written into 
+                                       -- For example, if the option value is 'myusers-{log_ts|yyyy-MM-dd}', then a
+                                       -- record with log_ts field value 2020-03-27 12:25:55 will be written into
                                        -- "myusers-2020-03-27" index.
 
   'connector.document-type' = 'user',  -- required: Elasticsearch document type
@@ -969,11 +971,11 @@ CREATE TABLE MyUserTable (
                                               -- per bulk request
                                               -- (only MB granularity is supported)
   'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval (in milliseconds)
-  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
+  'connector.bulk-flush.backoff.type' = '...',       -- optional: backoff strategy ("disabled" by default)
                                                       -- valid strategies are "disabled", "constant",
                                                       -- or "exponential"
-  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
-  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
+  'connector.bulk-flush.backoff.max-retries' = '3',  -- optional: maximum number of retries
+  'connector.bulk-flush.backoff.delay' = '30000',    -- optional: delay between each backoff attempt
                                                       -- (in milliseconds)
 
   -- optional: connection properties to be used during REST communication to Elasticsearch
@@ -1199,6 +1201,27 @@ CREATE TABLE MyUserTable (
 )
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    HBase()
+    .version('1.4.3')                      # required: currently only support '1.4.3'
+    .table_name('hbase_table_name')        # required: HBase table name
+    .zookeeper_quorum('localhost:2181')    # required: HBase Zookeeper quorum configuration
+    .zookeeper_node_parent('/test')        # optional: the root dir in Zookeeper for the HBase cluster.
+                                           # The default value is '/hbase'
+    .write_buffer_flush_max_size('10mb')   # optional: writing option, determines the maximum size (in memory) of
+                                           # buffered rows per round trip. This can improve performance when writing
+                                           # to HBase. The default value is '2mb'
+    .write_buffer_flush_max_rows(1000)     # optional: writing option, determines how many rows to insert per round trip.
+                                           # This can improve performance when writing to HBase. No default value,
+                                           # i.e. by default flushing does not depend on the number of buffered rows.
+    .write_buffer_flush_interval('2s')     # optional: writing option, sets an interval for flushing buffered rows
+                                           # if the interval passes. Default value is '0s', which means
+                                           # no asynchronous flush thread will be scheduled.
+)
+{% endhighlight %}
+</div>
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -1594,9 +1617,8 @@ CREATE TABLE MyUserTable (
   'format.type' = 'json',                   -- required: specify the format type
   'format.fail-on-missing-field' = 'true',  -- optional: flag whether to fail if a field is missing or not,
                                             -- 'false' by default
-  'format.ignore-parse-errors' = 'true'     -- optional: skip fields and rows with parse errors instead of failing;
+  'format.ignore-parse-errors' = 'true',    -- optional: skip fields and rows with parse errors instead of failing;
                                             -- fields are set to null in case of errors
-
   -- deprecated: define the schema explicitly using JSON schema which parses to DECIMAL and TIMESTAMP.
   'format.json-schema' =
     '{
diff --git a/docs/dev/table/index.zh.md b/docs/dev/table/index.zh.md
index 8613b11..561890d 100644
--- a/docs/dev/table/index.zh.md
+++ b/docs/dev/table/index.zh.md
@@ -27,7 +27,7 @@ under the License.
 
 Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 [Apache Calcite](https://calcite.apache.org) 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。
 
-Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 [CEP]({{ site.baseurl }}/zh/dev/libs/cep.html) 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 [Gelly 图算法]({{ site.baseurl }}/zh/dev/libs/gelly) 来处理已经预处理好的数据。
+Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 [CEP]({{ site.baseurl }}/zh/dev/libs/cep.html) 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 [Gelly 图算法]({{ site.baseurl }}/zh/dev/libs/gelly/index.html) 来处理已经预处理好的数据。
 
 **注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 \[Table API,SQL\] 和 \[流,批\] 的组合都是支持的。**
 
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index c234390..d2cacbe 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -347,7 +347,7 @@ CLI commands > session environment file > defaults environment file
 
 #### 重启策略(Restart Strategies)
 
-重启策略控制 Flink 作业失败时的重启方式。与 Flink 集群的[全局重启策略]({{ site.baseurl }}/zh/dev/restart_strategies.html)相似,更细精度的重启配置可以在环境配置文件中声明。
+重启策略控制 Flink 作业失败时的重启方式。与 Flink 集群的[全局重启策略]({{ site.baseurl }}/zh/dev/task_failure_recovery.html)相似,更细精度的重启配置可以在环境配置文件中声明。
 
 Flink 支持以下策略:
 
@@ -600,7 +600,7 @@ Job ID: 6f922fe5cba87406ff23ae4a7bb79044
 Web interface: http://localhost:8081
 {% endhighlight %}
 
-<span class="label label-danger">注意</span> 提交后,SQL 客户端不追踪正在运行的 Flink 作业状态。提交后可以关闭 CLI 进程,并且不会影响分离的查询。Flink 的[重启策略]({{ site.baseurl }}/zh/dev/restart_strategies.html)负责容错。取消查询可以用 Flink 的 web 接口、命令行或 REST API 。
+<span class="label label-danger">注意</span> 提交后,SQL 客户端不追踪正在运行的 Flink 作业状态。提交后可以关闭 CLI 进程,并且不会影响分离的查询。Flink 的[重启策略]({{ site.baseurl }}/zh/dev/task_failure_recovery.html)负责容错。取消查询可以用 Flink 的 web 接口、命令行或 REST API 。
 
 {% top %}
 
diff --git a/docs/dev/types_serialization.zh.md b/docs/dev/types_serialization.zh.md
index 72c70cc..5be877f 100644
--- a/docs/dev/types_serialization.zh.md
+++ b/docs/dev/types_serialization.zh.md
@@ -30,6 +30,206 @@ Apache Flink 以其独特的方式来处理数据类型以及序列化,这种
 * This will be replaced by the TOC
 {:toc}
 
+## Supported Data Types
+
+Flink places some restrictions on the type of elements that can be in a DataSet or DataStream.
+The reason for this is that the system analyzes the types to determine
+efficient execution strategies.
+
+There are seven different categories of data types:
+
+1. **Java Tuples** and **Scala Case Classes**
+2. **Java POJOs**
+3. **Primitive Types**
+4. **Regular Classes**
+5. **Values**
+6. **Hadoop Writables**
+7. **Special Types**
+
+#### Tuples and Case Classes
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Tuples are composite types that contain a fixed number of fields with various types.
+The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
+can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
+tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method
+`tuple.getField(int position)`. The field indices start at 0. Note that this stands in contrast
+to the Scala tuples, but it is more consistent with Java's general indexing.
+
+{% highlight java %}
+DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
+    new Tuple2<String, Integer>("hello", 1),
+    new Tuple2<String, Integer>("world", 2));
+
+wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
+    @Override
+    public Integer map(Tuple2<String, Integer> value) throws Exception {
+        return value.f1;
+    }
+});
+
+wordCounts.keyBy(0); // also valid .keyBy("f0")
+
+
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+Scala case classes (and Scala tuples which are a special case of case classes), are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.
+
+{% highlight scala %}
+case class WordCount(word: String, count: Int)
+val input = env.fromElements(
+    WordCount("hello", 1),
+    WordCount("world", 2)) // Case Class Data Set
+
+input.keyBy("word") // key by field expression "word"
+
+val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
+
+input2.keyBy(0, 1) // key by field positions 0 and 1
+{% endhighlight %}
+
+</div>
+</div>
+
+#### POJOs
+
+Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:
+
+- The class must be public.
+
+- It must have a public constructor without arguments (default constructor).
+
+- All fields are either public or must be accessible through getter and setter functions. For a field called `foo` the getter and setter methods must be named `getFoo()` and `setFoo()`.
+
+- The type of a field must be supported by a registered serializer.
+
+POJOs are generally represented with a `PojoTypeInfo` and serialized with the `PojoSerializer` (using [Kryo](https://github.com/EsotericSoftware/kryo) as configurable fallback).
+The exception is when the POJOs are actually Avro types (Avro Specific Records) or produced as "Avro Reflect Types".
+In that case the POJOs are represented by an `AvroTypeInfo` and serialized with the `AvroSerializer`.
+You can also register your own custom serializer if required; see [Serialization](https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#serialization-of-pojo-types) for further information.
+
+Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
+
+The following example shows a simple POJO with two public fields.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class WordWithCount {
+
+    public String word;
+    public int count;
+
+    public WordWithCount() {}
+
+    public WordWithCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
+
+DataStream<WordWithCount> wordCounts = env.fromElements(
+    new WordWithCount("hello", 1),
+    new WordWithCount("world", 2));
+
+wordCounts.keyBy("word"); // key by field expression "word"
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class WordWithCount(var word: String, var count: Int) {
+    def this() {
+      this(null, -1)
+    }
+}
+
+val input = env.fromElements(
+    new WordWithCount("hello", 1),
+    new WordWithCount("world", 2)) // Case Class Data Set
+
+input.keyBy("word")// key by field expression "word"
+
+{% endhighlight %}
+</div>
+</div>
+
+#### Primitive Types
+
+Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.
+
+#### General Class Types
+
+Flink supports most Java and Scala classes (API and custom).
+Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
+resources. Classes that follow the Java Beans conventions work well in general.
+
+All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types.
+Flink treats these data types as black boxes and is not able to access their content (e.g., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
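+
+As a minimal sketch (`MyCustomType` is a placeholder), such fallback types can be
+registered with Kryo up front, which lets Kryo write a compact id instead of the
+full class name:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().registerKryoType(MyCustomType.class);
+{% endhighlight %}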
+
+#### Values
+
+*Value* types describe their serialization and deserialization manually. Instead of going through a
+general purpose serialization framework, they provide custom code for those operations by means of
+implementing the `org.apache.flink.types.Value` interface with the methods `read` and `write`. Using
+a Value type is reasonable when general purpose serialization would be highly inefficient. An
+example would be a data type that implements a sparse vector of elements as an array. Knowing that
+the array is mostly zero, one can use a special encoding for the non-zero elements, while the
+general purpose serialization would simply write all array elements.
+
+The `org.apache.flink.types.CopyableValue` interface supports manual internal cloning logic in a
+similar way.
+
+Flink comes with pre-defined Value types that correspond to basic data types (`ByteValue`,
+`ShortValue`, `IntValue`, `LongValue`, `FloatValue`, `DoubleValue`, `StringValue`, `CharValue`,
+`BooleanValue`). These Value types act as mutable variants of the basic data types: their value can
+be altered, allowing programmers to reuse objects and take pressure off the garbage collector.
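+
+A minimal sketch of a custom Value type (the field is chosen for illustration):
+
+{% highlight java %}
+import java.io.IOException;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+
+public class Temperature implements Value {
+
+    private double celsius;
+
+    public Temperature() {}
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        out.writeDouble(celsius);
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+        celsius = in.readDouble();
+    }
+}
+{% endhighlight %}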
+
+
+#### Hadoop Writables
+
+You can use types that implement the `org.apache.hadoop.io.Writable` interface. The serialization logic
+defined in the `write()` and `readFields()` methods will be used for serialization.
+
+#### Special Types
+
+You can use special types, including Scala's `Either`, `Option`, and `Try`.
+The Java API has its own custom implementation of `Either`.
+Similarly to Scala's `Either`, it represents a value of two possible types, *Left* or *Right*.
+`Either` can be useful for error handling or operators that need to output two different types of records.
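+
+A small sketch of the Java `Either` (the values are for illustration only):
+
+{% highlight java %}
+Either<String, Integer> failure = Either.Left("parse error");
+Either<String, Integer> success = Either.Right(42);
+
+if (success.isRight()) {
+    int value = success.right();  // 42
+}
+{% endhighlight %}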
+
+#### Type Erasure & Type Inference
+
+*Note: This section is only relevant for Java.*
+
+The Java compiler throws away much of the generic type information after compilation. This is
+known as *type erasure* in Java. It means that at runtime, an instance of an object does not know
+its generic type any more. For example, instances of `DataStream<String>` and `DataStream<Long>` look the
+same to the JVM.
+
+Flink requires type information at the time when it prepares the program for execution (when the
+main method of the program is called). The Flink Java API tries to reconstruct the type information
+that was thrown away in various ways and store it explicitly in the data sets and operators. You can
+retrieve the type via `DataStream.getType()`. The method returns an instance of `TypeInformation`,
+which is Flink's internal way of representing types.
+
+The type inference has its limits and needs the "cooperation" of the programmer in some cases.
+Examples for that are methods that create data sets from collections, such as
+`ExecutionEnvironment.fromCollection()`, where you can pass an argument that describes the type.
+Generic functions like `MapFunction<I, O>` may also need extra type information.
+
+The
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java "ResultTypeQueryable" %}
+interface can be implemented by input formats and functions to tell the API
+explicitly about their return type. The *input types* that the functions are invoked with can
+usually be inferred by the result types of the previous operations.
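+
+As a sketch, a generic lambda whose result type is erased can be given an explicit
+type hint via `returns()` (`env` is assumed to be a `StreamExecutionEnvironment`):
+
+{% highlight java %}
+DataStream<String> words = env.fromElements("hello", "world");
+
+DataStream<Tuple2<String, Integer>> counts = words
+    .map(word -> Tuple2.of(word, 1))
+    // the lambda's return type is erased, so state it explicitly
+    .returns(Types.TUPLE(Types.STRING, Types.INT));
+{% endhighlight %}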
+
+{% top %}
 
 ## Flink 中的类型处理
 
diff --git a/docs/dev/user_defined_functions.zh.md b/docs/dev/user_defined_functions.zh.md
index bdfbe54..a1b52a0 100644
--- a/docs/dev/user_defined_functions.zh.md
+++ b/docs/dev/user_defined_functions.zh.md
@@ -1,5 +1,5 @@
 ---
-title: 'User-Defined Functions'
+title: '用户自定义函数'
 nav-id: user_defined_function
 nav-parent_id: streaming
 nav-pos: 4
diff --git a/docs/getting-started/index.zh.md b/docs/getting-started/index.zh.md
index cb81bbc..e27119c 100644
--- a/docs/getting-started/index.zh.md
+++ b/docs/getting-started/index.zh.md
@@ -27,29 +27,54 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-上手使用 Apache Flink 有很多方式,哪一个最适合你取决于你的目标和以前的经验。
+There are many ways to get started with Apache Flink. Which one is the best for
+you depends on your goals and prior experience:
 
-### 初识 Flink
+* take a look at the **Docker Playgrounds** if you want to see what Flink can do, via a hands-on,
+  Docker-based introduction to specific Flink concepts
+* explore one of the **Code Walkthroughs** if you want a quick, end-to-end
+  introduction to one of Flink's APIs
+* work your way through the **Hands-on Training** for a comprehensive,
+  step-by-step introduction to Flink
+* use **Project Setup** if you already know the basics of Flink and want a
+  project template for Java or Scala, or need help setting up the dependencies
 
-通过 **Docker Playgrounds** 提供沙箱的Flink环境,你只需花几分钟做些简单设置,就可以开始探索和使用 Flink。
+### Taking a first look at Flink
 
-* [**Operations Playground**](./docker-playgrounds/flink-operations-playground.html) 向你展示如何使用 Flink 编写数据流应用程序。你可以体验 Flink 如何从故障中恢复应用程序,升级、提高并行度、降低并行度和监控运行的状态指标等特性。
+The **Docker Playgrounds** provide sandboxed Flink environments that are set up in just a few minutes and which allow you to explore and play with Flink.
+
+* The [**Operations Playground**]({% link getting-started/docker-playgrounds/flink-operations-playground.md %}) shows you how to operate streaming applications with Flink. You can experience how Flink recovers applications from failures, how to upgrade and scale streaming applications up and down, and how to query application metrics.
 
 <!--
 * The [**Streaming SQL Playground**]() provides a Flink cluster with a SQL CLI client, tables which are fed by streaming data sources, and instructions for how to run continuous streaming SQL queries on these tables. This is the perfect environment for your first steps with streaming SQL.
 -->
 
-### Flink API 入门
+### First steps with one of Flink's APIs
 
-**代码练习**是入门的最佳方式,通过代码练习可以逐步深入理解 Flink API。
-下边的例子演示了如何使用 Flink 的代码框架开始构建一个基础的 Flink 项目,和如何逐步将其扩展为一个简单的应用程序。
+The **Code Walkthroughs** are a great way to get started quickly with a step-by-step introduction to
+one of Flink's APIs. Each walkthrough provides instructions for bootstrapping a small skeleton
+project, and then shows how to extend it to a simple application.
 
-<!--
-* The [**DataStream API**]() code walkthrough shows how to implement a simple DataStream application and how to extend it to be stateful and use timers.
--->
-* [**DataStream API 示例**](./walkthroughs/datastream_api.html) 展示了如何编写一个基本的 DataStream 应用程序。 DataStream API 是 Flink 的主要抽象,用于通过 Java 或 Scala 实现具有复杂时间语义的有状态数据流处理的应用程序。
+* The [**DataStream API**  code walkthrough]({% link getting-started/walkthroughs/datastream_api.md %}) shows how
+  to implement a simple DataStream application and how to extend it to be stateful and use timers.
+  The DataStream API is Flink's main abstraction for implementing stateful streaming applications
+  with sophisticated time semantics in Java or Scala.
+
+* Flink's **Table API** is a relational API used for writing SQL-like queries in Java, Scala, or
+  Python, which are then automatically optimized, and can be executed on batch or streaming data
+  with identical syntax and semantics. The [Table API code walkthrough for Java and Scala]({% link
+  getting-started/walkthroughs/table_api.md %}) shows how to implement a simple Table API query on a
+  batch source and how to evolve it into a continuous query on a streaming source. There's also a
+  similar [code walkthrough for the Python Table API]({% link
+  getting-started/walkthroughs/python_table_api.md %}).
+
+### Taking a Deep Dive with the Hands-on Training
 
-* [**Table API 示例**](./walkthroughs/table_api.html) 演示了如何在批处中使用简单的 Table API 进行查询,以及如何将其扩展为流处理中的查询。Table API 是 Flink 的语言嵌入式关系 API,用于在 Java 或 Scala 中编写类 SQL 的查询,这些查询会自动进行优化。Table API 查询可以使用一致的语法和语义同时在批处理或流数据上运行。
+The [**Hands-on Training**]({% link training/index.md %}) is a self-paced training course with
+a set of lessons and hands-on exercises. This step-by-step introduction to Flink focuses
+on learning how to use the DataStream API to meet the needs of common, real-world use cases,
+and provides a complete introduction to the fundamental concepts: parallel dataflows,
+stateful stream processing, event time and watermarking, and fault tolerance via state snapshots.
 
 <!--
 ### Starting a new Flink application
diff --git a/docs/getting-started/walkthroughs/python_table_api.zh.md b/docs/getting-started/walkthroughs/python_table_api.zh.md
index 34a0170..48ab506 100644
--- a/docs/getting-started/walkthroughs/python_table_api.zh.md
+++ b/docs/getting-started/walkthroughs/python_table_api.zh.md
@@ -72,6 +72,33 @@ t_env.connect(FileSystem().path('/tmp/output')) \
     .create_temporary_table('mySink')
 {% endhighlight %}
 
+You can also use the `TableEnvironment.sql_update()` method to register a source/sink table defined in DDL:
+{% highlight python %}
+my_source_ddl = """
+    create table mySource (
+        word VARCHAR
+    ) with (
+        'connector.type' = 'filesystem',
+        'format.type' = 'csv',
+        'connector.path' = '/tmp/input'
+    )
+"""
+
+my_sink_ddl = """
+    create table mySink (
+        word VARCHAR,
+        `count` BIGINT
+    ) with (
+        'connector.type' = 'filesystem',
+        'format.type' = 'csv',
+        'connector.path' = '/tmp/output'
+    )
+"""
+
+t_env.sql_update(my_source_ddl)
+t_env.sql_update(my_sink_ddl)
+{% endhighlight %}
+
 上面的程序展示了如何创建及在`ExecutionEnvironment`中注册表名分别为`mySource`和`mySink`的表。
 其中,源表`mySource`有一列: word,该表代表了从输入文件`/tmp/input`中读取的单词;
 结果表`mySink`有两列: word和count,该表会将计算结果输出到文件`/tmp/output`中,字段之间使用`\t`作为分隔符。
diff --git a/docs/index.zh.md b/docs/index.zh.md
index 76e6d45..f2b1bc8 100644
--- a/docs/index.zh.md
+++ b/docs/index.zh.md
@@ -47,7 +47,6 @@ Apache Flink 是一个分布式流批一体化的开源平台。Flink 的核心
 
 API 参考列举并解释了 Flink API 的所有功能。
 
-* [基本 API 概念](dev/api_concepts.html)
 * [DataStream API](dev/datastream_api.html)
 * [DataSet API](dev/batch/index.html)
 * [Table API &amp; SQL](dev/table/index.html)
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 29c6e70..8a59edf 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -29,7 +29,7 @@ Flink exposes a metric system that allows gathering and exposing metrics to exte
 
 ## Registering metrics
 
-You can access the metric system from any user function that extends [RichFunction]({{ site.baseurl }}/zh/dev/user_defined_functions.html#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+You can access the metric system from any user function that extends [RichFunction]({% link dev/user_defined_functions.zh.md %}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
 This method returns a `MetricGroup` object on which you can create and register new metrics.
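+
+A minimal sketch of registering and using a counter from a rich function (the
+metric name is illustrative):
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<String, String> {
+  private transient Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+    this.counter = getRuntimeContext()
+      .getMetricGroup()
+      .counter("myCounter");
+  }
+
+  @Override
+  public String map(String value) throws Exception {
+    this.counter.inc();
+    return value;
+  }
+}
+{% endhighlight %}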
 
 ### Metric types
@@ -737,7 +737,7 @@ Please see the [Prometheus documentation](https://prometheus.io/docs/practices/p
 
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 
-In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/plugins/statsd` 
+In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/plugins/statsd`
 folder of your Flink distribution.
 
 Parameters:
@@ -769,6 +769,7 @@ Parameters:
 - `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
 - `proxyHost` - (optional) The proxy host to use when sending to Datadog.
 - `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
+- `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
 
 Example configuration:
 
@@ -779,6 +780,7 @@ metrics.reporter.dghttp.apikey: xxx
 metrics.reporter.dghttp.tags: myflinkapp,prod
 metrics.reporter.dghttp.proxyHost: my.web.proxy.com
 metrics.reporter.dghttp.proxyPort: 8080
+metrics.reporter.dghttp.dataCenter: US
 
 {% endhighlight %}