Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/19 07:46:20 UTC

[flink] branch master updated (d0fc028 -> d40abbf)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d0fc028  [FLINK-17790][kafka] Fix JDK 11 compile error
     new 4dc3bda  [FLINK-17353][docs] Fix Broken links in Flink docs master
     new d40abbf  [docs-sync] Synchronize the latest documentation changes into Chinese documents

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...nk-architecture.md => flink-architecture.zh.md} |   2 +-
 docs/concepts/glossary.zh.md                       |  33 ++-
 docs/dev/batch/hadoop_compatibility.zh.md          |   2 +-
 docs/dev/batch/index.zh.md                         | 259 +++++++++++++++++++--
 docs/dev/connectors/cassandra.zh.md                |   2 +-
 docs/dev/connectors/elasticsearch.md               |   2 +-
 docs/dev/connectors/elasticsearch.zh.md            |   2 +-
 docs/dev/connectors/filesystem_sink.zh.md          |   2 +-
 docs/dev/connectors/kafka.zh.md                    |   2 +-
 docs/dev/connectors/kinesis.zh.md                  |  23 +-
 docs/dev/connectors/nifi.zh.md                     |   2 +-
 docs/dev/connectors/pubsub.zh.md                   |   2 +-
 docs/dev/connectors/rabbitmq.zh.md                 |   2 +-
 docs/dev/connectors/streamfile_sink.zh.md          | 201 +++++++++++++++-
 docs/dev/connectors/twitter.zh.md                  |   2 +-
 docs/dev/datastream_api.zh.md                      | 218 ++++++++++++++++-
 docs/dev/java_lambdas.zh.md                        |   4 +-
 docs/dev/libs/cep.zh.md                            |   4 +-
 docs/dev/parallel.zh.md                            |   2 +-
 docs/dev/stream/operators/index.zh.md              |   4 +-
 docs/dev/stream/operators/windows.zh.md            |   2 +-
 docs/dev/stream/state/checkpointing.md             |   4 +-
 docs/dev/stream/state/checkpointing.zh.md          |   6 +-
 docs/dev/stream/state/index.zh.md                  |  21 +-
 docs/dev/stream/state/queryable_state.zh.md        |   2 +-
 docs/dev/stream/state/state.zh.md                  | 130 ++++++++---
 docs/dev/table/common.zh.md                        |   6 +-
 docs/dev/table/connect.zh.md                       |  42 +++-
 docs/dev/table/index.zh.md                         |   2 +-
 docs/dev/table/sqlClient.zh.md                     |   4 +-
 docs/dev/types_serialization.zh.md                 | 200 ++++++++++++++++
 ...d_functions.md => user_defined_functions.zh.md} |   2 +-
 .../flink-operations-playground.md                 |   4 +-
 .../flink-operations-playground.zh.md              |   4 +-
 docs/getting-started/index.zh.md                   |  49 +++-
 .../walkthroughs/python_table_api.zh.md            |  29 ++-
 docs/index.md                                      |   7 +-
 docs/index.zh.md                                   |   6 +-
 docs/internals/task_lifecycle.md                   |   2 +-
 docs/internals/task_lifecycle.zh.md                |   2 +-
 docs/monitoring/metrics.zh.md                      |   6 +-
 docs/ops/config.md                                 |   2 +-
 docs/ops/config.zh.md                              |   2 +-
 docs/ops/memory/mem_migration.zh.md                |   6 +-
 docs/ops/memory/mem_trouble.zh.md                  |   4 +-
 docs/ops/memory/mem_tuning.zh.md                   |   3 +-
 docs/ops/python_shell.zh.md                        |   2 +-
 docs/ops/state/savepoints.md                       |   2 +-
 docs/ops/state/savepoints.zh.md                    |   2 +-
 49 files changed, 1159 insertions(+), 164 deletions(-)
 copy docs/concepts/{flink-architecture.md => flink-architecture.zh.md} (99%)
 copy docs/dev/{user_defined_functions.md => user_defined_functions.zh.md} (99%)


[flink] 01/02: [FLINK-17353][docs] Fix Broken links in Flink docs master

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4dc3bdaa460f35ea6e0fa31ce9068a7dba54316d
Author: yangyichao-mango <10...@qq.com>
AuthorDate: Sun May 17 15:04:28 2020 +0800

    [FLINK-17353][docs] Fix Broken links in Flink docs master
    
    This closes #12196
---
 docs/concepts/flink-architecture.zh.md             | 132 +++++++++++
 docs/dev/connectors/elasticsearch.md               |   2 +-
 docs/dev/connectors/elasticsearch.zh.md            |   2 +-
 docs/dev/stream/state/checkpointing.md             |   4 +-
 docs/dev/stream/state/checkpointing.zh.md          |   4 +-
 docs/dev/table/common.zh.md                        |   2 +-
 docs/dev/user_defined_functions.zh.md              | 241 +++++++++++++++++++++
 .../flink-operations-playground.md                 |   4 +-
 .../flink-operations-playground.zh.md              |   4 +-
 .../walkthroughs/python_table_api.zh.md            |   2 +-
 docs/index.md                                      |   7 +-
 docs/index.zh.md                                   |   5 +-
 docs/internals/task_lifecycle.md                   |   2 +-
 docs/internals/task_lifecycle.zh.md                |   2 +-
 docs/monitoring/metrics.zh.md                      |   2 +-
 docs/ops/config.md                                 |   2 +-
 docs/ops/config.zh.md                              |   2 +-
 docs/ops/memory/mem_migration.zh.md                |   6 +-
 docs/ops/memory/mem_trouble.zh.md                  |   4 +-
 docs/ops/memory/mem_tuning.zh.md                   |   3 +-
 docs/ops/python_shell.zh.md                        |   2 +-
 docs/ops/state/savepoints.md                       |   2 +-
 docs/ops/state/savepoints.zh.md                    |   2 +-
 23 files changed, 406 insertions(+), 32 deletions(-)

diff --git a/docs/concepts/flink-architecture.zh.md b/docs/concepts/flink-architecture.zh.md
new file mode 100644
index 0000000..8414943
--- /dev/null
+++ b/docs/concepts/flink-architecture.zh.md
@@ -0,0 +1,132 @@
+---
+title: Flink Architecture
+nav-id: flink-architecture
+nav-pos: 4
+nav-title: Flink Architecture
+nav-parent_id: concepts
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Flink Applications and Flink Sessions
+
+`TODO: expand this section`
+
+{% top %}
+
+## Anatomy of a Flink Cluster
+
+`TODO: expand this section, especially about components of the Flink Master and
+container environments`
+
+The Flink runtime consists of two types of processes:
+
+  - The *Flink Master* coordinates the distributed execution. It schedules
+    tasks, coordinates checkpoints, coordinates recovery on failures, etc.
+
+    There is always at least one *Flink Master*. A high-availability setup
+    might have multiple *Flink Masters*, one of which is always the
+    *leader*, and the others are *standby*.
+
+  - The *TaskManagers* (also called *workers*) execute the *tasks* (or more
+    specifically, the subtasks) of a dataflow, and buffer and exchange the data
+    *streams*.
+
+    There must always be at least one TaskManager.
+
+The Flink Master and TaskManagers can be started in various ways: directly on
+the machines as a [standalone cluster]({% link
+ops/deployment/cluster_setup.md %}), in containers, or managed by resource
+frameworks like [YARN]({% link ops/deployment/yarn_setup.md
+%}) or [Mesos]({% link ops/deployment/mesos.md %}).
+TaskManagers connect to Flink Masters, announcing themselves as available, and
+are assigned work.
+
+The *client* is not part of the runtime and program execution, but is used to
+prepare and send a dataflow to the Flink Master.  After that, the client can
+disconnect, or stay connected to receive progress reports. The client runs
+either as part of the Java/Scala program that triggers the execution, or in the
+command line process `./bin/flink run ...`.
+
+<img src="{{ site.baseurl }}/fig/processes.svg" alt="The processes involved in executing a Flink dataflow" class="offset" width="80%" />
+
+{% top %}
+
+## Tasks and Operator Chains
+
+For distributed execution, Flink *chains* operator subtasks together into
+*tasks*. Each task is executed by one thread.  Chaining operators together into
+tasks is a useful optimization: it reduces the overhead of thread-to-thread
+handover and buffering, and increases overall throughput while decreasing
+latency.  The chaining behavior can be configured; see the [chaining docs]({%
+link dev/stream/operators/index.md %}#task-chaining-and-resource-groups) for
+details.
+
+The sample dataflow in the figure below is executed with five subtasks, and
+hence with five parallel threads.
+
+<img src="{{ site.baseurl }}/fig/tasks_chains.svg" alt="Operator chaining into Tasks" class="offset" width="80%" />
+
+{% top %}
+
+## Task Slots and Resources
+
+Each worker (TaskManager) is a *JVM process*, and may execute one or more
+subtasks in separate threads.  To control how many tasks a worker accepts, a
+worker has so-called **task slots** (at least one).
+
+Each *task slot* represents a fixed subset of resources of the TaskManager. A
+TaskManager with three slots, for example, will dedicate 1/3 of its managed
+memory to each slot. Slotting the resources means that a subtask will not
+compete with subtasks from other jobs for managed memory, but instead has a
+certain amount of reserved managed memory. Note that no CPU isolation happens
+here; currently slots only separate the managed memory of tasks.
+
+By adjusting the number of task slots, users can define how subtasks are
+isolated from each other.  Having one slot per TaskManager means that each task
+group runs in a separate JVM (which can be started in a separate container, for
+example). Having multiple slots means more subtasks share the same JVM. Tasks
+in the same JVM share TCP connections (via multiplexing) and heartbeat
+messages. They may also share data sets and data structures, thus reducing the
+per-task overhead.
+
+<img src="{{ site.baseurl }}/fig/tasks_slots.svg" alt="A TaskManager with Task Slots and Tasks" class="offset" width="80%" />
+
+By default, Flink allows subtasks to share slots even if they are subtasks of
+different tasks, so long as they are from the same job. The result is that one
+slot may hold an entire pipeline of the job. Allowing this *slot sharing* has
+two main benefits:
+
+  - A Flink cluster needs exactly as many task slots as the highest parallelism
+    used in the job.  No need to calculate how many tasks (with varying
+    parallelism) a program contains in total.
+
+  - It is easier to get better resource utilization. Without slot sharing, the
+    non-intensive *source/map()* subtasks would block as many resources as the
+    resource intensive *window* subtasks.  With slot sharing, increasing the
+    base parallelism in our example from two to six yields full utilization of
+    the slotted resources, while making sure that the heavy subtasks are fairly
+    distributed among the TaskManagers.
+
+<img src="{{ site.baseurl }}/fig/slot_sharing.svg" alt="TaskManagers with shared Task Slots" class="offset" width="80%" />
+
+{% top %}
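
The new page above leaves operator chaining and slot sharing at the conceptual level. As a rough illustration of how those knobs surface in the DataStream API, the sketch below breaks a chain and assigns an operator to its own slot sharing group; the class name, the socket source, and the "transform" group name are made up for illustration and are not part of this commit.

{% highlight java %}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingAndSlotsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder source; any DataStream source works here
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        lines
            .filter(line -> !line.isEmpty())   // chained with the source by default
            .map(line -> line.toUpperCase())
            .startNewChain()                   // this map starts a new chain (not chained to the filter)
            .slotSharingGroup("transform")     // and is scheduled into its own slot sharing group
            .print();

        env.execute("chaining-and-slots-sketch");
    }
}
{% endhighlight %}
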
diff --git a/docs/dev/connectors/elasticsearch.md b/docs/dev/connectors/elasticsearch.md
index 5bc1404..4b8b2da 100644
--- a/docs/dev/connectors/elasticsearch.md
+++ b/docs/dev/connectors/elasticsearch.md
@@ -317,7 +317,7 @@ time of checkpoints. This effectively assures that all requests before the
 checkpoint was triggered have been successfully acknowledged by Elasticsearch, before
 proceeding to process more records sent to the sink.
 
-More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/internals/stream_checkpointing.html).
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/training/fault_tolerance.html).
 
 To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
 
diff --git a/docs/dev/connectors/elasticsearch.zh.md b/docs/dev/connectors/elasticsearch.zh.md
index 5921954..640f4d6 100644
--- a/docs/dev/connectors/elasticsearch.zh.md
+++ b/docs/dev/connectors/elasticsearch.zh.md
@@ -317,7 +317,7 @@ time of checkpoints. This effectively assures that all requests before the
 checkpoint was triggered have been successfully acknowledged by Elasticsearch, before
 proceeding to process more records sent to the sink.
 
-More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/internals/stream_checkpointing.html).
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{site.baseurl}}/zh/training/fault_tolerance.html).
 
 To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
 
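Both elasticsearch hunks above end on the note that fault-tolerant sinks require checkpointing to be enabled on the execution environment. For readers following along, here is a minimal sketch of that step; the interval and timeout values, the stand-in pipeline, and the job name are arbitrary examples, not values prescribed by this commit.

{% highlight java %}
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 5 seconds with exactly-once guarantees
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // optional tuning of the checkpoint lifecycle
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // stand-in pipeline; the Elasticsearch sink from the docs above would be attached here
        env.fromElements("a", "b", "c").print();

        env.execute("checkpointed-job-sketch");
    }
}
{% endhighlight %}
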
diff --git a/docs/dev/stream/state/checkpointing.md b/docs/dev/stream/state/checkpointing.md
index c193fc3..f5fef89 100644
--- a/docs/dev/stream/state/checkpointing.md
+++ b/docs/dev/stream/state/checkpointing.md
@@ -32,7 +32,7 @@ any type of more elaborate operation.
 In order to make state fault tolerant, Flink needs to **checkpoint** the state. Checkpoints allow Flink to recover state and positions
 in the streams to give the application the same semantics as a failure-free execution.
 
-The [documentation on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describes in detail the technique behind Flink's streaming fault tolerance mechanism.
+The [documentation on streaming fault tolerance]({{ site.baseurl }}/training/fault_tolerance.html) describes in detail the technique behind Flink's streaming fault tolerance mechanism.
 
 
 ## Prerequisites
@@ -173,7 +173,7 @@ Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see
 
 ## Selecting a State Backend
 
-Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) stores consistent snapshots
+Flink's [checkpointing mechanism]({{ site.baseurl }}/training/fault_tolerance.html) stores consistent snapshots
 of all the state in timers and stateful operators, including connectors, windows, and any [user-defined state](state.html).
 Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured
 **State Backend**. 
diff --git a/docs/dev/stream/state/checkpointing.zh.md b/docs/dev/stream/state/checkpointing.zh.md
index d4aa989..c940ada 100644
--- a/docs/dev/stream/state/checkpointing.zh.md
+++ b/docs/dev/stream/state/checkpointing.zh.md
@@ -29,7 +29,7 @@ Flink 中的每个方法或算子都能够是**有状态的**(阅读 [working
 状态化的方法在处理单个 元素/事件 的时候存储数据,让状态成为使各个类型的算子更加精细的重要部分。
 为了让状态容错,Flink 需要为状态添加 **checkpoint(检查点)**。Checkpoint 使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。
 
-[容错文档]({{ site.baseurl }}/zh/internals/stream_checkpointing.html) 中介绍了 Flink 流计算容错机制内部的技术原理。
+[容错文档]({{ site.baseurl }}/zh/training/fault_tolerance.html) 中介绍了 Flink 流计算容错机制内部的技术原理。
 
 
 ## 前提条件
@@ -165,7 +165,7 @@ env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True)
 
 ## 选择一个 State Backend
 
-Flink 的 [checkpointing 机制]({{ site.baseurl }}/zh/internals/stream_checkpointing.html) 会将 timer 以及 stateful 的 operator 进行快照,然后存储下来,
+Flink 的 [checkpointing 机制]({{ site.baseurl }}/zh/training/fault_tolerance.html) 会将 timer 以及 stateful 的 operator 进行快照,然后存储下来,
 包括连接器(connectors),窗口(windows)以及任何用户[自定义的状态](state.html)。
 Checkpoint 存储在哪里取决于所配置的 **State Backend**(比如 JobManager memory、 file system、 database)。
 
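The checkpointing hunks above also touch the "Selecting a State Backend" section. As a hedged companion sketch (the HDFS path, class name, and job name are placeholders, not part of the commit), this is roughly what wiring up a file-system state backend looks like:

{% highlight java %}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // keep working state on the TaskManager heap, write checkpoint snapshots to durable storage
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        env.enableCheckpointing(10000);

        env.fromElements("a", "b", "c").print();

        env.execute("state-backend-sketch");
    }
}
{% endhighlight %}
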
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index bd36b1e..c10f2d3 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -561,7 +561,7 @@ revenue = orders \
 
 Flink SQL 是基于实现了SQL标准的 [Apache Calcite](https://calcite.apache.org) 的。SQL 查询由常规字符串指定。
 
-文档 [SQL]({{ site.baseurl }}/zh/dev/table/sql.html) 描述了Flink对流处理和批处理表的SQL支持。
+文档 [SQL]({{ site.baseurl }}/zh/dev/table/sql/index.html) 描述了Flink对流处理和批处理表的SQL支持。
 
 下面的示例演示了如何指定查询并将结果作为 `Table` 对象返回。
 
diff --git a/docs/dev/user_defined_functions.zh.md b/docs/dev/user_defined_functions.zh.md
new file mode 100644
index 0000000..bdfbe54
--- /dev/null
+++ b/docs/dev/user_defined_functions.zh.md
@@ -0,0 +1,241 @@
+---
+title: 'User-Defined Functions'
+nav-id: user_defined_function
+nav-parent_id: streaming
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Most operations require a user-defined function. This section lists the different
+ways in which they can be specified. We also cover `Accumulators`, which can be
+used to gain insights into your Flink application.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+## Implementing an interface
+
+The most basic way is to implement one of the provided interfaces:
+
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+};
+data.map(new MyMapFunction());
+{% endhighlight %}
+
+## Anonymous classes
+
+You can pass a function as an anonymous class:
+{% highlight java %}
+data.map(new MapFunction<String, Integer> () {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+## Java 8 Lambdas
+
+Flink also supports Java 8 Lambdas in the Java API.
+
+{% highlight java %}
+data.filter(s -> s.startsWith("http://"));
+{% endhighlight %}
+
+{% highlight java %}
+data.reduce((i1,i2) -> i1 + i2);
+{% endhighlight %}
+
+## Rich functions
+
+All transformations that require a user-defined function can
+instead take as argument a *rich* function. For example, instead of
+
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+};
+{% endhighlight %}
+
+you can write
+
+{% highlight java %}
+class MyMapFunction extends RichMapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+};
+{% endhighlight %}
+
+and pass the function as usual to a `map` transformation:
+
+{% highlight java %}
+data.map(new MyMapFunction());
+{% endhighlight %}
+
+Rich functions can also be defined as an anonymous class:
+{% highlight java %}
+data.map (new RichMapFunction<String, Integer>() {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+
+## Lambda Functions
+
+As already seen in previous examples, all operations accept lambda functions for describing
+the operation:
+{% highlight scala %}
+val data: DataSet[String] = // [...]
+data.filter { _.startsWith("http://") }
+{% endhighlight %}
+
+{% highlight scala %}
+val data: DataSet[Int] = // [...]
+data.reduce { (i1,i2) => i1 + i2 }
+// or
+data.reduce { _ + _ }
+{% endhighlight %}
+
+## Rich functions
+
+All transformations that take as argument a lambda function can
+instead take as argument a *rich* function. For example, instead of
+
+{% highlight scala %}
+data.map { x => x.toInt }
+{% endhighlight %}
+
+you can write
+
+{% highlight scala %}
+class MyMapFunction extends RichMapFunction[String, Int] {
+  def map(in: String):Int = { in.toInt }
+};
+{% endhighlight %}
+
+and pass the function to a `map` transformation:
+
+{% highlight scala %}
+data.map(new MyMapFunction())
+{% endhighlight %}
+
+Rich functions can also be defined as an anonymous class:
+{% highlight scala %}
+data.map (new RichMapFunction[String, Int] {
+  def map(in: String):Int = { in.toInt }
+})
+{% endhighlight %}
+</div>
+
+</div>
+
+Rich functions provide, in addition to the user-defined function (map,
+reduce, etc.), four methods: `open`, `close`, `getRuntimeContext`, and
+`setRuntimeContext`. These are useful for parameterizing the function
+(see [Passing Parameters to Functions]({{ site.baseurl }}/dev/batch/index.html#passing-parameters-to-functions)),
+creating and finalizing local state, accessing broadcast variables (see
+[Broadcast Variables]({{ site.baseurl }}/dev/batch/index.html#broadcast-variables)), accessing runtime
+information such as accumulators and counters (see
+[Accumulators and Counters](#accumulators--counters)), and accessing information
+on iterations (see [Iterations]({{ site.baseurl }}/dev/batch/iterations.html)).
+
+{% top %}
+
+## Accumulators & Counters
+
+Accumulators are simple constructs with an **add operation** and a **final accumulated result**,
+which is available after the job has ended.
+
+The most straightforward accumulator is a **counter**: You can increment it using the
+```Accumulator.add(V value)``` method. At the end of the job Flink will sum up (merge) all partial
+results and send the result to the client. Accumulators are useful during debugging or if you
+quickly want to find out more about your data.
+
+Flink currently has the following **built-in accumulators**. Each of them implements the
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
+interface.
+
+- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/IntCounter.java "__IntCounter__" %},
+  {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/LongCounter.java "__LongCounter__" %}
+  and {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/DoubleCounter.java "__DoubleCounter__" %}:
+  See below for an example using a counter.
+- {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java "__Histogram__" %}:
+  A histogram implementation for a discrete number of bins. Internally it is just a map from Integer
+  to Integer. You can use this to compute distributions of values, e.g. the distribution of
+  words-per-line for a word count program.
+
+__How to use accumulators:__
+
+First you have to create an accumulator object (here a counter) in the user-defined transformation
+function where you want to use it.
+
+{% highlight java %}
+private IntCounter numLines = new IntCounter();
+{% endhighlight %}
+
+Second you have to register the accumulator object, typically in the ```open()``` method of the
+*rich* function. Here you also define the name.
+
+{% highlight java %}
+getRuntimeContext().addAccumulator("num-lines", this.numLines);
+{% endhighlight %}
+
+You can now use the accumulator anywhere in the operator function, including in the ```open()``` and
+```close()``` methods.
+
+{% highlight java %}
+this.numLines.add(1);
+{% endhighlight %}
+
+The overall result will be stored in the ```JobExecutionResult``` object which is
+returned from the `execute()` method of the execution environment
+(currently this only works if the execution waits for the
+completion of the job).
+
+{% highlight java %}
+myJobExecutionResult.getAccumulatorResult("num-lines")
+{% endhighlight %}
+
+All accumulators share a single namespace per job. Thus you can use the same accumulator in
+different operator functions of your job. Flink will internally merge all accumulators with the same
+name.
+
+A note on accumulators and iterations: Currently the result of accumulators is only available after
+the overall job has ended. We plan to also make the result of the previous iteration available in the
+next iteration. You can use
+{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/IterativeDataSet.java#L98 "Aggregators" %}
+to compute per-iteration statistics and base the termination of iterations on such statistics.
+
+__Custom accumulators:__
+
+To implement your own accumulator you simply have to write your implementation of the Accumulator
+interface. Feel free to create a pull request if you think your custom accumulator should be shipped
+with Flink.
+
+You have the choice to implement either
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java "Accumulator" %}
+or {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/accumulators/SimpleAccumulator.java "SimpleAccumulator" %}.
+
+```Accumulator<V,R>``` is most flexible: It defines a type ```V``` for the value to add, and a
+result type ```R``` for the final result. E.g. for a histogram, ```V``` is a number and ```R``` is
+ a histogram. ```SimpleAccumulator``` is for the cases where both types are the same, e.g. for counters.
+
+{% top %}
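
Since the new user_defined_functions.zh.md page walks through accumulators step by step (create the counter, register it in `open()`, call `add()`, read the result from the `JobExecutionResult`), a compact end-to-end sketch may help. It mirrors the `num-lines` counter from the page above; the class names and sample data are illustrative only.

{% highlight java %}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class AccumulatorSketch {

    // counts every record the function sees, across all parallel instances
    public static class LineCounter extends RichFlatMapFunction<String, String> {
        private final IntCounter numLines = new IntCounter();

        @Override
        public void open(Configuration parameters) {
            // register the accumulator under a job-wide name, as described above
            getRuntimeContext().addAccumulator("num-lines", numLines);
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            numLines.add(1);
            out.collect(value);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("first line", "second line", "third line")
           .flatMap(new LineCounter())
           .output(new DiscardingOutputFormat<String>());

        // the merged accumulator value is available once the job has finished
        JobExecutionResult result = env.execute("accumulator-sketch");
        System.out.println("lines seen: " + result.getAccumulatorResult("num-lines"));
    }
}
{% endhighlight %}
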
diff --git a/docs/getting-started/docker-playgrounds/flink-operations-playground.md b/docs/getting-started/docker-playgrounds/flink-operations-playground.md
index 1e2a569..6d9f409 100644
--- a/docs/getting-started/docker-playgrounds/flink-operations-playground.md
+++ b/docs/getting-started/docker-playgrounds/flink-operations-playground.md
@@ -316,7 +316,7 @@ docker-compose up -d taskmanager
 
 When the Master is notified about the new TaskManager, it schedules the tasks of the 
 recovering Job to the newly available TaskSlots. Upon restart, the tasks recover their state from
-the last successful [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html) that was taken
+the last successful [checkpoint]({{ site.baseurl }}/training/fault_tolerance.html) that was taken
 before the failure and switch to the `RUNNING` state.
 
 The Job will quickly process the full backlog of input events (accumulated during the outage) 
@@ -806,7 +806,7 @@ You might have noticed that the *Click Event Count* application was always start
 and `--event-time` program arguments. By omitting these in the command of the *client* container in the 
 `docker-compose.yaml`, you can change the behavior of the Job.
 
-* `--checkpointing` enables [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html), 
+* `--checkpointing` enables [checkpoint]({{ site.baseurl }}/training/fault_tolerance.html), 
 which is Flink's fault-tolerance mechanism. If you run without it and go through 
 [failure and recovery](#observing-failure--recovery), you will see that data is actually 
 lost.
diff --git a/docs/getting-started/docker-playgrounds/flink-operations-playground.zh.md b/docs/getting-started/docker-playgrounds/flink-operations-playground.zh.md
index 1e2a569..6d9f409 100644
--- a/docs/getting-started/docker-playgrounds/flink-operations-playground.zh.md
+++ b/docs/getting-started/docker-playgrounds/flink-operations-playground.zh.md
@@ -316,7 +316,7 @@ docker-compose up -d taskmanager
 
 When the Master is notified about the new TaskManager, it schedules the tasks of the 
 recovering Job to the newly available TaskSlots. Upon restart, the tasks recover their state from
-the last successful [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html) that was taken
+the last successful [checkpoint]({{ site.baseurl }}/training/fault_tolerance.html) that was taken
 before the failure and switch to the `RUNNING` state.
 
 The Job will quickly process the full backlog of input events (accumulated during the outage) 
@@ -806,7 +806,7 @@ You might have noticed that the *Click Event Count* application was always start
 and `--event-time` program arguments. By omitting these in the command of the *client* container in the 
 `docker-compose.yaml`, you can change the behavior of the Job.
 
-* `--checkpointing` enables [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html), 
+* `--checkpointing` enables [checkpoint]({{ site.baseurl }}/training/fault_tolerance.html), 
 which is Flink's fault-tolerance mechanism. If you run without it and go through 
 [failure and recovery](#observing-failure--recovery), you will see that data is actually 
 lost.
diff --git a/docs/getting-started/walkthroughs/python_table_api.zh.md b/docs/getting-started/walkthroughs/python_table_api.zh.md
index a82ceb3..34a0170 100644
--- a/docs/getting-started/walkthroughs/python_table_api.zh.md
+++ b/docs/getting-started/walkthroughs/python_table_api.zh.md
@@ -28,7 +28,7 @@ under the License.
 
 在该教程中,我们会从零开始,介绍如何创建一个Flink Python项目及运行Python Table API程序。
 
-关于Python执行环境的要求,请参考Python Table API[环境安装]({{ site.baseurl }}/dev/dev/table/python/installation.html)。
+关于Python执行环境的要求,请参考Python Table API[环境安装]({{ site.baseurl }}/dev/table/python/installation.html)。
 
 ## 创建一个Python Table API项目
 
diff --git a/docs/index.md b/docs/index.md
index 9f0acae..09e7f7e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -36,9 +36,10 @@ Apache Flink is an open source platform for distributed stream and batch data pr
 * **Docker Playgrounds**: Set up a sandboxed Flink environment in just a few minutes to explore and play with Flink.
   * [Run and manage Flink streaming applications](./getting-started/docker-playgrounds/flink-operations-playground.html)
 
-* **Concepts**: Learn about Flink's basic concepts to better understand the documentation.
-  * [Dataflow Programming Model](concepts/programming-model.html)
-  * [Distributed Runtime](concepts/runtime.html)
+* **Concepts**: Learn about Flink's concepts to better understand the documentation.
+  * [Stateful Stream Processing](concepts/stateful-stream-processing.html)
+  * [Timely Stream Processing](concepts/timely-stream-processing.html)
+  * [Flink Architecture](concepts/flink-architecture.html)
   * [Glossary](concepts/glossary.html)
 
 ## API References
diff --git a/docs/index.zh.md b/docs/index.zh.md
index 2315024..76e6d45 100644
--- a/docs/index.zh.md
+++ b/docs/index.zh.md
@@ -38,8 +38,9 @@ Apache Flink 是一个分布式流批一体化的开源平台。Flink 的核心
   * [运行与管理 Flink 流处理应用](./getting-started/docker-playgrounds/flink-operations-playground.html)
 
 * **概念**: 学习 Flink 的基本概念能更好地理解文档。
-  * [数据流编程模型](concepts/programming-model.html)
-  * [分布式执行](concepts/runtime.html)
+  * [有状态流处理](concepts/stateful-stream-processing.html)
+  * [实时流处理](concepts/timely-stream-processing.html)
+  * [Flink 架构](concepts/flink-architecture.html)
   * [术语表](concepts/glossary.html)
 
 ## API 参考
diff --git a/docs/internals/task_lifecycle.md b/docs/internals/task_lifecycle.md
index 44f847f..4d5c485 100644
--- a/docs/internals/task_lifecycle.md
+++ b/docs/internals/task_lifecycle.md
@@ -92,7 +92,7 @@ operator is opened and before it is closed. The responsibility of this method is
 to the specified [state backend]({{ site.baseurl }}/ops/state/state_backends.html) from where it will be retrieved when 
 the job resumes execution after a failure. Below we include a brief description of Flink's checkpointing mechanism, 
 and for a more detailed discussion on the principles around checkpointing in Flink please read the corresponding documentation: 
-[Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
+[Data Streaming Fault Tolerance]({{ site.baseurl }}/training/fault_tolerance.html).
 
 ## Task Lifecycle
 
diff --git a/docs/internals/task_lifecycle.zh.md b/docs/internals/task_lifecycle.zh.md
index 7de935b..bc5cccb 100644
--- a/docs/internals/task_lifecycle.zh.md
+++ b/docs/internals/task_lifecycle.zh.md
@@ -92,7 +92,7 @@ operator is opened and before it is closed. The responsibility of this method is
 to the specified [state backend]({{ site.baseurl }}/ops/state/state_backends.html) from where it will be retrieved when 
 the job resumes execution after a failure. Below we include a brief description of Flink's checkpointing mechanism, 
 and for a more detailed discussion on the principles around checkpointing in Flink please read the corresponding documentation: 
-[Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
+[Data Streaming Fault Tolerance]({{ site.baseurl }}/training/fault_tolerance.html).
 
 ## Task Lifecycle
 
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 04f6fd9..29c6e70 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -29,7 +29,7 @@ Flink exposes a metric system that allows gathering and exposing metrics to exte
 
 ## Registering metrics
 
-You can access the metric system from any user function that extends [RichFunction]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+You can access the metric system from any user function that extends [RichFunction]({{ site.baseurl }}/zh/dev/user_defined_functions.html#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
 This method returns a `MetricGroup` object on which you can create and register new metrics.
 
 ### Metric types
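
The metrics hunk above redirects readers to the RichFunction docs for `getRuntimeContext().getMetricGroup()`. A minimal sketch of that pattern follows; the class name and the `eventsSeen` metric name are illustrative and not defined anywhere in Flink or in this commit.

{% highlight java %}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter eventsSeen;

    @Override
    public void open(Configuration parameters) {
        // metrics are registered against the runtime context's metric group in open()
        eventsSeen = getRuntimeContext().getMetricGroup().counter("eventsSeen");
    }

    @Override
    public String map(String value) {
        eventsSeen.inc();
        return value;
    }
}
{% endhighlight %}
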
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 950d65b..f33d03f 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -158,7 +158,7 @@ In most cases, users should only need to set the values `taskmanager.memory.proc
 
 For a detailed explanation of how these options interact,
 see the documentation on [TaskManager]({{site.baseurl}}/ops/memory/mem_setup_tm.html) and
-[JobManager]({{site.baseurl}}/ops/memory/mem_setup_jm.html) memory configurations.
+[JobManager]({{site.baseurl}}/ops/memory/mem_setup_master.html) memory configurations.
 
 {% include generated/common_memory_section.html %}
 
diff --git a/docs/ops/config.zh.md b/docs/ops/config.zh.md
index 1d3151d..80244fd 100644
--- a/docs/ops/config.zh.md
+++ b/docs/ops/config.zh.md
@@ -158,7 +158,7 @@ In most cases, users should only need to set the values `taskmanager.memory.proc
 
 For a detailed explanation of how these options interact,
 see the documentation on [TaskManager]({{site.baseurl}}/ops/memory/mem_setup_tm.html) and
-[JobManager]({{site.baseurl}}/ops/memory/mem_setup_jm.html) memory configurations.
+[JobManager]({{site.baseurl}}/ops/memory/mem_setup_master.html) memory configurations.
 
 {% include generated/common_memory_section.html %}
 
diff --git a/docs/ops/memory/mem_migration.zh.md b/docs/ops/memory/mem_migration.zh.md
index 7c8a525..72af546 100644
--- a/docs/ops/memory/mem_migration.zh.md
+++ b/docs/ops/memory/mem_migration.zh.md
@@ -119,7 +119,7 @@ Flink 自带的[默认 flink-conf.yaml](#flink-confyaml-中的默认配置) 文
 
 尽管网络内存的配置参数没有发生太多变化,我们仍建议您检查其配置结果。
 网络内存的大小可能会受到其他内存部分大小变化的影响,例如总内存变化时,根据占比计算出的网络内存也可能发生变化。
-请参考[内存模型详解](mem_detail.html)。
+请参考[内存模型详解](mem_setup.html)。
 
 容器切除(Cut-Off)内存相关的配置参数(`containerized.heap-cutoff-ratio` 和 `containerized.heap-cutoff-min`)将不再对进程生效。
 
@@ -153,7 +153,7 @@ Flink 在 Mesos 上还有另一个具有同样语义的配置参数 `mesos.resou
 或 [FsStateBackend](../state/state_backends.html#fsstatebackend)),那么它同样需要使用 JVM 堆内存。
 
 Flink 现在总是会预留一部分 JVM 堆内存供框架使用([`taskmanager.memory.framework.heap.size`](../config.html#taskmanager-memory-framework-heap-size))。
-请参考[框架内存](mem_detail.html#框架内存)。
+请参考[框架内存](mem_setup.html#框架内存)。
 
 ## 托管内存
 
@@ -201,7 +201,7 @@ Flink 现在总是会预留一部分 JVM 堆内存供框架使用([`taskmanage
 * 任务堆外内存([`taskmanager.memory.task.off-heap.size`](../config.html#taskmanager-memory-task-off-heap-size))
 * 框架堆外内存([`taskmanager.memory.framework.off-heap.size`](../config.html#taskmanager-memory-framework-off-heap-size))
 * JVM Metaspace([`taskmanager.memory.jvm-metaspace.size`](../config.html#taskmanager-memory-jvm-metaspace-size))
-* JVM 开销(请参考[内存模型详解](mem_detail.html))
+* JVM 开销(请参考[内存模型详解](mem_setup_tm.html#detailed-memory-model))
 
 <span class="label label-info">提示</span> JobManager 进程仍保留了容器切除内存,相关配置项和此前一样仍对 JobManager 生效。
 
diff --git a/docs/ops/memory/mem_trouble.zh.md b/docs/ops/memory/mem_trouble.zh.md
index 0ecb5ad..52e08e8 100644
--- a/docs/ops/memory/mem_trouble.zh.md
+++ b/docs/ops/memory/mem_trouble.zh.md
@@ -28,14 +28,14 @@ under the License.
 ## IllegalConfigurationException
 
 如果遇到从 *TaskExecutorProcessUtils* 抛出的 *IllegalConfigurationException* 异常,这通常说明您的配置参数中存在无效值(例如内存大小为负数、占比大于 1 等)或者配置冲突。
-请根据异常信息,确认[内存模型详解](mem_detail.html)中与出错的内存部分对应章节的内容。
+请根据异常信息,确认[内存模型详解](../config.html#memory-configuration)中与出错的内存部分对应章节的内容。
 
 ## OutOfMemoryError: Java heap space
 
 该异常说明 JVM 的堆空间过小。
 可以通过增大[总内存](mem_setup.html#配置总内存)或[任务堆内存](mem_setup.html#任务算子堆内存)的方法来增大 JVM 堆空间。
 
-<span class="label label-info">提示</span> 也可以增大[框架堆内存](mem_detail.html#框架内存)。这是一个进阶配置,只有在确认是 Flink 框架自身需要更多内存时才应该去调整。
+<span class="label label-info">提示</span> 也可以增大[框架堆内存](mem_setup_tm.html#框架内存)。这是一个进阶配置,只有在确认是 Flink 框架自身需要更多内存时才应该去调整。
 
 ## OutOfMemoryError: Direct buffer memory
 
diff --git a/docs/ops/memory/mem_tuning.zh.md b/docs/ops/memory/mem_tuning.zh.md
index 9fb950d..eac6331 100644
--- a/docs/ops/memory/mem_tuning.zh.md
+++ b/docs/ops/memory/mem_tuning.zh.md
@@ -30,7 +30,7 @@ under the License.
 ## 独立部署模式(Standalone Deployment)下的内存配置
 
 [独立部署模式](../deployment/cluster_setup.html),我们通常更关注 Flink 应用本身使用的内存大小。
-建议配置 [Flink 总内存](mem_setup.html#配置总内存)([`taskmanager.memory.flink.size`](../config.html#taskmanager-memory-flink-size))或者它的[组成部分](mem_detail.html)。
+建议配置 [Flink 总内存](mem_setup.html#配置总内存)([`taskmanager.memory.flink.size`](../config.html#taskmanager-memory-flink-size) 或者 [`jobmanager.memory.flink.size`](../config.html#jobmanager-memory-flink-size))。
 此外,如果出现 [Metaspace 不足的问题](mem_trouble.html#outofmemoryerror-metaspace),可以调整 *JVM Metaspace* 的大小。
 
 这种情况下通常无需配置*进程总内存*,因为不管是 Flink 还是部署环境都不会对 *JVM 开销* 进行限制,它只与机器的物理资源相关。
@@ -41,7 +41,6 @@ under the License.
 该配置参数用于指定分配给 Flink *JVM 进程*的总内存,也就是需要申请的容器大小。
 
 <span class="label label-info">提示</span> 如果配置了 *Flink 总内存*,Flink 会自动加上 JVM 相关的内存部分,根据推算出的*进程总内存*大小申请容器。
-请参考[内存模型详解](mem_detail.html)。
 
 <div class="alert alert-warning">
   <strong>注意:</strong> 如果 Flink 或者用户代码分配超过容器大小的非托管的堆外(本地)内存,部署环境可能会杀掉超用内存的容器,造成作业执行失败。
diff --git a/docs/ops/python_shell.zh.md b/docs/ops/python_shell.zh.md
index e5f2a6c..2f561c7 100644
--- a/docs/ops/python_shell.zh.md
+++ b/docs/ops/python_shell.zh.md
@@ -27,7 +27,7 @@ Flink附带了一个集成的交互式Python Shell。
 本地安装Flink,请看[本地安装](deployment/local.html)页面。
 您也可以从源码安装Flink,请看[从源码构建 Flink](../flinkDev/building.html)页面。
 
-<span class="label label-info">注意</span> Python Shell会调用“python”命令。关于Python执行环境的要求,请参考Python Table API[环境安装]({{ site.baseurl }}/dev/dev/table/python/installation.html)。
+<span class="label label-info">注意</span> Python Shell会调用“python”命令。关于Python执行环境的要求,请参考Python Table API[环境安装]({{ site.baseurl }}/dev/table/python/installation.html)。
 
 你可以通过PyPi安装PyFlink,然后使用Python Shell:
 
diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md
index c235344..d1e07f2 100644
--- a/docs/ops/state/savepoints.md
+++ b/docs/ops/state/savepoints.md
@@ -27,7 +27,7 @@ under the License.
 
 ## What is a Savepoint? How is a Savepoint different from a Checkpoint?
 
-A Savepoint is a consistent image of the execution state of a streaming job, created via Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html). You can use Savepoints to stop-and-resume, fork,
+A Savepoint is a consistent image of the execution state of a streaming job, created via Flink's [checkpointing mechanism]({{ site.baseurl }}/training/fault_tolerance.html). You can use Savepoints to stop-and-resume, fork,
 or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a (relatively small) meta data file. The files on stable storage represent the net data of the job's execution state
 image. The meta data file of a Savepoint contains (primarily) pointers to all files on stable storage that are part of the Savepoint, in form of absolute paths.
 
diff --git a/docs/ops/state/savepoints.zh.md b/docs/ops/state/savepoints.zh.md
index 6bdb9df..b8c52f7 100644
--- a/docs/ops/state/savepoints.zh.md
+++ b/docs/ops/state/savepoints.zh.md
@@ -27,7 +27,7 @@ under the License.
 
 ## 什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同?
 
-Savepoint 是依据 Flink [checkpointing 机制]({{ site.baseurl }}/zh/internals/stream_checkpointing.html)所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,...) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
+Savepoint 是依据 Flink [checkpointing 机制]({{ site.baseurl }}/zh/training/fault_tolerance.html)所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,...) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
 
 <div class="alert alert-warning">
 <strong>注意:</strong> 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关<a href="#分配算子-id">分配算子 ID </a>的部分 。


[flink] 02/02: [docs-sync] Synchronize the latest documentation changes into Chinese documents

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d40abbf0309f414a6acf8a090c448ba397a08d9c
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue May 19 15:41:32 2020 +0800

    [docs-sync] Synchronize the latest documentation changes into Chinese documents
---
 docs/concepts/flink-architecture.zh.md             |   2 +-
 docs/concepts/glossary.zh.md                       |  33 ++-
 docs/dev/batch/hadoop_compatibility.zh.md          |   2 +-
 docs/dev/batch/index.zh.md                         | 259 +++++++++++++++++++--
 docs/dev/connectors/cassandra.zh.md                |   2 +-
 docs/dev/connectors/filesystem_sink.zh.md          |   2 +-
 docs/dev/connectors/kafka.zh.md                    |   2 +-
 docs/dev/connectors/kinesis.zh.md                  |  23 +-
 docs/dev/connectors/nifi.zh.md                     |   2 +-
 docs/dev/connectors/pubsub.zh.md                   |   2 +-
 docs/dev/connectors/rabbitmq.zh.md                 |   2 +-
 docs/dev/connectors/streamfile_sink.zh.md          | 201 +++++++++++++++-
 docs/dev/connectors/twitter.zh.md                  |   2 +-
 docs/dev/datastream_api.zh.md                      | 218 ++++++++++++++++-
 docs/dev/java_lambdas.zh.md                        |   4 +-
 docs/dev/libs/cep.zh.md                            |   4 +-
 docs/dev/parallel.zh.md                            |   2 +-
 docs/dev/stream/operators/index.zh.md              |   4 +-
 docs/dev/stream/operators/windows.zh.md            |   2 +-
 docs/dev/stream/state/checkpointing.zh.md          |   2 +-
 docs/dev/stream/state/index.zh.md                  |  21 +-
 docs/dev/stream/state/queryable_state.zh.md        |   2 +-
 docs/dev/stream/state/state.zh.md                  | 130 ++++++++---
 docs/dev/table/common.zh.md                        |   4 +-
 docs/dev/table/connect.zh.md                       |  42 +++-
 docs/dev/table/index.zh.md                         |   2 +-
 docs/dev/table/sqlClient.zh.md                     |   4 +-
 docs/dev/types_serialization.zh.md                 | 200 ++++++++++++++++
 docs/dev/user_defined_functions.zh.md              |   2 +-
 docs/getting-started/index.zh.md                   |  49 +++-
 .../walkthroughs/python_table_api.zh.md            |  27 +++
 docs/index.zh.md                                   |   1 -
 docs/monitoring/metrics.zh.md                      |   6 +-
 33 files changed, 1127 insertions(+), 133 deletions(-)

diff --git a/docs/concepts/flink-architecture.zh.md b/docs/concepts/flink-architecture.zh.md
index 8414943..00a0924 100644
--- a/docs/concepts/flink-architecture.zh.md
+++ b/docs/concepts/flink-architecture.zh.md
@@ -1,5 +1,5 @@
 ---
-title: Flink Architecture
+title: Flink 架构
 nav-id: flink-architecture
 nav-pos: 4
 nav-title: Flink Architecture
diff --git a/docs/concepts/glossary.zh.md b/docs/concepts/glossary.zh.md
index 8efd2f1..3f88e82 100644
--- a/docs/concepts/glossary.zh.md
+++ b/docs/concepts/glossary.zh.md
@@ -25,7 +25,16 @@ under the License.
 
 #### Flink Application Cluster
 
-Flink Application Cluster 是一个专用的 [Flink Cluster](#flink-cluster),它仅用于执行单个 [Flink Job](#flink-job)。[Flink Cluster](#flink-cluster)的生命周期与 [Flink Job](#flink-job)的生命周期绑定在一起。以前,Flink Application Cluster 也称为*job mode*的 Flink Cluster。和 [Flink Session Cluster](#flink-session-cluster) 作对比。
+A Flink Application Cluster is a dedicated [Flink Cluster](#flink-cluster) that
+only executes [Flink Jobs](#flink-job) from one [Flink
+Application](#flink-application). The lifetime of the [Flink
+Cluster](#flink-cluster) is bound to the lifetime of the Flink Application.
+
+#### Flink Job Cluster
+
+A Flink Job Cluster is a dedicated [Flink Cluster](#flink-cluster) that only
+executes a single [Flink Job](#flink-job). The lifetime of the
+[Flink Cluster](#flink-cluster) is bound to the lifetime of the Flink Job.
 
 #### Flink Cluster
 
@@ -47,9 +56,22 @@ Function 是由用户实现的,并封装了 Flink 程序的应用程序逻辑
 
 Instance 常用于描述运行时的特定类型(通常是 [Operator](#operator) 或者 [Function](#function))的一个具体实例。由于 Apache Flink 主要是用 Java 编写的,所以,这与 Java 中的 *Instance* 或 *Object* 的定义相对应。在 Apache Flink 的上下文中,*parallel instance* 也常用于强调同一 [Operator](#operator) 或者 [Function](#function) 的多个 instance 以并行的方式运行。
 
+#### Flink Application
+
+A Flink application is a Java Application that submits one or multiple [Flink
+Jobs](#flink-job) from the `main()` method (or by some other means). Submitting
+jobs is usually done by calling `execute()` on an execution environment.
+
+The jobs of an application can either be submitted to a long running [Flink
+Session Cluster](#flink-session-cluster), to a dedicated [Flink Application
+Cluster](#flink-application-cluster), or to a [Flink Job
+Cluster](#flink-job-cluster).
+
 #### Flink Job
 
-Flink Job 代表运行时的 Flink 程序。Flink Job 可以提交到长时间运行的 [Flink Session Cluster](#flink-session-cluster),也可以作为独立的 [Flink Application Cluster](#flink-application-cluster) 启动。
+A Flink Job is the runtime representation of a [logical graph](#logical-graph)
+(also often called dataflow graph) that is created and submitted by calling
+`execute()` in a [Flink Application](#flink-application).
 
 #### JobGraph
 
@@ -61,7 +83,12 @@ JobManager 是在 [Flink Master](#flink-master) 运行中的组件之一。JobMa
 
 #### Logical Graph
 
-Logical Graph 是一种描述流处理程序的高阶逻辑有向图。节点是[Operator](#operator),边代表输入/输出关系、数据流和数据集中的之一。
+A logical graph is a directed graph where the nodes are  [Operators](#operator)
+and the edges define input/output-relationships of the operators and correspond
+to data streams or data sets. A logical graph is created by submitting jobs
+from a [Flink Application](#flink-application).
+
+Logical graphs are also often referred to as *dataflow graphs*.
 
 #### Managed State
 
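The reworked glossary entries above define a Flink Application as a Java application that submits one or more Flink Jobs by calling `execute()`. For orientation, this is roughly what the smallest such application looks like; the class name, sample elements, and job name are illustrative and not taken from the commit.

{% highlight java %}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HelloFlinkApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello", "flink")
           .map(word -> word.toUpperCase())
           .print();

        // each execute() call submits one Flink Job, per the glossary definition above
        env.execute("hello-flink-application");
    }
}
{% endhighlight %}
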
diff --git a/docs/dev/batch/hadoop_compatibility.zh.md b/docs/dev/batch/hadoop_compatibility.zh.md
index 381ae2f..1f03adb 100644
--- a/docs/dev/batch/hadoop_compatibility.zh.md
+++ b/docs/dev/batch/hadoop_compatibility.zh.md
@@ -28,7 +28,7 @@ reusing code that was implemented for Hadoop MapReduce.
 
 You can:
 
-- use Hadoop's `Writable` [data types]({{ site.baseurl }}/dev/api_concepts.html#supported-data-types) in Flink programs.
+- use Hadoop's `Writable` [data types]({% link dev/types_serialization.md %}#supported-data-types) in Flink programs.
 - use any Hadoop `InputFormat` as a [DataSource](index.html#data-sources).
 - use any Hadoop `OutputFormat` as a [DataSink](index.html#data-sinks).
 - use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
diff --git a/docs/dev/batch/index.zh.md b/docs/dev/batch/index.zh.md
index 55a3be0..66fd0cf 100644
--- a/docs/dev/batch/index.zh.md
+++ b/docs/dev/batch/index.zh.md
@@ -32,11 +32,12 @@ example write the data to (distributed) files, or to standard output (for exampl
 terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
 The execution can happen in a local JVM, or on clusters of many machines.
 
-Please see [basic concepts]({{ site.baseurl }}/dev/api_concepts.html) for an introduction
-to the basic concepts of the Flink API.
+Please refer to the [DataStream API overview]({% link dev/datastream_api.md %})
+for an introduction to the basic concepts of the Flink API. That overview is
+for the DataStream API but the basic concepts of the two APIs are the same.
 
 In order to create your own Flink DataSet program, we encourage you to start with the
-[anatomy of a Flink Program]({{ site.baseurl }}/dev/api_concepts.html#anatomy-of-a-flink-program)
+[anatomy of a Flink Program]({% link dev/datastream_api.md %}#anatomy-of-a-flink-program)
 and gradually add your own
 [transformations](#dataset-transformations). The remaining sections act as references for additional
 operations and advanced features.
@@ -278,7 +279,7 @@ data.distinct();
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
         FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
-        elements. See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define join keys.
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight java %}
 result = input1.join(input2)
                .where(0)       // key of the first input (tuple field 0)
@@ -304,7 +305,7 @@ result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
     <tr>
       <td><strong>OuterJoin</strong></td>
       <td>
-        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single eleme [...]
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a <code>null</code> value for the other input) are given to a JoinFunction to turn the pair of elements into a single eleme [...]
 {% highlight java %}
 input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
       .where(0)              // key of the first input (tuple field 0)
@@ -326,7 +327,7 @@ input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or ful
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight java %}
 data1.coGroup(data2)
      .where(0)
@@ -600,7 +601,7 @@ data.distinct()
         Joins two data sets by creating all pairs of elements that are equal on their keys.
         Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
         FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
-        elements. See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define join keys.
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
 {% highlight scala %}
 // In this case tuple fields are used as keys. "0" is the join field on the first tuple
 // "1" is the join field on the second tuple.
@@ -626,7 +627,7 @@ val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
     <tr>
       <td><strong>OuterJoin</strong></td>
       <td>
-        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a [...]
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a [...]
 {% highlight scala %}
 val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
    (left, right) =>
@@ -642,7 +643,7 @@ val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
       <td>
         <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
         fields and then joins the groups. The transformation function is called per pair of groups.
-        See the <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
 {% highlight scala %}
 data1.coGroup(data2).where(0).equalTo(1)
 {% endhighlight %}
@@ -796,6 +797,230 @@ possible for [Data Sources](#data-sources) and [Data Sinks](#data-sinks).
 
 {% top %}
 
+Specifying Keys
+---------------
+
+Some transformations (join, coGroup, groupBy) require that a key be defined on
+a collection of elements. Other transformations (Reduce, GroupReduce,
+Aggregate) allow data to be grouped on a key before they are
+applied.
+
+A DataSet is grouped as
+{% highlight java %}
+DataSet<...> input = // [...]
+DataSet<...> reduced = input
+  .groupBy(/*define key here*/)
+  .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping Tuples on one or more
+fields of the Tuple:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+UnsortedGrouping<Tuple3<Integer, String, Long>> keyed = input.groupBy(0);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val keyed = input.groupBy(0)
+{% endhighlight %}
+</div>
+</div>
+
+The tuples are grouped on the first field (the one of
+Integer type).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+UnsortedGrouping<Tuple3<Integer, String, Long>> keyed = input.groupBy(0, 1);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input.groupBy(0,1)
+{% endhighlight %}
+</div>
+</div>
+
+Here, we group the tuples on a composite key consisting of the first and the
+second field.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
+{% highlight java %}
+DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
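+
+As a sketch (reusing the `ds` declaration above), navigating into the nested tuple with such a field expression key could look like this:
+{% highlight java %}
+// group on the Integer inside the nested Tuple2 (field expression keys are explained below)
+ds.groupBy("f0.f0");
+{% endhighlight %}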
+
+### Define keys using Field Expressions
+{:.no_toc}
+
+You can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping.
+
+Field expressions make it very easy to select fields in (nested) composite types such as [Tuple](#tuples-and-case-classes) and [POJO](#pojos) types.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
+{% highlight java %}
+// some ordinary POJO (Plain old Java Object)
+public class WC {
+  public String word;
+  public int count;
+}
+DataSet<WC> words = // [...]
+UnsortedGrouping<WC> wordCounts = words.groupBy("word")
+{% endhighlight %}
+
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
+
+- Select Tuple fields by their field name or 0-offset field index. For example `"f0"` and `"5"` refer to the first and sixth field of a Java Tuple type, respectively.
+
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"f1.user.zip"` or `"user.f3.1.zip"`.
+
+- You can select the full type using the `"*"` wildcard expression. This also works for types which are not Tuple or POJO types.
+
+**Field Expression Example**:
+
+{% highlight java %}
+public static class WC {
+  public ComplexNestedClass complex; //nested POJO
+  private int count;
+  // getter / setter for private field (count)
+  public int getCount() {
+    return count;
+  }
+  public void setCount(int c) {
+    this.count = c;
+  }
+}
+public static class ComplexNestedClass {
+  public Integer someNumber;
+  public float someFloat;
+  public Tuple3<Long, Long, String> word;
+  public IntWritable hadoopCitizen;
+}
+{% endhighlight %}
+
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
+
+- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`.
+
+- `"complex.word.f2"`: Selects the last field of the nested `Tuple3`.
+
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
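+For instance, grouping on one of these nested fields could look like the following sketch (assuming a `DataSet<WC>` named `wcs`):
+{% highlight java %}
+DataSet<WC> wcs = // [...]
+// group on the last field of the nested Tuple3 stored in "complex"
+wcs.groupBy("complex.word.f2");
+{% endhighlight %}
+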
+</div>
+<div data-lang="scala" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
+{% highlight scala %}
+// some ordinary POJO (Plain old Java Object)
+class WC(var word: String, var count: Int) {
+  def this() { this("", 0) }
+}
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word")
+
+// or, as a case class, which is less typing
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word")
+{% endhighlight %}
+
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
+
+- Select Tuple fields by their 1-offset field name or 0-offset field index. For example `"_1"` and `"5"` refer to the first and sixth field of a Scala Tuple type, respectively.
+
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported such as `"_2.user.zip"` or `"user._4.1.zip"`.
+
+- You can select the full type using the `"_"` wildcard expression. This also works for types which are not Tuple or POJO types.
+
+**Field Expression Example**:
+
+{% highlight scala %}
+class WC(var complex: ComplexNestedClass, var count: Int) {
+  def this() { this(null, 0) }
+}
+
+class ComplexNestedClass(
+    var someNumber: Int,
+    var someFloat: Float,
+    var word: (Long, Long, String),
+    var hadoopCitizen: IntWritable) {
+  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
+}
+{% endhighlight %}
+
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
+
+- `"complex"`: Recursively selects all fields of the field complex of POJO type `ComplexNestedClass`.
+
+- `"complex.word._3"`: Selects the last field of the nested `Tuple3`.
+
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
+</div>
+</div>
+
+### Define keys using Key Selector Functions
+{:.no_toc}
+
+An additional way to define keys are "key selector" functions. A key selector function
+takes a single element as input and returns the key for the element. The key can be of any type and be derived from deterministic computations.
+
+The following example shows a key selector function that simply returns the field of an object:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// some ordinary POJO
+public class WC {public String word; public int count;}
+DataSet<WC> words = // [...]
+UnsortedGrouping<WC> keyed = words
+  .groupBy(new KeySelector<WC, String>() {
+     public String getKey(WC wc) { return wc.word; }
+   });
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// some ordinary case class
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val keyed = words.groupBy( _.word )
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Data Sources
 ------------
 
@@ -1199,7 +1424,7 @@ myResult.output(
 
 #### Locally Sorted Output
 
-The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-for-tuples) or [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). This works for every output format.
+The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions](#define-keys-for-tuples) or [field expressions](#define-keys-using-field-expressions). This works for every output format.
 
 The following examples show how to use this feature:
 
@@ -1282,7 +1507,7 @@ values map { tuple => tuple._1 + " - " + tuple._2 }
 
 #### Locally Sorted Output
 
-The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-for-tuples) or [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). This works for every output format.
+The output of a data sink can be locally sorted on specified fields in specified orders using [tuple field positions](#define-keys-for-tuples) or [field expressions](#define-keys-using-field-expressions). This works for every output format.
 
 The following examples show how to use this feature:
 
@@ -1771,7 +1996,7 @@ This information is used by the optimizer to infer whether a data property such
 partitioning is preserved by a function.
 For functions that operate on groups of input elements such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition`, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function's input group.
 
-Field forward information is specified using [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions).
+Field forward information is specified using [field expressions](#define-keys-using-field-expressions).
 Fields that are forwarded to the same position in the output can be specified by their position.
 The specified position must be valid for the input and output data type and have the same type.
 For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple.
@@ -1840,7 +2065,7 @@ Non-forwarded field information for group-wise operators such as `GroupReduce`,
 **IMPORTANT**: The specification of non-forwarded fields information is optional. However if used,
 **ALL!** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded.
 
-Non-forwarded fields are specified as a list of [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
+Non-forwarded fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
 For example both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple
 are not preserved in place and all other fields are preserved in place.
 Non-forwarded field information can only be specified for functions which have identical input and output types.
@@ -1891,7 +2116,7 @@ Fields which are only unmodified forwarded to the output without evaluating thei
 **IMPORTANT**: The specification of read fields information is optional. However if used,
 **ALL!** read fields must be specified. It is safe to declare a non-read field as read.
 
-Read fields are specified as a list of [field expressions]({{ site.baseurl }}/dev/api_concepts.html#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
+Read fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings.
 For example both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple are read and evaluated by the function.
 
 Read field information is specified as function class annotations using the following annotations:
@@ -2045,7 +2270,7 @@ DataSet<Integer> result = input.map(new MyMapper());
 env.execute();
 {% endhighlight %}
 
-Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) class because it needs access to the `RuntimeContext`.
+Access the cached file or directory in a user function (here a `MapFunction`). The function must extend a [RichFunction]({% link dev/user_defined_functions.md %}#rich-functions) class because it needs access to the `RuntimeContext`.
 
 {% highlight java %}
 
@@ -2091,7 +2316,7 @@ val result: DataSet[Integer] = input.map(new MyMapper())
 env.execute()
 {% endhighlight %}
 
-Access the cached file in a user function (here a `MapFunction`). The function must extend a [RichFunction]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) class because it needs access to the `RuntimeContext`.
+Access the cached file in a user function (here a `MapFunction`). The function must extend a [RichFunction]({% link dev/user_defined_functions.md %}#rich-functions) class because it needs access to the `RuntimeContext`.
 
 {% highlight scala %}
 
@@ -2164,7 +2389,7 @@ class MyFilter(limit: Int) extends FilterFunction[Int] {
 
 #### Via `withParameters(Configuration)`
 
-This method takes a Configuration object as an argument, which will be passed to the [rich function]({{ site.baseurl }}/dev/api_concepts.html#rich-functions)'s `open()`
+This method takes a Configuration object as an argument, which will be passed to the [rich function]({% link dev/user_defined_functions.md %}#rich-functions)'s `open()`
 method. The Configuration object is a Map from String keys to different value types.
 
 <div class="codetabs" markdown="1">
diff --git a/docs/dev/connectors/cassandra.zh.md b/docs/dev/connectors/cassandra.zh.md
index 9a51387..002d388 100644
--- a/docs/dev/connectors/cassandra.zh.md
+++ b/docs/dev/connectors/cassandra.zh.md
@@ -111,7 +111,7 @@ More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpoin
 
 ## Examples
 
-The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java [...]
+The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({% link dev/types_serialization.md %}#supported-data-types). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWi [...]
 
 In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
 
diff --git a/docs/dev/connectors/filesystem_sink.zh.md b/docs/dev/connectors/filesystem_sink.zh.md
index 1e570e6..adb9f8b 100644
--- a/docs/dev/connectors/filesystem_sink.zh.md
+++ b/docs/dev/connectors/filesystem_sink.zh.md
@@ -39,7 +39,7 @@ under the License.
 </dependency>
 {% endhighlight %}
 
-注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
+注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 [这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html)。
 
 #### 分桶文件 Sink
 
diff --git a/docs/dev/connectors/kafka.zh.md b/docs/dev/connectors/kafka.zh.md
index b2cdc54..bf2ebbe 100644
--- a/docs/dev/connectors/kafka.zh.md
+++ b/docs/dev/connectors/kafka.zh.md
@@ -84,7 +84,7 @@ Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入
 {% endhighlight %}
 
 请注意:目前流连接器还不是二进制分发的一部分。
-[在此处]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html)可以了解到如何链接它们以实现在集群中执行。
+[在此处]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html)可以了解到如何链接它们以实现在集群中执行。
 
 ## 安装 Apache Kafka
 
diff --git a/docs/dev/connectors/kinesis.zh.md b/docs/dev/connectors/kinesis.zh.md
index 4e9009d..9601af2 100644
--- a/docs/dev/connectors/kinesis.zh.md
+++ b/docs/dev/connectors/kinesis.zh.md
@@ -45,7 +45,25 @@ Due to the licensing issue, the `flink-connector-kinesis{{ site.scala_version_su
 
 ## Using the Amazon Kinesis Streams Service
 Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
+to set up Kinesis streams.
+
+## Configuring Access to Kinesis with IAM
+Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+
+Depending on your deployment you can choose a different Credentials Provider to allow access to Kinesis.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
+
+A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
+
+Supported Credential Providers are:
+* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
+* `BASIC` - Using access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Using Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
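+
+As a minimal sketch (only the region key from the consumer example below is shown here; other keys depend on the chosen provider), selecting the `PROFILE` provider could look like this:
+
+{% highlight java %}
+Properties config = new Properties();
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+// read credentials from the default AWS credentials profile file
+config.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
+{% endhighlight %}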
 
 ## Kinesis Consumer
 
@@ -91,8 +109,7 @@ The above is a simple example of using the consumer. Configuration for the consu
 instance, the configuration keys for which can be found in `AWSConfigConstants` (AWS-specific parameters) and
 `ConsumerConfigConstants` (Kinesis consumer parameters). The example
 demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
-the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
-`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
+the AWS access key ID and secret access key are directly supplied in the configuration. Also, data is being consumed
 from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
 
diff --git a/docs/dev/connectors/nifi.zh.md b/docs/dev/connectors/nifi.zh.md
index 114092f..36ac3f3 100644
--- a/docs/dev/connectors/nifi.zh.md
+++ b/docs/dev/connectors/nifi.zh.md
@@ -34,7 +34,7 @@ under the License.
 </dependency>
 {% endhighlight %}
 
-注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
+注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 [这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html)。
 
 #### 安装 Apache NiFi
 
diff --git a/docs/dev/connectors/pubsub.zh.md b/docs/dev/connectors/pubsub.zh.md
index eaf5f58..93cfa96 100644
--- a/docs/dev/connectors/pubsub.zh.md
+++ b/docs/dev/connectors/pubsub.zh.md
@@ -37,7 +37,7 @@ under the License.
 <b>注意</b>:此连接器最近才加到 Flink 里,还未接受广泛测试。
 </p>
 
-注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考[这里]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html)
+注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考[这里]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html)
 
 ## Consuming or Producing PubSubMessages
 
diff --git a/docs/dev/connectors/rabbitmq.zh.md b/docs/dev/connectors/rabbitmq.zh.md
index e213d3f..26b16ba 100644
--- a/docs/dev/connectors/rabbitmq.zh.md
+++ b/docs/dev/connectors/rabbitmq.zh.md
@@ -43,7 +43,7 @@ Flink 自身既没有复用 "RabbitMQ AMQP Java Client" 的代码,也没有将
 </dependency>
 {% endhighlight %}
 
-注意连接器现在没有包含在二进制发行版中。集群执行的相关信息请参考 [这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html).
+注意连接器现在没有包含在二进制发行版中。集群执行的相关信息请参考 [这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html).
 
 ### 安装 RabbitMQ
 安装 RabbitMQ 请参考 [RabbitMQ 下载页面](http://www.rabbitmq.com/download.html)。安装完成之后,服务会自动拉起,应用程序就可以尝试连接到 RabbitMQ 了。
diff --git a/docs/dev/connectors/streamfile_sink.zh.md b/docs/dev/connectors/streamfile_sink.zh.md
index bd74bef..9f027a7 100644
--- a/docs/dev/connectors/streamfile_sink.zh.md
+++ b/docs/dev/connectors/streamfile_sink.zh.md
@@ -122,11 +122,12 @@ input.addSink(sink)
 批量编码 Sink 的创建与行编码 Sink 相似,不过在这里我们不是指定编码器  `Encoder` 而是指定 [BulkWriter.Factory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/BulkWriter.Factory.html) 。
 `BulkWriter` 定义了如何添加、刷新元素,以及如何批量编码。
 
-Flink 有三个内置的 BulkWriter Factory :
+Flink 有四个内置的 BulkWriter Factory :
 
  - [ParquetWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
  - [SequenceFileWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
  - [CompressWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/formats/compress/CompressWriterFactory.html)
+ - [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
 
 <div class="alert alert-info">
      <b>重要:</b> 批量编码模式仅支持 OnCheckpointRollingPolicy 策略, 在每次 checkpoint 的时候切割文件。
@@ -188,6 +189,204 @@ input.addSink(sink)
 </div>
 </div>
 
+#### ORC Format
+
+To enable the data to be bulk encoded in ORC format, Flink offers [OrcBulkWriterFactory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.html)
+which takes a concrete implementation of [Vectorizer]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/orc/vector/Vectorizer.html).
+
+Like any other columnar format that encodes data in bulk fashion, Flink's `OrcBulkWriter` writes the input elements in batches. It uses
+ORC's `VectorizedRowBatch` to achieve this.
+
+Since the input element has to be transformed to a `VectorizedRowBatch`, users have to extend the abstract `Vectorizer`
+class and override the `vectorize(T element, VectorizedRowBatch batch)` method. The method provides an instance of
+`VectorizedRowBatch` directly, so users only have to write the logic that transforms the input `element` into
+the corresponding `ColumnVector`s and sets them in the provided `VectorizedRowBatch` instance.
+
+For example, if the input element is of type `Person` which looks like:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class Person {
+    private final String name;
+    private final int age;
+    ...
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+Then a child implementation that converts an element of type `Person` and sets its fields in the `VectorizedRowBatch` can look like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
+public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
+	public PersonVectorizer(String schema) {
+		super(schema);
+	}
+	@Override
+	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
+		BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
+		LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
+		int row = batch.size++;
+		nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
+		ageColVector.vector[row] = element.getAge();
+	}
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.nio.charset.StandardCharsets
+import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, LongColumnVector}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+    val nameColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
+    val ageColVector = batch.cols(1).asInstanceOf[LongColumnVector]
+    val row = batch.size
+    batch.size += 1
+    nameColVector.setVal(row, element.getName.getBytes(StandardCharsets.UTF_8))
+    ageColVector.vector(row) = element.getAge
+  }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
+To use the ORC bulk encoder in an application, users need to add the following dependency:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-orc{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+And then a `StreamingFileSink` that writes data in ORC format can be created like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.orc.writer.OrcBulkWriterFactory;
+
+String schema = "struct<_col0:string,_col1:int>";
+DataStream<Person> input = ...;
+
+final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(new PersonVectorizer(schema));
+
+final StreamingFileSink<Person> sink = StreamingFileSink
+	.forBulkFormat(outputBasePath, writerFactory)
+	.build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.orc.writer.OrcBulkWriterFactory
+
+val schema: String = "struct<_col0:string,_col1:int>"
+val input: DataStream[Person] = ...
+val writerFactory = new OrcBulkWriterFactory(new PersonVectorizer(schema));
+
+val sink: StreamingFileSink[Person] = StreamingFileSink
+    .forBulkFormat(outputBasePath, writerFactory)
+    .build()
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+OrcBulkWriterFactory can also take Hadoop `Configuration` and `Properties` so that a custom Hadoop configuration and ORC
+writer properties can be provided.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+String schema = ...;
+Configuration conf = ...;
+Properties writerProperties = new Properties();
+
+writerProperties.setProperty("orc.compress", "LZ4");
+// Other ORC supported properties can also be set similarly.
+
+final OrcBulkWriterFactory<Person> writerFactory = new OrcBulkWriterFactory<>(
+    new PersonVectorizer(schema), writerProperties, conf);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val schema: String = ...
+val conf: Configuration = ...
+val writerProperties: Properties = new Properties()
+
+writerProperties.setProperty("orc.compress", "LZ4")
+// Other ORC supported properties can also be set similarly.
+
+val writerFactory = new OrcBulkWriterFactory(
+    new PersonVectorizer(schema), writerProperties, conf)
+{% endhighlight %}
+</div>
+</div>
+
+The complete list of ORC writer properties can be found [here](https://orc.apache.org/docs/hive-config.html).
+
+Users who want to add user metadata to the ORC files can do so by calling `addUserMetadata(...)` inside the overridden
+`vectorize(...)` method.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class PersonVectorizer extends Vectorizer<Person> implements Serializable {
+	@Override
+	public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
+		...
+		String metadataKey = ...;
+		ByteBuffer metadataValue = ...;
+		this.addUserMetadata(metadataKey, metadataValue);
+	}
+}
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+class PersonVectorizer(schema: String) extends Vectorizer[Person](schema) {
+
+  override def vectorize(element: Person, batch: VectorizedRowBatch): Unit = {
+    ...
+    val metadataKey: String = ...
+    val metadataValue: ByteBuffer = ...
+    addUserMetadata(metadataKey, metadataValue)
+  }
+
+}
+
+{% endhighlight %}
+</div>
+</div>
+
 #### Hadoop SequenceFile 格式
 
 在应用中使用 SequenceFile 批量编码器,你需要添加以下依赖:
diff --git a/docs/dev/connectors/twitter.zh.md b/docs/dev/connectors/twitter.zh.md
index f4d110d..afd3ba8 100644
--- a/docs/dev/connectors/twitter.zh.md
+++ b/docs/dev/connectors/twitter.zh.md
@@ -35,7 +35,7 @@ Flink Streaming 通过一个内置的 `TwitterSource` 类来创建到 tweets 流
 </dependency>
 {% endhighlight %}
 
-注意:当前的二进制发行版还没有这些连接器。集群执行请参考[这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html).
+注意:当前的二进制发行版还没有这些连接器。集群执行请参考[这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html).
 
 #### 认证
 使用 Twitter 流,用户需要先注册自己的程序,获取认证相关的必要信息。过程如下:
diff --git a/docs/dev/datastream_api.zh.md b/docs/dev/datastream_api.zh.md
index c95a910..8300827 100644
--- a/docs/dev/datastream_api.zh.md
+++ b/docs/dev/datastream_api.zh.md
@@ -32,19 +32,221 @@ example write the data to files, or to standard output (for example the command
 terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
 The execution can happen in a local JVM, or on clusters of many machines.
 
-Please see [basic concepts]({{ site.baseurl }}/dev/api_concepts.html) for an introduction
-to the basic concepts of the Flink API.
-
-In order to create your own Flink DataStream program, we encourage you to start with
-[anatomy of a Flink Program]({{ site.baseurl }}/dev/api_concepts.html#anatomy-of-a-flink-program)
-and gradually add your own
-[stream transformations]({{ site.baseurl }}/dev/stream/operators/index.html). The remaining sections act as references for additional
-operations and advanced features.
+In order to create your own Flink DataStream program, we encourage you to start
+with [anatomy of a Flink Program](#anatomy-of-a-flink-program) and gradually
+add your own [stream transformations]({{ site.baseurl
+}}/dev/stream/operators/index.html). The remaining sections act as references
+for additional operations and advanced features.
 
 
 * This will be replaced by the TOC
 {:toc}
 
+What is a DataStream?
+----------------------
+
+The DataStream API gets its name from the special `DataStream` class that is
+used to represent a collection of data in a Flink program. You can think of
+a `DataStream` as an immutable collection of data that can contain duplicates. This
+data can be either finite or unbounded; the API that you use to work on it is
+the same.
+
+A `DataStream` is similar to a regular Java `Collection` in terms of usage but
+is quite different in some key ways. It is immutable, meaning that once it
+is created you cannot add or remove elements. You also cannot simply inspect
+the elements inside; you can only work on them using the `DataStream` API
+operations, which are also called transformations.
+
+You can create an initial `DataStream` by adding a source in a Flink program.
+Then you can derive new streams from this and combine them by using API methods
+such as `map`, `filter`, and so on.
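+
+As a small sketch (the values are illustrative), creating a stream from a fixed set of elements and deriving a new one could look like this:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// create an initial DataStream from a fixed collection of elements
+DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4);
+
+// derive a new DataStream by applying a transformation
+DataStream<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
+{% endhighlight %}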
+
+Anatomy of a Flink Program
+--------------------------
+
+Flink programs look like regular programs that transform `DataStreams`.  Each
+program consists of the same basic parts:
+
+1. Obtain an `execution environment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations,
+5. Trigger the program execution
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+
+We will now give an overview of each of those steps; please refer to the
+respective sections for more details. Note that all core classes of the Java
+DataStream API can be found in {% gh_link
+/flink-streaming-java/src/main/java/org/apache/flink/streaming/api
+"org.apache.flink.streaming.api" %}.
+
+The `StreamExecutionEnvironment` is the basis for all Flink programs. You can
+obtain one using these static methods on `StreamExecutionEnvironment`:
+
+{% highlight java %}
+getExecutionEnvironment()
+
+createLocalEnvironment()
+
+createRemoteEnvironment(String host, int port, String... jarFiles)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this will do
+the right thing depending on the context: if you are executing your program
+inside an IDE or as a regular Java program it will create a local environment
+that will execute your program on your local machine. If you created a JAR file
+from your program, and invoke it through the [command line]({{ site.baseurl
+}}/ops/cli.html), the Flink cluster manager will execute your main method and
+`getExecutionEnvironment()` will return an execution environment for executing
+your program on a cluster.
+
+For specifying data sources, the execution environment has several methods to
+read from files: you can read them line by line, as CSV files, or using any of
+the other provided sources. To just read a text file as a sequence of lines,
+you can use:
+
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> text = env.readTextFile("file:///path/to/file");
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations to create new
+derived DataStreams.
+
+You apply transformations by calling methods on DataStream with
+transformation functions. For example, a map transformation looks like this:
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return Integer.parseInt(value);
+    }
+});
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+collection to an Integer.
+
+Once you have a DataStream containing your final results, you can write it to
+an outside system by creating a sink. These are just some example methods for
+creating a sink:
+
+{% highlight java %}
+writeAsText(String path)
+
+print()
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+We will now give an overview of each of those steps; please refer to the
+respective sections for more details. Note that all core classes of the Scala
+DataStream API can be found in {% gh_link
+/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala
+"org.apache.flink.streaming.api.scala" %}.
+
+The `StreamExecutionEnvironment` is the basis for all Flink programs. You can
+obtain one using these static methods on `StreamExecutionEnvironment`:
+
+{% highlight scala %}
+getExecutionEnvironment()
+
+createLocalEnvironment()
+
+createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this will do
+the right thing depending on the context: if you are executing your program
+inside an IDE or as a regular Java program it will create a local environment
+that will execute your program on your local machine. If you created a JAR file
+from your program, and invoke it through the [command line]({{ site.baseurl
+}}/ops/cli.html), the Flink cluster manager will execute your main method and
+`getExecutionEnvironment()` will return an execution environment for executing
+your program on a cluster.
+
+For specifying data sources, the execution environment has several methods to
+read from files: you can read them line by line, as CSV files, or using any of
+the other provided sources. To just read a text file as a sequence of lines,
+you can use:
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val text: DataStream[String] = env.readTextFile("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations to
+create new derived DataStreams.
+
+You apply transformations by calling methods on DataStream with
+transformation functions. For example, a map transformation looks like this:
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val mapped = input.map { x => x.toInt }
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+collection to an Integer.
+
+Once you have a DataStream containing your final results, you can write it to
+an outside system by creating a sink. These are just some example methods for
+creating a sink:
+
+{% highlight scala %}
+writeAsText(path: String)
+
+print()
+{% endhighlight %}
+
+</div>
+</div>
+
+Once you have specified the complete program you need to **trigger the program
+execution** by calling `execute()` on the `StreamExecutionEnvironment`.
+Depending on the type of the `ExecutionEnvironment` the execution will be
+triggered on your local machine or your program will be submitted for execution
+on a cluster.
+
+The `execute()` method will wait for the job to finish and then return a
+`JobExecutionResult`, which contains execution times and accumulator results.
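+
+For example, a minimal sketch of reading the runtime from the result (the job name is illustrative):
+
+{% highlight java %}
+final JobExecutionResult result = env.execute("My Flink Job");
+
+// net execution time of the job in milliseconds
+long runtimeMillis = result.getNetRuntime();
+{% endhighlight %}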
+
+If you don't want to wait for the job to finish, you can trigger asynchronous
+job execution by calling `executeAsync()` on the `StreamExecutionEnvironment`.
+It will return a `JobClient` that you can use to communicate with the job you
+just submitted. For instance, here is how to implement the semantics of
+`execute()` by using `executeAsync()`.
+
+{% highlight java %}
+final JobClient jobClient = env.executeAsync();
+
+final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
+{% endhighlight %}
+
+That last part about program execution is crucial to understanding when and how
+Flink operations are executed. All Flink programs are executed lazily: When the
+program's main method is executed, the data loading and transformations do not
+happen directly. Rather, each operation is created and added to a dataflow
+graph. The operations are actually executed when the execution is explicitly
+triggered by an `execute()` call on the execution environment. Whether the
+program is executed locally or on a cluster depends on the type of execution
+environment.
+
+The lazy evaluation lets you construct sophisticated programs that Flink
+executes as one holistically planned unit.
+
+{% top %}
 
 Example Program
 ---------------
diff --git a/docs/dev/java_lambdas.zh.md b/docs/dev/java_lambdas.zh.md
index 410a66a..8fbc6bc 100644
--- a/docs/dev/java_lambdas.zh.md
+++ b/docs/dev/java_lambdas.zh.md
@@ -26,7 +26,9 @@ Java 8 引入了几种新的语言特性,旨在实现更快、更清晰的编
 
 <span class="label label-danger">注意</span> Flink 支持对 Java API 的所有算子使用 Lambda 表达式,但是,当 Lambda 表达式使用 Java 泛型时,你需要 *显式* 声明类型信息。
 
-本文档介绍了如何使用 Lambda 表达式并描述了其在当前应用中的限制。有关 Flink API 的通用介绍, 请参阅[编程指南]({{ site.baseurl }}/zh/dev/api_concepts.html)。
+This document shows how to use lambda expressions and describes current
+limitations. For a general introduction to the Flink API, please refer to the
+[DataStream API overview]({{ site.baseurl }}{% link dev/datastream_api.zh.md %}).
 
 ### 示例和限制
 
diff --git a/docs/dev/libs/cep.zh.md b/docs/dev/libs/cep.zh.md
index 3805282..4a2a06c 100644
--- a/docs/dev/libs/cep.zh.md
+++ b/docs/dev/libs/cep.zh.md
@@ -35,7 +35,7 @@ FlinkCEP是在Flink上层实现的复杂事件处理库。
 
 ## 开始
 
-如果你想现在开始尝试,[创建一个Flink程序]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html),
+如果你想现在开始尝试,[创建一个Flink程序]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html),
 添加FlinkCEP的依赖到项目的`pom.xml`文件中。
 
 <div class="codetabs" markdown="1">
@@ -60,7 +60,7 @@ FlinkCEP是在Flink上层实现的复杂事件处理库。
 </div>
 </div>
 
-{% info 提示 %} FlinkCEP不是二进制发布包的一部分。在集群上执行如何链接它可以看[这里]({{site.baseurl}}/zh/dev/projectsetup/dependencies.html)。
+{% info 提示 %} FlinkCEP不是二进制发布包的一部分。在集群上执行如何链接它可以看[这里]({{site.baseurl}}/zh/getting-started/project-setup/dependencies.html)。
 
 现在可以开始使用Pattern API写你的第一个CEP程序了。
 
diff --git a/docs/dev/parallel.zh.md b/docs/dev/parallel.zh.md
index c1b6038..041136d 100644
--- a/docs/dev/parallel.zh.md
+++ b/docs/dev/parallel.zh.md
@@ -73,7 +73,7 @@ env.execute("Word Count Example")
 
 ### 执行环境层次
 
-如[此节]({{ site.baseurl }}/zh/dev/api_concepts.html#anatomy-of-a-flink-program)所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
+如[此节]({{ site.baseurl }}{% link dev/datastream_api.zh.md %}#anatomy-of-a-flink-program)所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
 
 可以通过调用 `setParallelism()` 方法指定执行环境的默认并行度。如果想以并行度`3`来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:
 
diff --git a/docs/dev/stream/operators/index.zh.md b/docs/dev/stream/operators/index.zh.md
index 2f54335..d226b01 100644
--- a/docs/dev/stream/operators/index.zh.md
+++ b/docs/dev/stream/operators/index.zh.md
@@ -100,7 +100,7 @@ dataStream.filter(new FilterFunction<Integer>() {
         <tr>
           <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
           <td>
-            <p>Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, <em>keyBy()</em> is implemented with hash partitioning. There are different ways to <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">specify keys</a>.</p>
+            <p>Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, <em>keyBy()</em> is implemented with hash partitioning. There are different ways to <a href="{% link dev/stream/state/state.md %}#keyed-datastream">specify keys</a>.</p>
             <p>
             This transformation returns a <em>KeyedStream</em>, which is, among other things, required to use <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keyed state</a>. </p>
 {% highlight java %}
@@ -500,7 +500,7 @@ dataStream.filter { _ != 0 }
           <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
           <td>
             <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/api_concepts.html#specifying-keys">keys</a> on how to specify keys.
+            Internally, this is implemented with hash partitioning. See <a href="{{ site.baseurl }}/dev/stream/state/state.html#keyed-state">keys</a> on how to specify keys.
             This transformation returns a KeyedStream.</p>
 {% highlight scala %}
 dataStream.keyBy("someKey") // Key by field "someKey"
diff --git a/docs/dev/stream/operators/windows.zh.md b/docs/dev/stream/operators/windows.zh.md
index 72269a4..517f7b3 100644
--- a/docs/dev/stream/operators/windows.zh.md
+++ b/docs/dev/stream/operators/windows.zh.md
@@ -94,7 +94,7 @@ Using the `keyBy(...)` will split your infinite stream into logical keyed stream
 stream is not keyed.
 
 In the case of keyed streams, any attribute of your incoming events can be used as a key
-(more details [here]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)). Having a keyed stream will
+(more details [here]({% link dev/stream/state/state.zh.md %}#keyed-datastream)). Having a keyed stream will
 allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed
 independently from the rest. All elements referring to the same key will be sent to the same parallel task.
 
diff --git a/docs/dev/stream/state/checkpointing.zh.md b/docs/dev/stream/state/checkpointing.zh.md
index c940ada..6f22d62 100644
--- a/docs/dev/stream/state/checkpointing.zh.md
+++ b/docs/dev/stream/state/checkpointing.zh.md
@@ -184,7 +184,7 @@ Flink 现在为没有迭代(iterations)的作业提供一致性的处理保
 
 ## 重启策略
 
-Flink 支持不同的重启策略,来控制 job 万一故障时该如何重启。更多信息请阅读 [重启策略]({{ site.baseurl }}/zh/dev/restart_strategies.html)。
+Flink 支持不同的重启策略,来控制 job 万一故障时该如何重启。更多信息请阅读 [重启策略]({{ site.baseurl }}/zh/dev/task_failure_recovery.html)。
 
 {% top %}
 
diff --git a/docs/dev/stream/state/index.zh.md b/docs/dev/stream/state/index.zh.md
index 1b54448..ab8d9ea 100644
--- a/docs/dev/stream/state/index.zh.md
+++ b/docs/dev/stream/state/index.zh.md
@@ -25,23 +25,10 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for
-any type of more elaborate operation.
-
-For example:
-
-  - When an application searches for certain event patterns, the state will store the sequence of events encountered so far.
-  - When aggregating events per minute/hour/day, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.
-  - When historic data needs to be managed, the state allows efficient access to events that occurred in the past.
-
-Flink needs to be aware of the state in order to make state fault tolerant using [checkpoints](checkpointing.html) and to allow [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) of streaming applications.
-
-Knowledge about the state also allows for rescaling Flink applications, meaning that Flink takes care of redistributing state across parallel instances.
-
-The [queryable state](queryable_state.html) feature of Flink allows you to access state from outside of Flink during runtime.
-
-When working with state, it might also be useful to read about [Flink's state backends]({{ site.baseurl }}/ops/state/state_backends.html). Flink provides different state backends that specify how and where state is stored. State can be located on Java's heap or off-heap. Depending on your state backend, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large sta [...]
+In this section you will learn about the APIs that Flink provides for writing
+stateful programs. Please take a look at [Stateful Stream
+Processing]({% link concepts/stateful-stream-processing.zh.md %})
+to learn about the concepts behind stateful stream processing.
 
 {% top %}
 
diff --git a/docs/dev/stream/state/queryable_state.zh.md b/docs/dev/stream/state/queryable_state.zh.md
index 1d62efd..6066de0 100644
--- a/docs/dev/stream/state/queryable_state.zh.md
+++ b/docs/dev/stream/state/queryable_state.zh.md
@@ -149,7 +149,7 @@ descriptor.setQueryable("query-name"); // queryable state name
 {% endhighlight %}
 </div>
 
-关于依赖的更多信息, 可以参考如何 [配置 Flink 项目]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html).
+关于依赖的更多信息, 可以参考如何 [配置 Flink 项目]({{ site.baseurl }}/zh/getting-started/project-setup/dependencies.html).
 
 `QueryableStateClient` 将提交你的请求到内部代理,代理会处理请求并返回结果。客户端的初始化只需要提供一个有效的 `TaskManager` 主机名
 (每个 task manager 上都运行着一个 queryable state 代理),以及代理监听的端口号。关于如何配置代理以及端口号可以参考 [Configuration Section](#configuration).
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index dad89a3..bda020a 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -22,51 +22,75 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-本文档主要介绍如何在 Flink 作业中使用状态
+In this section you will learn about the APIs that Flink provides for writing
+stateful programs. Please take a look at [Stateful Stream
+Processing]({% link concepts/stateful-stream-processing.md %})
+to learn about the concepts behind stateful stream processing.
+
 * 目录
 {:toc}
 
-## Keyed State 与 Operator State
-
-Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`。
-
-### Keyed State
-
-*Keyed State* 通常和 key 相关,仅可使用在 `KeyedStream` 的方法和算子中。
+## Keyed DataStream
 
-你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个 key 仅出现在一个分区内。
-逻辑上每个 keyed-state 和唯一元组 <算子并发实例, key> 绑定,由于每个 key 仅"属于"
-算子的一个并发,因此简化为 <算子, key>。
+If you want to use keyed state, you first need to specify a key on a
+`DataStream` that should be used to partition the state (and also the records
+in the stream themselves). You can specify a key using `keyBy(KeySelector)` on
+a `DataStream`. This will yield a `KeyedStream`, which then allows
+operations that use keyed state.
 
-Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
-Key Group 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
+A key selector function takes a single record as input and returns the key for
+that record. The key can be of any type and **must** be derived from
+deterministic computations.
 
-### Operator State
+The data model of Flink is not based on key-value pairs. Therefore, you do not
+need to physically pack the data set types into keys and values. Keys are
+"virtual": they are defined as functions over the actual data to guide the
+grouping operator.
 
-对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
-[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。
-每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
+The following example shows a key selector function that simply returns the
+field of an object:
 
-Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
-
-## Raw State 与 Managed State
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
 
-*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
+  public String getWord() { return word; }
+}
+DataStream<WC> words = // [...]
+KeyedStream<WC, String> keyed = words
+  .keyBy(WC::getWord);
+{% endhighlight %}
 
-*Managed State* 由 Flink 运行时控制的数据结构表示,比如内部的 hash table 或者 RocksDB。
-比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// some ordinary case class
+case class WC(word: String, count: Int)
+val words: DataStream[WC] = // [...]
+val keyed = words.keyBy( _.word )
+{% endhighlight %}
+</div>
+</div>
 
-*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。
+### Tuple Keys and Expression Keys
+{:.no_toc}
 
-所有 datastream 的 function 都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。
-由于 Flink 可以在修改并发时更好的分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
+Flink also has two alternative ways of defining keys: tuple keys and expression
+keys. With these you can specify keys using tuple field indices or expressions
+for selecting fields of objects. We don't recommend using these today, but you
+can refer to the Javadoc of `DataStream` to learn about them. Using a `KeySelector`
+function is strictly superior: with Java lambdas it is easy to use and it
+potentially has less overhead at runtime.
 
-<span class="label label-danger">注意</span> 如果你的 managed state 需要定制化的序列化逻辑,
-为了后续的兼容性请参考 [相应指南](custom_serialization.html),Flink 的默认序列化器不需要用户做特殊的处理。
+{% top %}
 
-## 使用 Managed Keyed State
+## 使用 Keyed State
 
-managed keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 `KeyedStream`
+keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 `KeyedStream`
 上使用,可以通过 `stream.keyBy(...)` 得到 `KeyedStream`.
 
 接下来,我们会介绍不同类型的状态,然后介绍如何使用他们。所有支持的状态类型如下所示:
@@ -101,7 +125,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态
 状态所持有值的类型,并且可能包含用户指定的函数,例如`ReduceFunction`。 根据不同的状态类型,可以创建`ValueStateDescriptor`,`ListStateDescriptor`,
 `ReducingStateDescriptor`,`FoldingStateDescriptor` 或 `MapStateDescriptor`。
 
-状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({{site.baseurl}}/zh/dev/api_concepts.html#rich-functions)获取相关信息,
+状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅[这里]({% link dev/user_defined_functions.zh.md %}#rich-functions)获取相关信息,
 但是我们很快也会看到一个例子。`RichFunction` 中 `RuntimeContext` 提供如下方法:
 
 * `ValueState<T> getState(ValueStateDescriptor<T>)`
@@ -219,7 +243,7 @@ object ExampleCountWindowAverage extends App {
     .print()
   // the printed output will be (1,4) and (1,5)
 
-  env.execute("ExampleManagedState")
+  env.execute("ExampleKeyedState")
 }
 {% endhighlight %}
 </div>
@@ -470,9 +494,44 @@ val counts: DataStream[(String, Int)] = stream
     })
 {% endhighlight %}
 
-## 使用 Managed Operator State
+## Operator State
+
+*Operator State* (or *non-keyed state*) is state that is bound to one
+parallel operator instance. The [Kafka Connector]({% link
+dev/connectors/kafka.md %}) is a good motivating example for the use of
+Operator State in Flink. Each parallel instance of the Kafka consumer maintains
+a map of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among parallel
+operator instances when the parallelism is changed. There are different schemes
+for doing this redistribution.
+
+In a typical stateful Flink application you don't need operator state. It is
+mostly a special type of state that is used in source/sink implementations and
+scenarios where you don't have a key by which state can be partitioned.
+
+## Broadcast State
+
+*Broadcast State* is a special type of *Operator State*.  It was introduced to
+support use cases where records of one stream need to be broadcasted to all
+downstream tasks, where they are used to maintain the same state among all
+subtasks. This state can then be accessed while processing records of a second
+stream. As an example where broadcast state can emerge as a natural fit, one
+can imagine a low-throughput stream containing a set of rules which we want to
+evaluate against all elements coming from another stream. Having the above type
+of use cases in mind, broadcast state differs from the rest of operator states
+in that:
+
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a
+    *broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different names.
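+
+As a rough sketch (assuming a hypothetical `Rule` POJO, a `MyBroadcastProcessFunction` implementation, and existing `ruleStream`/`dataStream` streams), wiring up a broadcast state could look like this:
+
+{% highlight java %}
+// descriptor for the broadcast state (a map from rule name to rule)
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
+    "RulesBroadcastState", Types.STRING, Types.POJO(Rule.class));
+
+// broadcast the low-throughput rule stream
+BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
+
+// connect it with the main stream and process both sides in one function
+DataStream<String> output = dataStream
+    .connect(ruleBroadcastStream)
+    .process(new MyBroadcastProcessFunction());
+{% endhighlight %}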
+
+{% top %}
+
+## 使用 Operator State
 
-用户可以通过实现 `CheckpointedFunction` 或 `ListCheckpointed<T extends Serializable>` 接口来使用 managed operator state。
+用户可以通过实现 `CheckpointedFunction` 接口来使用 operator state。
 
 #### CheckpointedFunction
 
@@ -487,13 +546,14 @@ void initializeState(FunctionInitializationContext context) throws Exception;
 进行 checkpoint 时会调用 `snapshotState()`。 用户自定义函数初始化时会调用 `initializeState()`,初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。
 因此 `initializeState()` 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。
 
-当前,managed operator state 以 list 的形式存在。这些状态是一个 *可序列化* 对象的集合 `List`,彼此独立,方便在改变并发后进行状态的重新分派。
+当前 operator state 以 list 的形式存在。这些状态是一个 *可序列化* 对象的集合 `List`,彼此独立,方便在改变并发后进行状态的重新分派。
 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
 
   - **Even-split redistribution:** 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。
     比如说,算子 A 的并发读为 1,包含两个元素 `element1` 和 `element2`,当并发读增加为 2 时,`element1` 会被分到并发 0 上,`element2` 则会被分到并发 1 上。
 
   - **Union redistribution:** 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。
+    Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC frame size or out-of-memory errors.
 
 下面的例子中的 `SinkFunction` 在 `CheckpointedFunction` 中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。 
 
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index c10f2d3..91aab89 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -419,7 +419,7 @@ tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
 用户可以指定一个 catalog 和数据库作为 "当前catalog" 和"当前数据库"。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定,
 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。
 
-标识符遵循 SQL 标准,因此使用时需要用反引号(`` ` ``)进行转义。此外,所有 SQL 保留关键字都必须转义。
+标识符遵循 SQL 标准,因此使用时需要用反引号(`` ` ``)进行转义。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1273,7 +1273,7 @@ val table: Table = tableEnv.fromDataStream(stream, $"age" as "myAge", $"name" as
 
 #### POJO 类型 (Java 和 Scala)
 
-Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录在[这里]({{ site.baseurl }}/zh/dev/api_concepts.html#pojos).
+Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录在[这里]({{ site.baseurl }}{% link dev/types_serialization.md %}#pojos).
 
 在不指定字段名称的情况下将 POJO 类型的 `DataStream` 或 `DataSet` 转换成 `Table` 时,将使用原始 POJO 类型字段的名称。名称映射需要原始名称,并且不能按位置进行。字段可以使用别名(带有 `as` 关键字)来重命名,重新排序和投影。
 
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index ba6a5f7..71b490c 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -59,6 +59,8 @@ The following tables list all available connectors and formats. Their mutual com
 | CSV (for Kafka)            | `flink-csv`                  | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/{{site.version}}/flink-csv-{{site.version}}-sql-jar.jar) |
 | JSON                       | `flink-json`                 | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) |
 | Apache Avro                | `flink-avro`                 | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) |
+| Apache ORC                 | `flink-orc`                  | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-orc{{site.scala_version_suffix}}/{{site.version}}/flink-orc{{site.scala_version_suffix}}-{{site.version}}-jar-with-dependencies.jar) |
+| Apache Parquet             | `flink-parquet`              | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-parquet{{site.scala_version_suffix}}/{{site.version}}/flink-parquet{{site.scala_version_suffix}}-{{site.version}}-jar-with-dependencies.jar) |
 
 {% else %}
 
@@ -929,15 +931,15 @@ CREATE TABLE MyUserTable (
   'connector.hosts' = 'http://host_name:9092;http://host_name:9093',  -- required: one or more Elasticsearch hosts to connect to
 
   'connector.index' = 'myusers',       -- required: Elasticsearch index. Flink supports both static index and dynamic index.
-                                       -- If you want to have a static index, this option value should be a plain string, 
+                                       -- If you want to have a static index, this option value should be a plain string,
                                        -- e.g. 'myusers', all the records will be consistently written into "myusers" index.
                                        -- If you want to have a dynamic index, you can use '{field_name}' to reference a field
-                                       -- value in the record to dynamically generate a target index. You can also use 
+                                       -- value in the record to dynamically generate a target index. You can also use
                                        -- '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type
-                                       -- into the format specified by date_format_string. The date_format_string is 
+                                       -- into the format specified by date_format_string. The date_format_string is
                                        -- compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html).
-                                       -- For example, if the option value is 'myusers-{log_ts|yyyy-MM-dd}', then a 
-                                       -- record with log_ts field value 2020-03-27 12:25:55 will be written into 
+                                       -- For example, if the option value is 'myusers-{log_ts|yyyy-MM-dd}', then a
+                                       -- record with log_ts field value 2020-03-27 12:25:55 will be written into
                                        -- "myusers-2020-03-27" index.
 
   'connector.document-type' = 'user',  -- required: Elasticsearch document type
@@ -969,11 +971,11 @@ CREATE TABLE MyUserTable (
                                               -- per bulk request
                                               -- (only MB granularity is supported)
   'connector.bulk-flush.interval' = '60000',  -- optional: bulk flush interval (in milliseconds)
-  'connector.bulk-flush.back-off.type' = '...',       -- optional: backoff strategy ("disabled" by default)
+  'connector.bulk-flush.backoff.type' = '...',       -- optional: backoff strategy ("disabled" by default)
                                                       -- valid strategies are "disabled", "constant",
                                                       -- or "exponential"
-  'connector.bulk-flush.back-off.max-retries' = '3',  -- optional: maximum number of retries
-  'connector.bulk-flush.back-off.delay' = '30000',    -- optional: delay between each backoff attempt
+  'connector.bulk-flush.backoff.max-retries' = '3',  -- optional: maximum number of retries
+  'connector.bulk-flush.backoff.delay' = '30000',    -- optional: delay between each backoff attempt
                                                       -- (in milliseconds)
 
   -- optional: connection properties to be used during REST communication to Elasticsearch
@@ -1199,6 +1201,27 @@ CREATE TABLE MyUserTable (
 )
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    HBase()
+    .version('1.4.3')                      # required: currently only '1.4.3' is supported
+    .table_name('hbase_table_name')        # required: HBase table name
+    .zookeeper_quorum('localhost:2181')    # required: HBase Zookeeper quorum configuration
+    .zookeeper_node_parent('/test')        # optional: the root dir in Zookeeper for the HBase cluster.
+                                           # The default value is '/hbase'
+    .write_buffer_flush_max_size('10mb')   # optional: writing option, maximum size in memory of buffered rows
+                                           # per round trip. This can improve performance for writing data to HBase.
+                                           # The default value is '2mb'
+    .write_buffer_flush_max_rows(1000)     # optional: writing option, maximum number of rows to buffer per round trip.
+                                           # This can improve performance for writing data to HBase. No default value,
+                                           # i.e. by default flushing does not depend on the number of buffered rows.
+    .write_buffer_flush_interval('2s')     # optional: writing option, interval for flushing buffered requests.
+                                           # The default value is '0s', which means no asynchronous flush thread
+                                           # will be scheduled.
+)
+{% endhighlight %}
+</div>
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -1594,9 +1617,8 @@ CREATE TABLE MyUserTable (
   'format.type' = 'json',                   -- required: specify the format type
   'format.fail-on-missing-field' = 'true',  -- optional: flag whether to fail if a field is missing or not,
                                             -- 'false' by default
-  'format.ignore-parse-errors' = 'true'     -- optional: skip fields and rows with parse errors instead of failing;
+  'format.ignore-parse-errors' = 'true',    -- optional: skip fields and rows with parse errors instead of failing;
                                             -- fields are set to null in case of errors
-
   -- deprecated: define the schema explicitly using JSON schema which parses to DECIMAL and TIMESTAMP.
   'format.json-schema' =
     '{
diff --git a/docs/dev/table/index.zh.md b/docs/dev/table/index.zh.md
index 8613b11..561890d 100644
--- a/docs/dev/table/index.zh.md
+++ b/docs/dev/table/index.zh.md
@@ -27,7 +27,7 @@ under the License.
 
 Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 [Apache Calcite](https://calcite.apache.org) 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。
 
-Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 [CEP]({{ site.baseurl }}/zh/dev/libs/cep.html) 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 [Gelly 图算法]({{ site.baseurl }}/zh/dev/libs/gelly) 来处理已经预处理好的数据。
+Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 [CEP]({{ site.baseurl }}/zh/dev/libs/cep.html) 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 [Gelly 图算法]({{ site.baseurl }}/zh/dev/libs/gelly/index.html) 来处理已经预处理好的数据。
 
 **注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 \[Table API,SQL\] 和 \[流,批\] 的组合都是支持的。**
 
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index c234390..d2cacbe 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -347,7 +347,7 @@ CLI commands > session environment file > defaults environment file
 
 #### 重启策略(Restart Strategies)
 
-重启策略控制 Flink 作业失败时的重启方式。与 Flink 集群的[全局重启策略]({{ site.baseurl }}/zh/dev/restart_strategies.html)相似,更细精度的重启配置可以在环境配置文件中声明。
+重启策略控制 Flink 作业失败时的重启方式。与 Flink 集群的[全局重启策略]({{ site.baseurl }}/zh/dev/task_failure_recovery.html)相似,更细粒度的重启配置可以在环境配置文件中声明。
 
 Flink 支持以下策略:
 
@@ -600,7 +600,7 @@ Job ID: 6f922fe5cba87406ff23ae4a7bb79044
 Web interface: http://localhost:8081
 {% endhighlight %}
 
-<span class="label label-danger">注意</span> 提交后,SQL 客户端不追踪正在运行的 Flink 作业状态。提交后可以关闭 CLI 进程,并且不会影响分离的查询。Flink 的[重启策略]({{ site.baseurl }}/zh/dev/restart_strategies.html)负责容错。取消查询可以用 Flink 的 web 接口、命令行或 REST API 。
+<span class="label label-danger">注意</span> 提交后,SQL 客户端不追踪正在运行的 Flink 作业状态。提交后可以关闭 CLI 进程,并且不会影响分离的查询。Flink 的[重启策略]({{ site.baseurl }}/zh/dev/task_failure_recovery.html)负责容错。取消查询可以用 Flink 的 web 接口、命令行或 REST API 。
 
 {% top %}
 
diff --git a/docs/dev/types_serialization.zh.md b/docs/dev/types_serialization.zh.md
index 72c70cc..5be877f 100644
--- a/docs/dev/types_serialization.zh.md
+++ b/docs/dev/types_serialization.zh.md
@@ -30,6 +30,206 @@ Apache Flink 以其独特的方式来处理数据类型以及序列化,这种
 * This will be replaced by the TOC
 {:toc}
 
+## Supported Data Types
+
+Flink places some restrictions on the type of elements that can be in a DataSet or DataStream.
+The reason for this is that the system analyzes the types to determine
+efficient execution strategies.
+
+There are seven different categories of data types:
+
+1. **Java Tuples** and **Scala Case Classes**
+2. **Java POJOs**
+3. **Primitive Types**
+4. **Regular Classes**
+5. **Values**
+6. **Hadoop Writables**
+7. **Special Types**
+
+#### Tuples and Case Classes
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Tuples are composite types that contain a fixed number of fields with various types.
+The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
+can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
+tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method
+`tuple.getField(int position)`. The field indices start at 0. Note that this stands in contrast
+to the Scala tuples, but it is more consistent with Java's general indexing.
+
+{% highlight java %}
+DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
+    new Tuple2<String, Integer>("hello", 1),
+    new Tuple2<String, Integer>("world", 2));
+
+wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
+    @Override
+    public Integer map(Tuple2<String, Integer> value) throws Exception {
+        return value.f1;
+    }
+});
+
+wordCounts.keyBy(0); // also valid .keyBy("f0")
+
+
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+Scala case classes (and Scala tuples, which are a special case of case classes) are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.
+
+{% highlight scala %}
+case class WordCount(word: String, count: Int)
+val input = env.fromElements(
+    WordCount("hello", 1),
+    WordCount("world", 2)) // Case Class Data Set
+
+input.keyBy("word")// key by field expression "word"
+
+val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
+
+input2.keyBy(0, 1) // key by field positions 0 and 1
+{% endhighlight %}
+
+</div>
+</div>
+
+#### POJOs
+
+Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:
+
+- The class must be public.
+
+- It must have a public constructor without arguments (default constructor).
+
+- All fields are either public or must be accessible through getter and setter functions. For a field called `foo` the getter and setter methods must be named `getFoo()` and `setFoo()`.
+
+- The type of a field must be supported by a registered serializer.
+
+POJOs are generally represented with a `PojoTypeInfo` and serialized with the `PojoSerializer` (using [Kryo](https://github.com/EsotericSoftware/kryo) as configurable fallback).
+The exception is when the POJOs are actually Avro types (Avro Specific Records) or produced as "Avro Reflect Types".
+In that case the POJOs are represented by an `AvroTypeInfo` and serialized with the `AvroSerializer`.
+You can also register your own custom serializer if required; see [Serialization](https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#serialization-of-pojo-types) for further information.
+
+Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
+
+The following example shows a simple POJO with two public fields.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class WordWithCount {
+
+    public String word;
+    public int count;
+
+    public WordWithCount() {}
+
+    public WordWithCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
+
+DataStream<WordWithCount> wordCounts = env.fromElements(
+    new WordWithCount("hello", 1),
+    new WordWithCount("world", 2));
+
+wordCounts.keyBy("word"); // key by field expression "word"
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class WordWithCount(var word: String, var count: Int) {
+    def this() {
+      this(null, -1)
+    }
+}
+
+val input = env.fromElements(
+    new WordWithCount("hello", 1),
+    new WordWithCount("world", 2)) // Case Class Data Set
+
+input.keyBy("word")// key by field expression "word"
+
+{% endhighlight %}
+</div>
+</div>
+
+#### Primitive Types
+
+Flink supports all Java and Scala primitive types, their boxed wrappers, and `String`, e.g. `Integer`, `String`, and `Double`.
+
+#### General Class Types
+
+Flink supports most Java and Scala classes (API and custom).
+Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
+resources. Classes that follow the Java Beans conventions work well in general.
+
+All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types.
+Flink treats these data types as black boxes and is not able to access their content (e.g., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
+
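+As an illustration (assuming a hypothetical `LegacyRecord` class with private fields and no
+getters/setters, so it does not qualify as a POJO), such a type falls back to Kryo; the type, and
+optionally a custom Kryo serializer, can be registered on the execution config:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// register the generic type with Kryo up front (avoids writing full class names into the stream)
+env.getConfig().registerKryoType(LegacyRecord.class);
+
+// optionally plug in a custom Kryo serializer for the type
+// env.getConfig().registerTypeWithKryoSerializer(LegacyRecord.class, MyKryoSerializer.class);
+{% endhighlight %}
+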
+#### Values
+
+*Value* types describe their serialization and deserialization manually. Instead of going through a
+general purpose serialization framework, they provide custom code for those operations by means of
+implementing the `org.apache.flink.types.Value` interface with the methods `read` and `write`. Using
+a Value type is reasonable when general purpose serialization would be highly inefficient. An
+example would be a data type that implements a sparse vector of elements as an array. Knowing that
+the array is mostly zero, one can use a special encoding for the non-zero elements, while the
+general purpose serialization would simply write all array elements.
+
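+A hedged sketch of such a sparse-vector Value type (a hypothetical class that stores only the
+non-zero entries) might look like this:
+
+{% highlight java %}
+public class SparseIntVector implements Value {
+
+    private int size;
+    private int[] indices = new int[0];   // positions of non-zero entries
+    private int[] values = new int[0];    // the non-zero entries themselves
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        out.writeInt(size);
+        out.writeInt(indices.length);
+        for (int i = 0; i < indices.length; i++) {
+            out.writeInt(indices[i]);
+            out.writeInt(values[i]);
+        }
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+        size = in.readInt();
+        int nonZeros = in.readInt();
+        indices = new int[nonZeros];
+        values = new int[nonZeros];
+        for (int i = 0; i < nonZeros; i++) {
+            indices[i] = in.readInt();
+            values[i] = in.readInt();
+        }
+    }
+}
+{% endhighlight %}
+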
+The `org.apache.flink.types.CopyableValue` interface supports manual internal cloning logic in a
+similar way.
+
+Flink comes with pre-defined Value types that correspond to basic data types (`ByteValue`,
+`ShortValue`, `IntValue`, `LongValue`, `FloatValue`, `DoubleValue`, `StringValue`, `CharValue`,
+`BooleanValue`). These Value types act as mutable variants of the basic data types: their value can
+be altered, allowing programmers to reuse objects and take pressure off the garbage collector.
+
+
+#### Hadoop Writables
+
+You can use types that implement the `org.apache.hadoop.io.Writable` interface. The serialization logic
+defined in the `write()` and `readFields()` methods will be used for serialization.
+
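+A minimal sketch of a custom Writable (a hypothetical `PageVisit` type) usable as a Flink data type:
+
+{% highlight java %}
+public class PageVisit implements org.apache.hadoop.io.Writable {
+
+    public String url;
+    public long visits;
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeUTF(url);
+        out.writeLong(visits);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        url = in.readUTF();
+        visits = in.readLong();
+    }
+}
+{% endhighlight %}
+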
+#### Special Types
+
+You can use special types, including Scala's `Either`, `Option`, and `Try`.
+The Java API has its own custom implementation of `Either`.
+Similarly to Scala's `Either`, it represents a value of two possible types, *Left* or *Right*.
+`Either` can be useful for error handling or operators that need to output two different types of records.
+
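+For instance (a hedged sketch, assuming an existing `DataStream<String> lines`), a function can
+route parse failures to the *Left* side instead of throwing:
+
+{% highlight java %}
+DataStream<Either<String, Integer>> parsed = lines
+    .map(new MapFunction<String, Either<String, Integer>>() {
+        @Override
+        public Either<String, Integer> map(String value) {
+            try {
+                return Either.Right(Integer.parseInt(value.trim()));
+            } catch (NumberFormatException e) {
+                return Either.Left(value);   // keep the malformed input for error handling
+            }
+        }
+    });
+{% endhighlight %}
+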
+#### Type Erasure & Type Inference
+
+*Note: This Section is only relevant for Java.*
+
+The Java compiler throws away much of the generic type information after compilation. This is
+known as *type erasure* in Java. It means that at runtime, an instance of an object does not know
+its generic type any more. For example, instances of `DataStream<String>` and `DataStream<Long>` look the
+same to the JVM.
+
+Flink requires type information at the time when it prepares the program for execution (when the
+main method of the program is called). The Flink Java API tries to reconstruct the type information
+that was thrown away in various ways and store it explicitly in the data sets and operators. You can
+retrieve the type via `DataStream.getType()`. The method returns an instance of `TypeInformation`,
+which is Flink's internal way of representing types.
+
+The type inference has its limits and needs the "cooperation" of the programmer in some cases.
+Examples for that are methods that create data sets from collections, such as
+`ExecutionEnvironment.fromCollection()`, where you can pass an argument that describes the type. But
+also generic functions like `MapFunction<I, O>` may need extra type information.
+
+The
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java "ResultTypeQueryable" %}
+interface can be implemented by input formats and functions to tell the API
+explicitly about their return type. The *input types* that the functions are invoked with can
+usually be inferred by the result types of the previous operations.
+
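+A brief sketch (assuming an existing `DataStream<String> words`) of handing the erased type back to
+Flink with an explicit hint, and of inspecting the reconstructed type information:
+
+{% highlight java %}
+DataStream<Tuple2<String, Integer>> counts = words
+    .map(word -> Tuple2.of(word, 1))
+    .returns(Types.TUPLE(Types.STRING, Types.INT));   // lambdas lose generic types to erasure
+
+TypeInformation<?> type = counts.getType();           // Flink's internal description of the type
+{% endhighlight %}
+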
+{% top %}
 
 ## Flink 中的类型处理
 
diff --git a/docs/dev/user_defined_functions.zh.md b/docs/dev/user_defined_functions.zh.md
index bdfbe54..a1b52a0 100644
--- a/docs/dev/user_defined_functions.zh.md
+++ b/docs/dev/user_defined_functions.zh.md
@@ -1,5 +1,5 @@
 ---
-title: 'User-Defined Functions'
+title: '用户自定义函数'
 nav-id: user_defined_function
 nav-parent_id: streaming
 nav-pos: 4
diff --git a/docs/getting-started/index.zh.md b/docs/getting-started/index.zh.md
index cb81bbc..e27119c 100644
--- a/docs/getting-started/index.zh.md
+++ b/docs/getting-started/index.zh.md
@@ -27,29 +27,54 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-上手使用 Apache Flink 有很多方式,哪一个最适合你取决于你的目标和以前的经验。
+There are many ways to get started with Apache Flink. Which one is the best for
+you depends on your goals and prior experience:
 
-### 初识 Flink
+* take a look at the **Docker Playgrounds** if you want to see what Flink can do, via a hands-on,
+  Docker-based introduction to specific Flink concepts
+* explore one of the **Code Walkthroughs** if you want a quick, end-to-end
+  introduction to one of Flink's APIs
+* work your way through the **Hands-on Training** for a comprehensive,
+  step-by-step introduction to Flink
+* use **Project Setup** if you already know the basics of Flink and want a
+  project template for Java or Scala, or need help setting up the dependencies
 
-通过 **Docker Playgrounds** 提供沙箱的Flink环境,你只需花几分钟做些简单设置,就可以开始探索和使用 Flink。
+### Taking a first look at Flink
 
-* [**Operations Playground**](./docker-playgrounds/flink-operations-playground.html) 向你展示如何使用 Flink 编写数据流应用程序。你可以体验 Flink 如何从故障中恢复应用程序,升级、提高并行度、降低并行度和监控运行的状态指标等特性。
+The **Docker Playgrounds** provide sandboxed Flink environments that are set up in just a few minutes and which allow you to explore and play with Flink.
+
+* The [**Operations Playground**]({% link getting-started/docker-playgrounds/flink-operations-playground.md %}) shows you how to operate streaming applications with Flink. You can experience how Flink recovers applications from failures, and learn how to upgrade and scale streaming applications up and down and how to query application metrics.
 
 <!--
 * The [**Streaming SQL Playground**]() provides a Flink cluster with a SQL CLI client, tables which are fed by streaming data sources, and instructions for how to run continuous streaming SQL queries on these tables. This is the perfect environment for your first steps with streaming SQL.
 -->
 
-### Flink API 入门
+### First steps with one of Flink's APIs
 
-**代码练习**是入门的最佳方式,通过代码练习可以逐步深入理解 Flink API。
-下边的例子演示了如何使用 Flink 的代码框架开始构建一个基础的 Flink 项目,和如何逐步将其扩展为一个简单的应用程序。
+The **Code Walkthroughs** are a great way to get started quickly with a step-by-step introduction to
+one of Flink's APIs. Each walkthrough provides instructions for bootstrapping a small skeleton
+project, and then shows how to extend it to a simple application.
 
-<!--
-* The [**DataStream API**]() code walkthrough shows how to implement a simple DataStream application and how to extend it to be stateful and use timers.
--->
-* [**DataStream API 示例**](./walkthroughs/datastream_api.html) 展示了如何编写一个基本的 DataStream 应用程序。 DataStream API 是 Flink 的主要抽象,用于通过 Java 或 Scala 实现具有复杂时间语义的有状态数据流处理的应用程序。
+* The [**DataStream API**  code walkthrough]({% link getting-started/walkthroughs/datastream_api.md %}) shows how
+  to implement a simple DataStream application and how to extend it to be stateful and use timers.
+  The DataStream API is Flink's main abstraction for implementing stateful streaming applications
+  with sophisticated time semantics in Java or Scala.
+
+* Flink's **Table API** is a relational API used for writing SQL-like queries in Java, Scala, or
+  Python, which are then automatically optimized, and can be executed on batch or streaming data
+  with identical syntax and semantics. The [Table API code walkthrough for Java and Scala]({% link
+  getting-started/walkthroughs/table_api.md %}) shows how to implement a simple Table API query on a
+  batch source and how to evolve it into a continuous query on a streaming source. There's also a
+  similar [code walkthrough for the Python Table API]({% link
+  getting-started/walkthroughs/python_table_api.md %}).
+
+### Taking a Deep Dive with the Hands-on Training
 
-* [**Table API 示例**](./walkthroughs/table_api.html) 演示了如何在批处中使用简单的 Table API 进行查询,以及如何将其扩展为流处理中的查询。Table API 是 Flink 的语言嵌入式关系 API,用于在 Java 或 Scala 中编写类 SQL 的查询,这些查询会自动进行优化。Table API 查询可以使用一致的语法和语义同时在批处理或流数据上运行。
+The [**Hands-on Training**]({% link training/index.md %}) is a self-paced training course with
+a set of lessons and hands-on exercises. This step-by-step introduction to Flink focuses
+on learning how to use the DataStream API to meet the needs of common, real-world use cases,
+and provides a complete introduction to the fundamental concepts: parallel dataflows,
+stateful stream processing, event time and watermarking, and fault tolerance via state snapshots.
 
 <!--
 ### Starting a new Flink application
diff --git a/docs/getting-started/walkthroughs/python_table_api.zh.md b/docs/getting-started/walkthroughs/python_table_api.zh.md
index 34a0170..48ab506 100644
--- a/docs/getting-started/walkthroughs/python_table_api.zh.md
+++ b/docs/getting-started/walkthroughs/python_table_api.zh.md
@@ -72,6 +72,33 @@ t_env.connect(FileSystem().path('/tmp/output')) \
     .create_temporary_table('mySink')
 {% endhighlight %}
 
+You can also use the `TableEnvironment.sql_update()` method to register a source/sink table defined in DDL:
+{% highlight python %}
+my_source_ddl = """
+    create table mySource (
+        word VARCHAR
+    ) with (
+        'connector.type' = 'filesystem',
+        'format.type' = 'csv',
+        'connector.path' = '/tmp/input'
+    )
+"""
+
+my_sink_ddl = """
+    create table mySink (
+        word VARCHAR,
+        `count` BIGINT
+    ) with (
+        'connector.type' = 'filesystem',
+        'format.type' = 'csv',
+        'connector.path' = '/tmp/output'
+    )
+"""
+
+t_env.sql_update(my_source_ddl)
+t_env.sql_update(my_sink_ddl)
+{% endhighlight %}
+
 上面的程序展示了如何创建及在`ExecutionEnvironment`中注册表名分别为`mySource`和`mySink`的表。
 其中,源表`mySource`有一列: word,该表代表了从输入文件`/tmp/input`中读取的单词;
 结果表`mySink`有两列: word和count,该表会将计算结果输出到文件`/tmp/output`中,字段之间使用`\t`作为分隔符。
diff --git a/docs/index.zh.md b/docs/index.zh.md
index 76e6d45..f2b1bc8 100644
--- a/docs/index.zh.md
+++ b/docs/index.zh.md
@@ -47,7 +47,6 @@ Apache Flink 是一个分布式流批一体化的开源平台。Flink 的核心
 
 API 参考列举并解释了 Flink API 的所有功能。
 
-* [基本 API 概念](dev/api_concepts.html)
 * [DataStream API](dev/datastream_api.html)
 * [DataSet API](dev/batch/index.html)
 * [Table API &amp; SQL](dev/table/index.html)
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 29c6e70..8a59edf 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -29,7 +29,7 @@ Flink exposes a metric system that allows gathering and exposing metrics to exte
 
 ## Registering metrics
 
-You can access the metric system from any user function that extends [RichFunction]({{ site.baseurl }}/zh/dev/user_defined_functions.html#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
+You can access the metric system from any user function that extends [RichFunction]({% link dev/user_defined_functions.zh.md %}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
 This method returns a `MetricGroup` object on which you can create and register new metrics.
 
 ### Metric types
@@ -737,7 +737,7 @@ Please see the [Prometheus documentation](https://prometheus.io/docs/practices/p
 
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 
-In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/plugins/statsd` 
+In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/plugins/statsd`
 folder of your Flink distribution.
 
 Parameters:
@@ -769,6 +769,7 @@ Parameters:
 - `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
 - `proxyHost` - (optional) The proxy host to use when sending to Datadog.
 - `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
+- `dataCenter` - (optional) The data center (`EU`/`US`) to connect to, defaults to `US`.
 
 Example configuration:
 
@@ -779,6 +780,7 @@ metrics.reporter.dghttp.apikey: xxx
 metrics.reporter.dghttp.tags: myflinkapp,prod
 metrics.reporter.dghttp.proxyHost: my.web.proxy.com
 metrics.reporter.dghttp.proxyPort: 8080
+metrics.reporter.dghttp.dataCenter: US
 
 {% endhighlight %}