Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/11 09:13:19 UTC
[flink] 02/02: [FLINK-17832][docs][es] Add documentation for the new Elasticsearch connector
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit af2648ce0d7214d29d2da2ee7c9baaaf940ccd77
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Jun 10 18:03:59 2020 +0800
[FLINK-17832][docs][es] Add documentation for the new Elasticsearch connector
This closes #12579
---
docs/dev/table/connectors/elasticsearch.md | 266 ++++++++++++++++++++++++++
docs/dev/table/connectors/elasticsearch.zh.md | 266 ++++++++++++++++++++++++++
docs/dev/table/connectors/formats/index.md | 14 +-
docs/dev/table/connectors/formats/index.zh.md | 14 +-
docs/dev/table/connectors/index.md | 4 +-
docs/dev/table/connectors/index.zh.md | 4 +-
6 files changed, 550 insertions(+), 18 deletions(-)
diff --git a/docs/dev/table/connectors/elasticsearch.md b/docs/dev/table/connectors/elasticsearch.md
new file mode 100644
index 0000000..5ea587ab
--- /dev/null
+++ b/docs/dev/table/connectors/elasticsearch.md
@@ -0,0 +1,266 @@
+---
+title: "Elasticsearch SQL Connector"
+nav-title: Elasticsearch
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Append & Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch engine. This document describes how to set up the Elasticsearch connector to run SQL queries against Elasticsearch.
+
+The connector can operate in upsert mode to exchange UPDATE/DELETE messages with the external system, using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+In order to set up the Elasticsearch connector, the following table provides dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
+
+| Elasticsearch Version | Maven dependency | SQL Client JAR |
+| :---------------------- | :----------------------------------------------------------------- | :----------------------|
+| 6.x | `flink-connector-elasticsearch6{{site.scala_version_suffix}}` | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+| 7.x and later versions | `flink-connector-elasticsearch7{{site.scala_version_suffix}}` | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch7{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch7{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+
+<br>
+<span class="label label-danger">Attention</span> The Elasticsearch connector works with the JSON format, which defines how to encode documents for the external system; therefore, it must be added as a [dependency]({% link dev/table/connectors/formats/index.md %}).
+
+How to create an Elasticsearch table
+----------------
+
+The example below shows how to create an Elasticsearch sink table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE myUserTable (
+ user_id STRING,
+ user_name STRING,
+ uv BIGINT,
+ pv BIGINT,
+ PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ 'index' = 'users'
+);
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify which connector to use. Valid values are:
+ <ul>
+ <li><code>elasticsearch-6</code>: connect to an Elasticsearch 6.x cluster</li>
+ <li><code>elasticsearch-7</code>: connect to an Elasticsearch cluster, version 7.x and later</li>
+ </ul></td>
+ </tr>
+ <tr>
+ <td><h5>hosts</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9200;http://host_name:9201'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>index</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+ a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+ See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>document-type</h5></td>
+ <td>required in 6.x</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Elasticsearch document type. No longer required in <code>elasticsearch-7</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>document-id.key-delimiter</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">_</td>
+ <td>String</td>
+ <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs like "KEY1$KEY2$KEY3".</td>
+ </tr>
+ <tr>
+ <td><h5>failure-handler</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">fail</td>
+ <td>String</td>
+ <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
+ <ul>
+ <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li>
+ <li><code>ignore</code>: ignores failures and drops the request.</li>
+ <li><code>retry_rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li>
+ <li>custom class name: for failure handling with an <code>ActionRequestFailureHandler</code> subclass.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-on-checkpoint</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to flush on checkpoint. When disabled, the sink will not wait for all pending action requests
+ to be acknowledged by Elasticsearch on checkpoints. Thus, the sink does NOT provide any strong
+ guarantees for at-least-once delivery of action requests.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.max-actions</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">1000</td>
+ <td>Integer</td>
+ <td>Maximum number of buffered actions per bulk request.
+ Can be set to <code>'0'</code> to disable it.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.max-size</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">2mb</td>
+ <td>MemorySize</td>
+ <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
+ Can be set to <code>'0'</code> to disable it.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.interval</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">1s</td>
+ <td>Duration</td>
+ <td>The interval at which to flush buffered actions.
+ Can be set to <code>'0'</code> to disable it. Note that both <code>'sink.bulk-flush.max-size'</code> and <code>'sink.bulk-flush.max-actions'</code>
+ can be set to <code>'0'</code> while a flush interval is set, allowing for completely asynchronous processing of buffered actions.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">DISABLED</td>
+ <td>String</td>
+ <td>Specify how to perform retries if any flush action fails due to a temporary request error. Valid strategies are:
+ <ul>
+ <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+ <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+ <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">8</td>
+ <td>Integer</td>
+ <td>Maximum number of backoff retries.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">50ms</td>
+ <td>Duration</td>
+ <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+ </tr>
+ <tr>
+ <td><h5>connection.max-retry-timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Maximum timeout between retries.</td>
+ </tr>
+ <tr>
+ <td><h5>connection.path-prefix</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">json</td>
+ <td>String</td>
+ <td>The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document.
+ By default, the built-in <code>'json'</code> format is used. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more details.
+ </td>
+ </tr>
+ </tbody>
+</table>
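+
+To illustrate how these options combine, the following sketch tunes bulk flushing and failure handling on a sink table. The table name, field names, and option values are illustrative assumptions, not recommendations:
+
+{% highlight sql %}
+CREATE TABLE enrichedOrders (
+ order_id STRING,
+ order_total DOUBLE,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ 'index' = 'orders',
+ -- flush after 500 buffered actions, 1mb of buffered data, or 2 seconds, whichever comes first
+ 'sink.bulk-flush.max-actions' = '500',
+ 'sink.bulk-flush.max-size' = '1mb',
+ 'sink.bulk-flush.interval' = '2s',
+ -- re-add requests rejected by a saturated queue, retrying with exponential backoff
+ 'failure-handler' = 'retry_rejected',
+ 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
+ 'sink.bulk-flush.backoff.max-retries' = '3',
+ 'sink.bulk-flush.backoff.delay' = '100ms'
+);
+{% endhighlight %}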
+
+Features
+----------------
+
+### Key Handling
+
+The Elasticsearch sink can work in either upsert mode or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Elasticsearch sink works in upsert mode, which can consume queries containing UPDATE/DELETE messages.
+If no primary key is defined, the Elasticsearch sink works in append mode, which can only consume queries containing INSERT messages.
+
+In the Elasticsearch connector, the primary key is used to calculate the Elasticsearch document ID, which is a string of up to 512 bytes that cannot contain whitespace.
+The Elasticsearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL, using the key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as primary key fields because they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Elasticsearch will generate a document ID automatically.
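+
+As a concrete sketch (the table and field names here are hypothetical), the following DDL declares a composite primary key with a custom delimiter; a row with `region` = `eu` and `user_id` = `42` would be written with document ID "eu$42":
+
+{% highlight sql %}
+CREATE TABLE regionUsers (
+ region STRING,
+ user_id STRING,
+ uv BIGINT,
+ -- composite key: document IDs are built as <region>$<user_id>
+ PRIMARY KEY (region, user_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ 'index' = 'region_users',
+ 'document-id.key-delimiter' = '$'
+);
+{% endhighlight %}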
+
+See [CREATE TABLE DDL]({% link dev/table/sql/create.md %}#create-table) for more details about PRIMARY KEY syntax.
+
+### Dynamic Index
+
+The Elasticsearch sink supports both static and dynamic indices.
+
+If you want a static index, the `index` option value should be a plain string, e.g. `'myusers'`; all records will be consistently written into the "myusers" index.
+
+If you want a dynamic index, you can use `{field_name}` to reference a field value in the record to dynamically generate a target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value of `TIMESTAMP/DATE/TIME` type into the format specified by `date_format_string`.
+The `date_format_string` is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a record with `log_ts` value `2020-03-27 12:25:55` will be written into the "myusers-2020-03-27" index.
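+
+A minimal sketch of such a time-partitioned sink (the table and field names are hypothetical):
+
+{% highlight sql %}
+CREATE TABLE dailyUsers (
+ user_id STRING,
+ log_ts TIMESTAMP(3),
+ PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ -- one index per day, derived from each record's log_ts field
+ 'index' = 'myusers-{log_ts|yyyy-MM-dd}'
+);
+{% endhighlight %}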
+
+
+Data Type Mapping
+----------------
+
+Elasticsearch stores documents as JSON strings, so the data type mapping is between Flink data types and JSON data types.
+Flink uses the built-in `'json'` format for the Elasticsearch connector. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more type mapping details.
\ No newline at end of file
diff --git a/docs/dev/table/connectors/elasticsearch.zh.md b/docs/dev/table/connectors/elasticsearch.zh.md
new file mode 100644
index 0000000..5ea587ab
--- /dev/null
+++ b/docs/dev/table/connectors/elasticsearch.zh.md
@@ -0,0 +1,266 @@
+---
+title: "Elasticsearch SQL Connector"
+nav-title: Elasticsearch
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Append & Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Elasticsearch connector allows for writing into an index of the Elasticsearch engine. This document describes how to set up the Elasticsearch connector to run SQL queries against Elasticsearch.
+
+The connector can operate in upsert mode to exchange UPDATE/DELETE messages with the external system, using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+In order to set up the Elasticsearch connector, the following table provides dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
+
+| Elasticsearch Version | Maven dependency | SQL Client JAR |
+| :---------------------- | :----------------------------------------------------------------- | :----------------------|
+| 6.x | `flink-connector-elasticsearch6{{site.scala_version_suffix}}` | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+| 7.x and later versions | `flink-connector-elasticsearch7{{site.scala_version_suffix}}` | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-elasticsearch7{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch7{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for stable releases {% endif %}|
+
+<br>
+<span class="label label-danger">Attention</span> The Elasticsearch connector works with the JSON format, which defines how to encode documents for the external system; therefore, it must be added as a [dependency]({% link dev/table/connectors/formats/index.md %}).
+
+How to create an Elasticsearch table
+----------------
+
+The example below shows how to create an Elasticsearch sink table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE myUserTable (
+ user_id STRING,
+ user_name STRING,
+ uv BIGINT,
+ pv BIGINT,
+ PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ 'index' = 'users'
+);
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify which connector to use. Valid values are:
+ <ul>
+ <li><code>elasticsearch-6</code>: connect to an Elasticsearch 6.x cluster</li>
+ <li><code>elasticsearch-7</code>: connect to an Elasticsearch cluster, version 7.x and later</li>
+ </ul></td>
+ </tr>
+ <tr>
+ <td><h5>hosts</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9200;http://host_name:9201'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>index</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+ a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+ See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>document-type</h5></td>
+ <td>required in 6.x</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Elasticsearch document type. No longer required in <code>elasticsearch-7</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>document-id.key-delimiter</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">_</td>
+ <td>String</td>
+ <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs like "KEY1$KEY2$KEY3".</td>
+ </tr>
+ <tr>
+ <td><h5>failure-handler</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">fail</td>
+ <td>String</td>
+ <td>Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
+ <ul>
+ <li><code>fail</code>: throws an exception if a request fails and thus causes a job failure.</li>
+ <li><code>ignore</code>: ignores failures and drops the request.</li>
+ <li><code>retry_rejected</code>: re-adds requests that have failed due to queue capacity saturation.</li>
+ <li>custom class name: for failure handling with an <code>ActionRequestFailureHandler</code> subclass.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-on-checkpoint</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to flush on checkpoint. When disabled, the sink will not wait for all pending action requests
+ to be acknowledged by Elasticsearch on checkpoints. Thus, the sink does NOT provide any strong
+ guarantees for at-least-once delivery of action requests.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.max-actions</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">1000</td>
+ <td>Integer</td>
+ <td>Maximum number of buffered actions per bulk request.
+ Can be set to <code>'0'</code> to disable it.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.max-size</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">2mb</td>
+ <td>MemorySize</td>
+ <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
+ Can be set to <code>'0'</code> to disable it.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.interval</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">1s</td>
+ <td>Duration</td>
+ <td>The interval at which to flush buffered actions.
+ Can be set to <code>'0'</code> to disable it. Note that both <code>'sink.bulk-flush.max-size'</code> and <code>'sink.bulk-flush.max-actions'</code>
+ can be set to <code>'0'</code> while a flush interval is set, allowing for completely asynchronous processing of buffered actions.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">DISABLED</td>
+ <td>String</td>
+ <td>Specify how to perform retries if any flush action fails due to a temporary request error. Valid strategies are:
+ <ul>
+ <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+ <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+ <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">8</td>
+ <td>Integer</td>
+ <td>Maximum number of backoff retries.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">50ms</td>
+ <td>Duration</td>
+ <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+ </tr>
+ <tr>
+ <td><h5>connection.max-retry-timeout</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Maximum timeout between retries.</td>
+ </tr>
+ <tr>
+ <td><h5>connection.path-prefix</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">json</td>
+ <td>String</td>
+ <td>The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document.
+ By default, the built-in <code>'json'</code> format is used. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more details.
+ </td>
+ </tr>
+ </tbody>
+</table>
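+
+To illustrate how these options combine, the following sketch tunes bulk flushing and failure handling on a sink table. The table name, field names, and option values are illustrative assumptions, not recommendations:
+
+{% highlight sql %}
+CREATE TABLE enrichedOrders (
+ order_id STRING,
+ order_total DOUBLE,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ 'index' = 'orders',
+ -- flush after 500 buffered actions, 1mb of buffered data, or 2 seconds, whichever comes first
+ 'sink.bulk-flush.max-actions' = '500',
+ 'sink.bulk-flush.max-size' = '1mb',
+ 'sink.bulk-flush.interval' = '2s',
+ -- re-add requests rejected by a saturated queue, retrying with exponential backoff
+ 'failure-handler' = 'retry_rejected',
+ 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
+ 'sink.bulk-flush.backoff.max-retries' = '3',
+ 'sink.bulk-flush.backoff.delay' = '100ms'
+);
+{% endhighlight %}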
+
+Features
+----------------
+
+### Key Handling
+
+The Elasticsearch sink can work in either upsert mode or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Elasticsearch sink works in upsert mode, which can consume queries containing UPDATE/DELETE messages.
+If no primary key is defined, the Elasticsearch sink works in append mode, which can only consume queries containing INSERT messages.
+
+In the Elasticsearch connector, the primary key is used to calculate the Elasticsearch document ID, which is a string of up to 512 bytes that cannot contain whitespace.
+The Elasticsearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL, using the key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as primary key fields because they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Elasticsearch will generate a document ID automatically.
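+
+As a concrete sketch (the table and field names here are hypothetical), the following DDL declares a composite primary key with a custom delimiter; a row with `region` = `eu` and `user_id` = `42` would be written with document ID "eu$42":
+
+{% highlight sql %}
+CREATE TABLE regionUsers (
+ region STRING,
+ user_id STRING,
+ uv BIGINT,
+ -- composite key: document IDs are built as <region>$<user_id>
+ PRIMARY KEY (region, user_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ 'index' = 'region_users',
+ 'document-id.key-delimiter' = '$'
+);
+{% endhighlight %}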
+
+See [CREATE TABLE DDL]({% link dev/table/sql/create.md %}#create-table) for more details about PRIMARY KEY syntax.
+
+### Dynamic Index
+
+The Elasticsearch sink supports both static and dynamic indices.
+
+If you want a static index, the `index` option value should be a plain string, e.g. `'myusers'`; all records will be consistently written into the "myusers" index.
+
+If you want a dynamic index, you can use `{field_name}` to reference a field value in the record to dynamically generate a target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value of `TIMESTAMP/DATE/TIME` type into the format specified by `date_format_string`.
+The `date_format_string` is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a record with `log_ts` value `2020-03-27 12:25:55` will be written into the "myusers-2020-03-27" index.
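+
+A minimal sketch of such a time-partitioned sink (the table and field names are hypothetical):
+
+{% highlight sql %}
+CREATE TABLE dailyUsers (
+ user_id STRING,
+ log_ts TIMESTAMP(3),
+ PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'elasticsearch-7',
+ 'hosts' = 'http://localhost:9200',
+ -- one index per day, derived from each record's log_ts field
+ 'index' = 'myusers-{log_ts|yyyy-MM-dd}'
+);
+{% endhighlight %}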
+
+
+Data Type Mapping
+----------------
+
+Elasticsearch stores documents as JSON strings, so the data type mapping is between Flink data types and JSON data types.
+Flink uses the built-in `'json'` format for the Elasticsearch connector. Please refer to the <a href="{% link dev/table/connectors/formats/index.md %}">JSON Format</a> page for more type mapping details.
\ No newline at end of file
diff --git a/docs/dev/table/connectors/formats/index.md b/docs/dev/table/connectors/formats/index.md
index ec39af0..9209166 100644
--- a/docs/dev/table/connectors/formats/index.md
+++ b/docs/dev/table/connectors/formats/index.md
@@ -39,18 +39,18 @@ Flink supports the following formats:
<tr>
<td><a href="{{ site.baseurl }}/dev/table/connectors/formats/csv.html">CSV</a></td>
<td>Apache Kafka,
- <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
</tr>
<tr>
<td><a href="{{ site.baseurl }}/dev/table/connectors/formats/json.html">JSON</a></td>
<td>Apache Kafka,
- <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a>,
- Elasticsearch</td>
+ <a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a>,
+ <a href="{% link dev/table/connectors/elasticsearch.md %}">Elasticsearch</a></td>
</tr>
<tr>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/avro.html">Apache Avro</a></td>
+ <td><a href="{% link dev/table/connectors/formats/avro.md %}">Apache Avro</a></td>
<td>Apache Kafka,
- <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
</tr>
<tr>
<td>Debezium JSON</td>
@@ -62,11 +62,11 @@ Flink supports the following formats:
</tr>
<tr>
<td>Apache Parquet</td>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <td><a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
</tr>
<tr>
<td>Apache ORC</td>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <td><a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
</tr>
</tbody>
</table>
\ No newline at end of file
diff --git a/docs/dev/table/connectors/formats/index.zh.md b/docs/dev/table/connectors/formats/index.zh.md
index ec39af0..35f073b 100644
--- a/docs/dev/table/connectors/formats/index.zh.md
+++ b/docs/dev/table/connectors/formats/index.zh.md
@@ -39,18 +39,18 @@ Flink supports the following formats:
<tr>
<td><a href="{{ site.baseurl }}/dev/table/connectors/formats/csv.html">CSV</a></td>
<td>Apache Kafka,
- <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
</tr>
<tr>
<td><a href="{{ site.baseurl }}/dev/table/connectors/formats/json.html">JSON</a></td>
<td>Apache Kafka,
- <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a>,
- Elasticsearch</td>
+ <a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a>,
+ <a href="{% link dev/table/connectors/elasticsearch.zh.md %}">Elasticsearch</a></td>
</tr>
<tr>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/formats/avro.html">Apache Avro</a></td>
+ <td><a href="{% link dev/table/connectors/formats/avro.zh.md %}">Apache Avro</a></td>
<td>Apache Kafka,
- <a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
</tr>
<tr>
<td>Debezium JSON</td>
@@ -62,11 +62,11 @@ Flink supports the following formats:
</tr>
<tr>
<td>Apache Parquet</td>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <td><a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
</tr>
<tr>
<td>Apache ORC</td>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/filesystem.html">Filesystem</a></td>
+ <td><a href="{% link dev/table/connectors/filesystem.zh.md %}">Filesystem</a></td>
</tr>
</tbody>
</table>
\ No newline at end of file
diff --git a/docs/dev/table/connectors/index.md b/docs/dev/table/connectors/index.md
index c2a08b6..a9a66ca 100644
--- a/docs/dev/table/connectors/index.md
+++ b/docs/dev/table/connectors/index.md
@@ -58,7 +58,7 @@ Flink natively support various connectors. The following tables list all availab
<td>Streaming Sink, Batch Sink</td>
</tr>
<tr>
- <td>Elasticsearch</td>
+ <td><a href="{% link dev/table/connectors/elasticsearch.md %}">Elasticsearch</a></td>
<td>6.x & 7.x</td>
<td>Not supported</td>
<td>Streaming Sink, Batch Sink</td>
@@ -76,7 +76,7 @@ Flink natively support various connectors. The following tables list all availab
<td>Streaming Sink, Batch Sink</td>
</tr>
<tr>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache HBase</a></td>
+ <td><a href="{% link dev/table/connectors/hbase.md %}">Apache HBase</a></td>
<td>1.4.x</td>
<td>Bounded Scan, Lookup</td>
<td>Streaming Sink, Batch Sink</td>
diff --git a/docs/dev/table/connectors/index.zh.md b/docs/dev/table/connectors/index.zh.md
index c2a08b6..b585ad2 100644
--- a/docs/dev/table/connectors/index.zh.md
+++ b/docs/dev/table/connectors/index.zh.md
@@ -58,7 +58,7 @@ Flink natively support various connectors. The following tables list all availab
<td>Streaming Sink, Batch Sink</td>
</tr>
<tr>
- <td>Elasticsearch</td>
+ <td><a href="{% link dev/table/connectors/elasticsearch.zh.md %}">Elasticsearch</a></td>
<td>6.x & 7.x</td>
<td>Not supported</td>
<td>Streaming Sink, Batch Sink</td>
@@ -76,7 +76,7 @@ Flink natively support various connectors. The following tables list all availab
<td>Streaming Sink, Batch Sink</td>
</tr>
<tr>
- <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache HBase</a></td>
+ <td><a href="{% link dev/table/connectors/hbase.zh.md %}">Apache HBase</a></td>
<td>1.4.x</td>
<td>Bounded Scan, Lookup</td>
<td>Streaming Sink, Batch Sink</td>