You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/12/12 13:06:22 UTC

[flink-connector-opensearch] branch main updated: [FLINK-25756][docs] Add documentation

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 6407aab  [FLINK-25756][docs] Add documentation
6407aab is described below

commit 6407aabfe82a48002f301aee2bd3fc38fe52e035
Author: Andriy Redko <an...@aiven.io>
AuthorDate: Mon Dec 12 08:06:17 2022 -0500

    [FLINK-25756][docs] Add documentation
---
 .../docs/connectors/datastream/opensearch.md       | 261 +++++++++++++++++
 .../content.zh/docs/connectors/table/opensearch.md | 308 +++++++++++++++++++++
 .../docs/connectors/datastream/opensearch.md       | 261 +++++++++++++++++
 docs/content/docs/connectors/table/opensearch.md   | 308 +++++++++++++++++++++
 .../table/OpensearchConnectorOptions.java          |   2 +-
 5 files changed, 1139 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/connectors/datastream/opensearch.md b/docs/content.zh/docs/connectors/datastream/opensearch.md
new file mode 100644
index 0000000..cca9e4f
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/opensearch.md
@@ -0,0 +1,261 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can request document actions to an
+[Opensearch](https://opensearch.org/) Index. To use this connector, add 
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "a1732edd-4218-470e-adad-b1ebb4021a12" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (e.g.,
+`DeleteRequest`, `UpdateRequest`, etc.). 
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This effectively assures that all requests before the
+checkpoint was triggered have been successfully acknowledged by Opensearch, before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
+
+{{< tabs "aa0d1e93-4844-40d7-b0ec-9ec37e731a5f" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default but the default delivery guarantee is <code>AT_LEAST_ONCE</code>.
+This causes the sink to buffer requests until it either finishes or the <code>BulkProcessor</code> flushes automatically.
+By default, the <code>BulkProcessor</code> will flush after <code>1000</code> added actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+Using <code>UpdateRequests</code> with deterministic IDs and the upsert method, it is possible to achieve exactly-once semantics in Opensearch when <code>AT_LEAST_ONCE</code> delivery is configured for the connector.
+</p>
+
+### Handling Failing Opensearch Requests
+
+Opensearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Opensearch Sink allows the user to retry requests by specifying a backoff-policy.
+
+Below is an example:
+
+{{< tabs "adb958b3-5dd5-476e-b946-ace3335628ea" >}}
+{{< tab "Java" >}}
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+The above example will let the sink re-add requests that failed due to resource constraints (e.g.
+queue capacity saturation). For all other failures, such as malformed documents, the sink will fail. 
+If no `BulkFlushBackoffStrategy` (or `FlushBackoffType.NONE`) is configured, the sink will fail for any kind of error.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints
+will need to wait until Opensearch node queues have enough capacity for
+all the pending requests, or until the maximum number of retries has been reached.
+</p>
+
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for its behaviour
+on how buffered action requests are flushed, by using the following methods of the OpensearchSinkBuilder:
+
+* **setBulkFlushMaxActions(int numMaxActions)**: Maximum number of actions to buffer before flushing.
+* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in megabytes) to buffer before flushing.
+* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush regardless of the number or size of buffered actions.
+ 
+Configuring how temporary request errors are retried is also supported:
+ * **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**: The type of backoff delay (either `CONSTANT` or `EXPONENTIAL`), the number of backoff retries to attempt, and the backoff delay. For constant backoff, this
+   is simply the delay between each retry. For exponential backoff, this is the initial base delay.
+
+More information about Opensearch can be found [here](https://opensearch.org/).
+
+## Packaging the Opensearch Connector into an Uber-Jar
+
+For the execution of your Flink program, it is recommended to build a
+so-called uber-jar (executable jar) containing all your dependencies
+(see [here]({{< ref "docs/dev/configuration" >}}) for further information).
+
+Alternatively, you can put the connector's jar file into Flink's `lib/` folder to make it available
+system-wide, i.e. for all jobs being run.
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/opensearch.md b/docs/content.zh/docs/connectors/table/opensearch.md
new file mode 100644
index 0000000..475c180
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/opensearch.md
@@ -0,0 +1,308 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows for writing into an index of the Opensearch engine. This document describes how to set up the Opensearch Connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages with the external system using the primary key defined on the DDL.
+
+If no primary key is defined on the DDL, the connector can only operate in append mode for exchanging INSERT only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, the valid value is: <code>opensearch</code>
+</td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Opensearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Opensearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>allow-insecure</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>Allow insecure connections to <code>HTTPS</code> endpoints (disables certificate validation).</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>username</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Username used to connect to Opensearch instance. Please note that Opensearch comes with a pre-bundled security feature; you can disable it by following the <a href="https://opensearch.org/docs/latest/security-plugin/configuration/index/">guidelines</a> on how to configure the security for your Opensearch cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password used to connect to Opensearch instance. If <code>username</code> is configured, this option must be configured with a non-empty string as well.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.delivery-guarantee</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
+      <td>String</td>
+      <td>Optional delivery guarantee when committing. Valid values are:
+      <ul>
+        <li><code>EXACTLY_ONCE</code>: records are only delivered exactly-once also under failover scenarios.</li>
+        <li><code>AT_LEAST_ONCE</code>: records are ensured to be delivered but it may happen that the same record is delivered multiple times.</li>
+        <li><code>NONE</code>:  records are delivered on a best effort basis.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-on-checkpoint</h5></td>
+      <td>optional</td>
+      <td></td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests
+       to be acknowledged by Opensearch on checkpoints. Thus, a sink does NOT provide any strong
+       guarantees for at-least-once delivery of action requests.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-actions</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>Maximum number of buffered actions per bulk request.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.interval</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1s</td>
+      <td>Duration</td>
+      <td>The interval to flush buffered actions.
+        Can be set to <code>'0'</code> to disable it. Note, both <code>'sink.bulk-flush.max-size'</code> and <code>'sink.bulk-flush.max-actions'</code>
+        can be set to <code>'0'</code> with the flush interval set allowing for complete async processing of buffered actions.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">DISABLED</td>
+      <td>String</td>
+      <td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
+      <ul>
+        <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+        <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+        <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Maximum number of backoff retries.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.path-prefix</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.request-timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for requesting a connection from the connection manager.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for establishing a connection.</td>
+    </tr>
+    <tr>
+      <td><h5>socket.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put differently, a maximum period of inactivity between two consecutive data packets.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>The Opensearch connector supports specifying a format. The format must produce a valid JSON document.
+       By default uses built-in <code>'json'</code> format. Please refer to <a href="{{< ref "docs/connectors/table/formats/overview" >}}">JSON Format</a> page for more details.
+      </td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+----------------
+
+### Key Handling
+
+The Opensearch sink can work in either upsert mode or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Opensearch sink works in upsert mode which can consume queries containing UPDATE/DELETE messages.
+If a primary key is not defined, the Opensearch sink works in append mode which can only consume queries containing INSERT only messages.
+
+In the Opensearch connector, the primary key is used to calculate the Opensearch document id, which is a string of up to 512 bytes that must not contain whitespace.
+The Opensearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL using a key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as a primary key field as they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Opensearch will generate a document id automatically.
+
+See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more details about the PRIMARY KEY syntax.
+
+### Dynamic Index
+
+The Opensearch sink supports both static index and dynamic index.
+
+If you want to have a static index, the `index` option value should be a plain string, e.g. `'myusers'`; all records will then be consistently written into the "myusers" index.
+
+If you want to have a dynamic index, you can use `{field_name}` to reference a field value in the record to dynamically generate a target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value of `TIMESTAMP/DATE/TIME` type into the format specified by the `date_format_string`.
+The `date_format_string` is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a record with `log_ts` field value `2020-03-27 12:25:55` will be written into "myusers-2020-03-27" index.
+
+You can also use `'{now()|date_format_string}'` to convert the current system time to the format specified by `date_format_string`. The corresponding time type of `now()` is `TIMESTAMP_WITH_LTZ`.
+When formatting the system time as a string, the time zone configured in the session through `table.local-time-zone` will be used. You can use `NOW()`, `now()`, `CURRENT_TIMESTAMP`, `current_timestamp`.
+
+**NOTE:** When using a dynamic index generated from the current system time with a changelog stream, there is no guarantee that records with the same primary key will generate the same index name.
+Therefore, a dynamic index based on the system time only supports append-only streams.
+
+Data Type Mapping
+----------------
+
+Opensearch stores documents as JSON strings, so the data type mapping is between Flink data types and JSON data types.
+Flink uses built-in `'json'` format for Opensearch connector. Please refer to [JSON Format]({{< ref "docs/connectors/table/formats/json" >}}) page for more type mapping details.
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/opensearch.md b/docs/content/docs/connectors/datastream/opensearch.md
new file mode 100644
index 0000000..cca9e4f
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/opensearch.md
@@ -0,0 +1,261 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can request document actions to an
+[Opensearch](https://opensearch.org/) Index. To use this connector, add 
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "a1732edd-4218-470e-adad-b1ebb4021a12" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (e.g.,
+`DeleteRequest`, `UpdateRequest`, etc.). 
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This effectively assures that all requests before the
+checkpoint was triggered have been successfully acknowledged by Opensearch, before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
+
+{{< tabs "aa0d1e93-4844-40d7-b0ec-9ec37e731a5f" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default, but the default delivery guarantee is <code>AT_LEAST_ONCE</code>.
+As a result, the sink buffers requests until it either finishes or the <code>BulkProcessor</code> flushes automatically.
+By default, the <code>BulkProcessor</code> will flush after <code>1000</code> added actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+Using <code>UpdateRequest</code>s with deterministic IDs and the upsert method, it is possible to achieve exactly-once semantics in Opensearch when <code>AT_LEAST_ONCE</code> delivery is configured for the connector.
+</p>
+
+### Handling Failing Opensearch Requests
+
+Opensearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Opensearch Sink allows the user to retry requests by specifying a backoff-policy.
+
+Below is an example:
+
+{{< tabs "adb958b3-5dd5-476e-b946-ace3335628ea" >}}
+{{< tab "Java" >}}
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+The above example will let the sink re-add requests that failed due to resource constraints (e.g.
+queue capacity saturation). For all other failures, such as malformed documents, the sink will fail.
+If no `BulkFlushBackoffStrategy` (or `FlushBackoffType.NONE`) is configured, the sink will fail for any kind of error.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints
+will need to wait until Opensearch node queues have enough capacity for
+all the pending requests, or until the maximum number of retries has been reached.
+</p>
+
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for its behaviour
+on how buffered action requests are flushed, by using the following methods of the `OpensearchSinkBuilder`:
+
+* **setBulkFlushMaxActions(int numMaxActions)**: Maximum number of actions to buffer before flushing.
+* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in megabytes) to buffer before flushing.
+* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush regardless of the amount or size of buffered actions.
+ 
+Configuring how temporary request errors are retried is also supported:
+* **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis)**: The type of backoff delay (either `CONSTANT` or `EXPONENTIAL`), the maximum number of backoff retries to attempt, and the backoff delay in milliseconds. For constant backoff, the delay
+  is simply the time between each retry. For exponential backoff, it is the initial base delay.
+
+More information about Opensearch can be found [here](https://opensearch.org/).
+
+## Packaging the Opensearch Connector into an Uber-Jar
+
+For the execution of your Flink program, it is recommended to build a
+so-called uber-jar (executable jar) containing all your dependencies
+(see [here]({{< ref "docs/dev/configuration" >}}) for further information).
+
+Alternatively, you can put the connector's jar file into Flink's `lib/` folder to make it available
+system-wide, i.e. for all jobs being run.
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/table/opensearch.md b/docs/content/docs/connectors/table/opensearch.md
new file mode 100644
index 0000000..475c180
--- /dev/null
+++ b/docs/content/docs/connectors/table/opensearch.md
@@ -0,0 +1,308 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows for writing into an index of the Opensearch engine. This document describes how to set up the Opensearch Connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages with the external system using the primary key defined on the DDL.
+
+If no primary key is defined on the DDL, the connector can only operate in append mode for exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use. The only valid value is <code>opensearch</code>.
+</td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Opensearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Opensearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>allow-insecure</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>Allow insecure connections to <code>HTTPS</code> endpoints (disable certificate validation).</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>username</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Username used to connect to the Opensearch instance. Please note that Opensearch comes with a pre-bundled security feature; you can disable it by following the <a href="https://opensearch.org/docs/latest/security-plugin/configuration/index/">guidelines</a> on how to configure security for your Opensearch cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password used to connect to the Opensearch instance. If <code>username</code> is configured, this option must be configured with a non-empty string as well.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.delivery-guarantee</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
+      <td>String</td>
+      <td>Optional delivery guarantee when committing. Valid values are:
+      <ul>
+        <li><code>EXACTLY_ONCE</code>: records are delivered exactly once, even under failover scenarios.</li>
+        <li><code>AT_LEAST_ONCE</code>: records are ensured to be delivered, but the same record may be delivered multiple times.</li>
+        <li><code>NONE</code>: records are delivered on a best-effort basis.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-on-checkpoint</h5></td>
+      <td>optional</td>
+      <td></td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests
+       to be acknowledged by Opensearch on checkpoints. Thus, a sink does NOT provide any strong
+       guarantees for at-least-once delivery of action requests.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-actions</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>Maximum number of buffered actions per bulk request.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.interval</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1s</td>
+      <td>Duration</td>
+      <td>The interval to flush buffered actions.
+        Can be set to <code>'0'</code> to disable it. Note, both <code>'sink.bulk-flush.max-size'</code> and <code>'sink.bulk-flush.max-actions'</code>
+        can be set to <code>'0'</code> with the flush interval set allowing for complete async processing of buffered actions.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">DISABLED</td>
+      <td>String</td>
+      <td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
+      <ul>
+        <li><code>DISABLED</code>: no retry performed, i.e. fail after the first request error.</li>
+        <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+        <li><code>EXPONENTIAL</code>: initially wait for backoff delay and increase exponentially between retries.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Maximum number of backoff retries.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.path-prefix</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.request-timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for requesting a connection from the connection manager.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for establishing a connection.</td>
+    </tr>
+    <tr>
+      <td><h5>socket.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put differently, the maximum period of inactivity between two consecutive data packets.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>The Opensearch connector supports specifying a format. The format must produce a valid JSON document.
+       By default, the built-in <code>'json'</code> format is used. Please refer to <a href="{{< ref "docs/connectors/table/formats/overview" >}}">JSON Format</a> page for more details.
+      </td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+----------------
+
+### Key Handling
+
+The Opensearch sink can work in either upsert mode or append mode, depending on whether a primary key is defined.
+If a primary key is defined, the Opensearch sink works in upsert mode which can consume queries containing UPDATE/DELETE messages.
+If a primary key is not defined, the Opensearch sink works in append mode which can only consume queries containing INSERT only messages.
+
+In the Opensearch connector, the primary key is used to calculate the Opensearch document id, which is a string of up to 512 bytes. It cannot contain whitespace.
+The Opensearch connector generates a document ID string for every row by concatenating all primary key fields in the order defined in the DDL using a key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as a primary key field as they do not have a good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Opensearch will generate a document id automatically.
+
+See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more details about the PRIMARY KEY syntax.
+
+### Dynamic Index
+
+The Opensearch sink supports both static index and dynamic index.
+
+If you want to have a static index, the `index` option value should be a plain string, e.g. `'myusers'`; all records will then be consistently written into the "myusers" index.
+
+If you want to have a dynamic index, you can use `{field_name}` to reference a field value in the record to dynamically generate a target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value of `TIMESTAMP/DATE/TIME` type into the format specified by the `date_format_string`.
+The `date_format_string` is compatible with Java's [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a record with `log_ts` field value `2020-03-27 12:25:55` will be written into "myusers-2020-03-27" index.
+
+You can also use `'{now()|date_format_string}'` to convert the current system time to the format specified by `date_format_string`. The corresponding time type of `now()` is `TIMESTAMP_WITH_LTZ`.
+When formatting the system time as a string, the time zone configured in the session through `table.local-time-zone` will be used. You can use `NOW()`, `now()`, `CURRENT_TIMESTAMP`, `current_timestamp`.
+
+**NOTE:** When using a dynamic index generated from the current system time with a changelog stream, there is no guarantee that records with the same primary key will generate the same index name.
+Therefore, a dynamic index based on the system time only supports append-only streams.
+
+Data Type Mapping
+----------------
+
+Opensearch stores documents as JSON strings, so the data type mapping is between Flink data types and JSON data types.
+Flink uses built-in `'json'` format for Opensearch connector. Please refer to [JSON Format]({{< ref "docs/connectors/table/formats/json" >}}) page for more type mapping details.
+
+{{< top >}}
diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java
index dcd87cb..1ff6f12 100644
--- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java
+++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java
@@ -143,7 +143,7 @@ public class OpensearchConnectorOptions {
     public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE_OPTION =
             ConfigOptions.key("sink.delivery-guarantee")
                     .enumType(DeliveryGuarantee.class)
-                    .defaultValue(DeliveryGuarantee.NONE)
+                    .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
                     .withDescription("Optional delivery guarantee when committing.");
 
     public static final ConfigOption<Boolean> ALLOW_INSECURE =