Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/11 09:13:19 UTC

[flink] 02/02: [FLINK-17832][docs][es] Add documentation for the new Elasticsearch connector

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit af2648ce0d7214d29d2da2ee7c9baaaf940ccd77
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Jun 10 18:03:59 2020 +0800

    [FLINK-17832][docs][es] Add documentation for the new Elasticsearch connector
    
    This closes #12579
---
 docs/dev/table/connectors/elasticsearch.md    | 266 ++++++++++++++++++++++++++
 docs/dev/table/connectors/elasticsearch.zh.md | 266 ++++++++++++++++++++++++++
 docs/dev/table/connectors/formats/index.md    |  14 +-
 docs/dev/table/connectors/formats/index.zh.md |  14 +-
 docs/dev/table/connectors/index.md            |   4 +-
 docs/dev/table/connectors/index.zh.md         |   4 +-
 6 files changed, 550 insertions(+), 18 deletions(-)

diff --git a/docs/dev/table/connectors/elasticsearch.md b/docs/dev/table/connectors/elasticsearch.md
new file mode 100644
index 0000000..5ea587ab
--- /dev/null
+++ b/docs/dev/table/connectors/elasticsearch.md
@@ -0,0 +1,266 @@
+---
+title: "Elasticsearch SQL Connector"
+nav-title: Elasticsearch
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Append & Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch engine. This document describes how to set up the Elasticsearch connector to run SQL queries against Elasticsearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages with the external system, using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode for exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+To set up the Elasticsearch connector, use the dependency information in the following table. It covers both projects using a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.
+
+| Elasticsearch Version   | Maven dependency                                                   | SQL Client JAR         |
+| :---------------------- | :----------------------------------------------------------------- | :----------------------|
+| 6.x                     | `flink-connector-elasticsearch6{{site.scala_version_suffix}}`      | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+| 7.x and later versions  | `flink-connector-elasticsearch7{{site.scala_version_suffix}}`      | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch7{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch7{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+
+<br>
+<span class="label label-danger">Attention</span> The Elasticsearch connector works with the JSON format, which defines how to encode documents for the external system; therefore, the format must be added as a [dependency]({% link dev/table/connectors/formats/index.md %}).
+
+How to create an Elasticsearch table
+----------------
+
+The example below shows how to create an Elasticsearch sink table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, valid values are:
+      <ul>
+      <li><code>elasticsearch-6</code>: connect to Elasticsearch 6.x cluster</li>
+      <li><code>elasticsearch-7</code>: connect to Elasticsearch 7.x and later versions cluster</li>
+      </ul></td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9200;http://host_name:9201'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>document-type</h5></td>
+      <td>required in 6.x</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Elasticsearch document type. No longer required in <code>elasticsearch-7</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>failure-handler</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">fail</td>
+      <td>String</td>
+      <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
+      <ul>
+        <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li>
+        <li><code>ignore</code>: ignores failures and drops the request.</li>
+        <li><code>retry_rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li>
+        <li>custom class name: for failure handling with an ActionRequestFailureHandler subclass.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-on-checkpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to flush on checkpoint. When disabled, the sink will not wait for all pending action requests
+       to be acknowledged by Elasticsearch on checkpoints. Thus, the sink does NOT provide any strong
+       guarantees for at-least-once delivery of action requests.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-actions</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>Maximum number of buffered actions per bulk request.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1s</td>
+      <td>Duration</td>
+      <td>The interval to flush buffered actions.
+        Can be set to <code>'0'</code> to disable it. Note that both <code>'sink.bulk-flush.max-size'</code> and <code>'sink.bulk-flush.max-actions'</code>
+        can be set to <code>'0'</code> while the flush interval is set, allowing for completely asynchronous processing of buffered actions.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">DISABLED</td>
+      <td>String</td>
+      <td>Specifies how to perform retries if a flush action fails due to a temporary request error. Valid strategies are:
+      <ul>
+        <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+        <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+        <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8</td>
+      <td>Integer</td>
+      <td>Maximum number of backoff retries.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">50ms</td>
+      <td>Duration</td>
+      <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.max-retry-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Maximum timeout between retries.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.path-prefix</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document.
+       By default, the built-in <code>'json'</code> format is used. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more details.
+      </td>
+    </tr>
+    </tbody>
+</table>
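+
+For illustration, the following DDL sketches how several of these options combine. It is a
+hedged example with hypothetical table, host, and index names: it enables constant-backoff
+retries for rejected requests and tunes the bulk-flush thresholds.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE enrichedOrders (
+  order_id STRING,
+  order_total DOUBLE,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'enriched_orders',
+  -- re-add requests that failed due to queue capacity saturation
+  'failure-handler' = 'retry_rejected',
+  -- retry failed bulk flushes up to 8 times, waiting 50ms between attempts
+  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
+  'sink.bulk-flush.backoff.max-retries' = '8',
+  'sink.bulk-flush.backoff.delay' = '50ms',
+  -- flush after 500 buffered actions or after 5 seconds, whichever comes first
+  'sink.bulk-flush.max-actions' = '500',
+  'sink.bulk-flush.interval' = '5s'
+);
+{% endhighlight %}
+</div>
+</div>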
+
+Features
+----------------
+
+### Key Handling
+
+The Elasticsearch sink can work in either upsert or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Elasticsearch sink works in upsert mode, which can consume queries containing UPDATE/DELETE messages.
+If no primary key is defined, the Elasticsearch sink works in append mode, which can only consume queries containing INSERT-only messages.
+
+In the Elasticsearch connector, the primary key is used to calculate the Elasticsearch document ID, which is a string of up to 512 bytes and cannot contain whitespace.
+The Elasticsearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL, using the key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as primary key fields because they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Elasticsearch generates a document ID automatically.
+
+See [CREATE TABLE DDL]({% link dev/table/sql/create.md %}#create-table) for more details about PRIMARY KEY syntax.
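+
+As a minimal sketch of key handling (with hypothetical table and field names), the DDL below
+defines a composite primary key and a custom delimiter; a row with `shop_id` = `'A1'` and
+`user_id` = `'u42'` would be indexed under the document ID "A1$u42".
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE shopUsers (
+  shop_id STRING,
+  user_id STRING,
+  pv BIGINT,
+  -- document IDs concatenate shop_id and user_id in this order
+  PRIMARY KEY (shop_id, user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'shop_users',
+  -- join the key fields with '$' instead of the default '_'
+  'document-id.key-delimiter' = '$'
+);
+{% endhighlight %}
+</div>
+</div>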
+
+### Dynamic Index
+
+Elasticsearch sink supports both static index and dynamic index.
+
+If you want a static index, the `index` option value should be a plain string, e.g. `'myusers'`; all records will be consistently written into the "myusers" index.
+
+If you want a dynamic index, you can use `{field_name}` to reference a field value in the record to dynamically generate the target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value of `TIMESTAMP/DATE/TIME` type into the format specified by the `date_format_string`.
+The `date_format_string` is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a record with a `log_ts` field value of `2020-03-27 12:25:55` will be written into the "myusers-2020-03-27" index.
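+
+As a sketch of this feature (with hypothetical table and field names), the DDL below routes each
+record into a daily index derived from its `log_ts` column, e.g. "myusers-2020-03-27":
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE dailyUsers (
+  user_id STRING,
+  behavior STRING,
+  log_ts TIMESTAMP(3)
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  -- the target index is resolved per record from log_ts, formatted as yyyy-MM-dd
+  'index' = 'myusers-{log_ts|yyyy-MM-dd}'
+);
+{% endhighlight %}
+</div>
+</div>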
+
+
+Data Type Mapping
+----------------
+
+Elasticsearch stores documents as JSON strings, so the data type mapping is between Flink data types and JSON data types.
+Flink uses the built-in `'json'` format for the Elasticsearch connector. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more type mapping details.
\ No newline at end of file
diff --git a/docs/dev/table/connectors/elasticsearch.zh.md b/docs/dev/table/connectors/elasticsearch.zh.md
new file mode 100644
index 0000000..5ea587ab
--- /dev/null
+++ b/docs/dev/table/connectors/elasticsearch.zh.md
@@ -0,0 +1,266 @@
+---
+title: "Elasticsearch SQL Connector"
+nav-title: Elasticsearch
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Append & Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch engine. This document describes how to set up the Elasticsearch connector to run SQL queries against Elasticsearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages with the external system, using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode for exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+To set up the Elasticsearch connector, use the dependency information in the following table. It covers both projects using a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.
+
+| Elasticsearch Version   | Maven dependency                                                   | SQL Client JAR         |
+| :---------------------- | :----------------------------------------------------------------- | :----------------------|
+| 6.x                     | `flink-connector-elasticsearch6{{site.scala_version_suffix}}`      | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+| 7.x and later versions  | `flink-connector-elasticsearch7{{site.scala_version_suffix}}`      | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch7{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch7{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+
+<br>
+<span class="label label-danger">Attention</span> The Elasticsearch connector works with the JSON format, which defines how to encode documents for the external system; therefore, the format must be added as a [dependency]({% link dev/table/connectors/formats/index.md %}).
+
+How to create an Elasticsearch table
+----------------
+
+The example below shows how to create an Elasticsearch sink table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, valid values are:
+      <ul>
+      <li><code>elasticsearch-6</code>: connect to Elasticsearch 6.x cluster</li>
+      <li><code>elasticsearch-7</code>: connect to Elasticsearch 7.x and later versions cluster</li>
+      </ul></td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9200;http://host_name:9201'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>document-type</h5></td>
+      <td>required in 6.x</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Elasticsearch document type. No longer required in <code>elasticsearch-7</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>failure-handler</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">fail</td>
+      <td>String</td>
+      <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
+      <ul>
+        <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li>
+        <li><code>ignore</code>: ignores failures and drops the request.</li>
+        <li><code>retry_rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li>
+        <li>custom class name: for failure handling with an ActionRequestFailureHandler subclass.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-on-checkpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Whether to flush on checkpoint. When disabled, the sink will not wait for all pending action requests
+       to be acknowledged by Elasticsearch on checkpoints. Thus, the sink does NOT provide any strong
+       guarantees for at-least-once delivery of action requests.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-actions</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>Maximum number of buffered actions per bulk request.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">1s</td>
+      <td>Duration</td>
+      <td>The interval to flush buffered actions.
+        Can be set to <code>'0'</code> to disable it. Note that both <code>'sink.bulk-flush.max-size'</code> and <code>'sink.bulk-flush.max-actions'</code>
+        can be set to <code>'0'</code> while the flush interval is set, allowing for completely asynchronous processing of buffered actions.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">DISABLED</td>
+      <td>String</td>
+      <td>Specifies how to perform retries if a flush action fails due to a temporary request error. Valid strategies are:
+      <ul>
+        <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+        <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+        <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">8</td>
+      <td>Integer</td>
+      <td>Maximum number of backoff retries.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">50ms</td>
+      <td>Duration</td>
+      <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.max-retry-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Maximum timeout between retries.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.path-prefix</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document.
+       By default, the built-in <code>'json'</code> format is used. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more details.
+      </td>
+    </tr>
+    </tbody>
+</table>
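+
+For illustration, the following DDL sketches how several of these options combine. It is a
+hedged example with hypothetical table, host, and index names: it enables constant-backoff
+retries for rejected requests and tunes the bulk-flush thresholds.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE enrichedOrders (
+  order_id STRING,
+  order_total DOUBLE,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'enriched_orders',
+  -- re-add requests that failed due to queue capacity saturation
+  'failure-handler' = 'retry_rejected',
+  -- retry failed bulk flushes up to 8 times, waiting 50ms between attempts
+  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
+  'sink.bulk-flush.backoff.max-retries' = '8',
+  'sink.bulk-flush.backoff.delay' = '50ms',
+  -- flush after 500 buffered actions or after 5 seconds, whichever comes first
+  'sink.bulk-flush.max-actions' = '500',
+  'sink.bulk-flush.interval' = '5s'
+);
+{% endhighlight %}
+</div>
+</div>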
+
+Features
+----------------
+
+### Key Handling
+
+The Elasticsearch sink can work in either upsert or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Elasticsearch sink works in upsert mode, which can consume queries containing UPDATE/DELETE messages.
+If no primary key is defined, the Elasticsearch sink works in append mode, which can only consume queries containing INSERT-only messages.
+
+In the Elasticsearch connector, the primary key is used to calculate the Elasticsearch document ID, which is a string of up to 512 bytes and cannot contain whitespace.
+The Elasticsearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL, using the key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as primary key fields because they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Elasticsearch generates a document ID automatically.
+
+See [CREATE TABLE DDL]({% link dev/table/sql/create.md %}#create-table) for more details about PRIMARY KEY syntax.
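+
+As a minimal sketch of key handling (with hypothetical table and field names), the DDL below
+defines a composite primary key and a custom delimiter; a row with `shop_id` = `'A1'` and
+`user_id` = `'u42'` would be indexed under the document ID "A1$u42".
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE shopUsers (
+  shop_id STRING,
+  user_id STRING,
+  pv BIGINT,
+  -- document IDs concatenate shop_id and user_id in this order
+  PRIMARY KEY (shop_id, user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'shop_users',
+  -- join the key fields with '$' instead of the default '_'
+  'document-id.key-delimiter' = '$'
+);
+{% endhighlight %}
+</div>
+</div>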
+
+### Dynamic Index
+
+Elasticsearch sink supports both static index and dynamic index.
+
+If you want a static index, the `index` option value should be a plain string, e.g. `'myusers'`; all records will be consistently written into the "myusers" index.
+
+If you want a dynamic index, you can use `{field_name}` to reference a field value in the record to dynamically generate the target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value of `TIMESTAMP/DATE/TIME` type into the format specified by the `date_format_string`.
+The `date_format_string` is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a record with a `log_ts` field value of `2020-03-27 12:25:55` will be written into the "myusers-2020-03-27" index.
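+
+As a sketch of this feature (with hypothetical table and field names), the DDL below routes each
+record into a daily index derived from its `log_ts` column, e.g. "myusers-2020-03-27":
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE dailyUsers (
+  user_id STRING,
+  behavior STRING,
+  log_ts TIMESTAMP(3)
+) WITH (
+  'connector' = 'elasticsearch-7',
+  'hosts' = 'http://localhost:9200',
+  -- the target index is resolved per record from log_ts, formatted as yyyy-MM-dd
+  'index' = 'myusers-{log_ts|yyyy-MM-dd}'
+);
+{% endhighlight %}
+</div>
+</div>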
+
+
+Data Type Mapping
+----------------
+
+Elasticsearch stores documents as JSON strings, so the data type mapping is between Flink data types and JSON data types.
+Flink uses the built-in `'json'` format for the Elasticsearch connector. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more type mapping details.
\ No newline at end of file
diff --git a/docs/dev/table/connectors/formats/index.md b/docs/dev/table/connectors/formats/index.md
index ec39af0..9209166 100644
--- a/docs/dev/table/connectors/formats/index.md
+++ b/docs/dev/table/connectors/formats/index.md
@@ -39,18 +39,18 @@ Flink supports the following formats:
         <tr>
           <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/csv.html">CSV</a></td>
           <td>Apache Kafka,
-          <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+          <a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
         </tr>
         <tr>
          <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/json.html">JSON</a></td>
          <td>Apache Kafka,
-          <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a>,
-          Elasticsearch</td>
+          <a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a>,
+          <a href="{% link dev/table/connectors/elasticsearch.md %}">Elasticsearch</a></td>
        </tr>
         <tr>
-          <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/avro.html">Apache Avro</a></td>
+          <td><a href="{% link dev/table/connectors/formats/avro.md %}">Apache Avro</a></td>
           <td>Apache Kafka,
-           <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+           <a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
         </tr>
         <tr>
          <td>Debezium JSON</td>
@@ -62,11 +62,11 @@ Flink supports the following formats:
         </tr>
         <tr>
          <td>Apache Parquet</td>
-         <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+         <td><a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
         </tr>
         <tr>
          <td>Apache ORC</td>
-         <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+         <td><a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
         </tr>
     </tbody>
 </table>
\ No newline at end of file
diff --git a/docs/dev/table/connectors/formats/index.zh.md b/docs/dev/table/connectors/formats/index.zh.md
index ec39af0..35f073b 100644
--- a/docs/dev/table/connectors/formats/index.zh.md
+++ b/docs/dev/table/connectors/formats/index.zh.md
@@ -39,18 +39,18 @@ Flink supports the following formats:
         <tr>
           <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/csv.html">CSV</a></td>
           <td>Apache Kafka,
-          <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+          <a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
         </tr>
         <tr>
          <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/json.html">JSON</a></td>
          <td>Apache Kafka,
-          <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a>,
-          Elasticsearch</td>
+          <a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a>,
+          <a href="{% link dev/table/connectors/elasticsearch.zh.md %}">Elasticsearch</a></td>
        </tr>
         <tr>
-          <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/avro.html">Apache Avro</a></td>
+          <td><a href="{% link dev/table/connectors/formats/avro.zh.md %}">Apache Avro</a></td>
           <td>Apache Kafka,
-           <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+           <a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
         </tr>
         <tr>
          <td>Debezium JSON</td>
@@ -62,11 +62,11 @@ Flink supports the following formats:
         </tr>
         <tr>
          <td>Apache Parquet</td>
-         <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+         <td><a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
         </tr>
         <tr>
          <td>Apache ORC</td>
-         <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+         <td><a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
         </tr>
     </tbody>
 </table>
\ No newline at end of file
diff --git a/docs/dev/table/connectors/index.md b/docs/dev/table/connectors/index.md
index c2a08b6..a9a66ca 100644
--- a/docs/dev/table/connectors/index.md
+++ b/docs/dev/table/connectors/index.md
@@ -58,7 +58,7 @@ Flink natively support various connectors. The following tables list all availab
       <td>Streaming Sink, Batch Sink</td>
     </tr>
     <tr>
-      <td>Elasticsearch</td>
+      <td><a href="{% link dev/table/connectors/elasticsearch.md %}">Elasticsearch</a></td>
       <td>6.x & 7.x</td>
       <td>Not supported</td>
       <td>Streaming Sink, Batch Sink</td>
@@ -76,7 +76,7 @@ Flink natively support various connectors. The following tables list all availab
       <td>Streaming Sink, Batch Sink</td>
     </tr>
     <tr>
-      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache HBase</a></td>
+      <td><a href="{% link dev/table/connectors/hbase.md %}">Apache HBase</a></td>
       <td>1.4.x</td>
       <td>Bounded Scan, Lookup</td>
       <td>Streaming Sink, Batch Sink</td>
diff --git a/docs/dev/table/connectors/index.zh.md b/docs/dev/table/connectors/index.zh.md
index c2a08b6..b585ad2 100644
--- a/docs/dev/table/connectors/index.zh.md
+++ b/docs/dev/table/connectors/index.zh.md
@@ -58,7 +58,7 @@ Flink natively support various connectors. The following tables list all availab
       <td>Streaming Sink, Batch Sink</td>
     </tr>
     <tr>
-      <td>Elasticsearch</td>
+      <td><a href="{% link dev/table/connectors/elasticsearch.zh.md %}">Elasticsearch</a></td>
       <td>6.x & 7.x</td>
       <td>Not supported</td>
       <td>Streaming Sink, Batch Sink</td>
@@ -76,7 +76,7 @@ Flink natively support various connectors. The following tables list all availab
       <td>Streaming Sink, Batch Sink</td>
     </tr>
     <tr>
-      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache HBase</a></td>
+      <td><a href="{% link dev/table/connectors/hbase.zh.md %}">Apache HBase</a></td>
       <td>1.4.x</td>
       <td>Bounded Scan, Lookup</td>
       <td>Streaming Sink, Batch Sink</td>