Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/08 13:10:49 UTC

[GitHub] [flink-connector-opensearch] reta opened a new pull request, #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

reta opened a new pull request, #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3

   Added documentation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044440646


##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows writing into an index of the Opensearch engine. This document describes how to set up the Opensearch connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode, exchanging UPDATE/DELETE messages with the external system using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use; the valid value is <code>opensearch</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Opensearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Opensearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>allow-insecure</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>Allow insecure connections to <code>HTTPS</code> endpoints (disables certificate validation).</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>username</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Username used to connect to the Opensearch instance. Note that Opensearch comes with a pre-bundled security feature; you can disable it by following the <a href="https://opensearch.org/docs/latest/security-plugin/configuration/index/">guidelines</a> on how to configure security for your Opensearch cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password used to connect to the Opensearch instance. If <code>username</code> is configured, this option must be configured with a non-empty string as well.</td>
+    </tr>
+    <tr>
+      <td><h5>failure-handler</h5></td>

Review Comment:
   Missed that, thank you
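The `index` option quoted above accepts a dynamic pattern such as `index-{log_ts|yyyy-MM-dd}`. Below is a minimal plain-Java sketch of how such a pattern could be resolved against a row's timestamp. It assumes only the single `{field|dateFormat}` form and a `LocalDateTime` field value; `DynamicIndexSketch` and `resolve` are hypothetical names, and this is not the connector's actual implementation.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: resolves "prefix-{field|dateFormat}" against one row. */
final class DynamicIndexSketch {

    // Matches a single "{fieldName|dateFormat}" placeholder, e.g. "{log_ts|yyyy-MM-dd}".
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{(\\w+)\\|([^}]+)\\}");

    private DynamicIndexSketch() {}

    /**
     * Replaces the placeholder with the row's timestamp formatted with the given pattern.
     * Index names without a placeholder are returned unchanged (static index).
     */
    static String resolve(String indexPattern, Map<String, LocalDateTime> row) {
        Matcher m = PLACEHOLDER.matcher(indexPattern);
        if (!m.find()) {
            return indexPattern; // static index such as "users"
        }
        String field = m.group(1);
        LocalDateTime value = row.get(field);
        if (value == null) {
            throw new IllegalArgumentException("Row has no value for field: " + field);
        }
        String formatted = value.format(DateTimeFormatter.ofPattern(m.group(2)));
        // Splice the formatted timestamp in place of the placeholder.
        return indexPattern.substring(0, m.start()) + formatted + indexPattern.substring(m.end());
    }
}
```

With a row whose `log_ts` is 2022-12-08T13:10, `resolve("index-{log_ts|yyyy-MM-dd}", row)` yields `index-2022-12-08`, so all rows from the same day land in the same index.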





[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044510624


##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows writing into an index of the Opensearch engine. This document describes how to set up the Opensearch connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode, exchanging UPDATE/DELETE messages with the external system using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options

Review Comment:
   https://github.com/apache/flink-connector-elasticsearch/pull/49
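The upsert/append distinction quoted above can be illustrated with a toy in-memory "index" in plain Java. This is a hypothetical sketch, not the connector's code. It assumes that in upsert mode the document ID is built by joining the primary-key values with the `document-id.key-delimiter` (default `_`), so an update overwrites and a delete removes the same document. In append mode, by contrast, every INSERT becomes a new document under a generated ID. `UpsertVsAppendSketch` and its `Kind` enum are illustrative names only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

/** Hypothetical toy model of upsert vs. append writes; not the connector's implementation. */
final class UpsertVsAppendSketch {

    /** Simplified change kinds for this toy model. */
    enum Kind { INSERT, UPDATE, DELETE }

    // Stands in for an Opensearch index: document ID -> document body.
    final Map<String, String> index = new LinkedHashMap<>();

    /** Builds a document ID by joining primary-key values with the delimiter. */
    static String documentId(List<?> keyValues, String delimiter) {
        return keyValues.stream().map(v -> String.valueOf(v)).collect(Collectors.joining(delimiter));
    }

    /** Upsert mode: the primary key decides which document is written or removed. */
    void upsert(Kind kind, List<?> key, String body) {
        String id = documentId(key, "_");
        if (kind == Kind.DELETE) {
            index.remove(id);
        } else {
            index.put(id, body); // INSERT and UPDATE both overwrite the document with this ID
        }
    }

    /** Append mode: only INSERTs are accepted, each stored under a fresh random ID. */
    void append(Kind kind, String body) {
        if (kind != Kind.INSERT) {
            throw new UnsupportedOperationException("append mode only accepts INSERT");
        }
        index.put(UUID.randomUUID().toString(), body);
    }
}
```

Writing the same key twice in upsert mode leaves one document, while two identical INSERTs in append mode leave two. This is why a primary key is required for UPDATE/DELETE support.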





[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044510808


##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows writing into an index of the Opensearch engine. This document describes how to set up the Opensearch connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode, exchanging UPDATE/DELETE messages with the external system using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options

Review Comment:
   https://github.com/apache/flink-connector-elasticsearch/pull/49





[GitHub] [flink-connector-opensearch] zentol commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044352277


##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html

Review Comment:
   you won't need these since the page didn't exist in the old structures :)



##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows writing into an index of the Opensearch engine. This document describes how to set up the Opensearch connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode, exchanging UPDATE/DELETE messages with the external system using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use; the valid value is <code>opensearch</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Opensearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Opensearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>allow-insecure</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>Allow insecure connections to <code>HTTPS</code> endpoints (disables certificate validation).</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>username</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Username used to connect to the Opensearch instance. Note that Opensearch comes with a pre-bundled security feature; you can disable it by following the <a href="https://opensearch.org/docs/latest/security-plugin/configuration/index/">guidelines</a> on how to configure security for your Opensearch cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password used to connect to the Opensearch instance. If <code>username</code> is configured, this option must be configured with a non-empty string as well.</td>
+    </tr>
+    <tr>
+      <td><h5>failure-handler</h5></td>

Review Comment:
   This option doesn't exist in the table connector. You opted for the non-deprecated connector in the Table API, which doesn't support the failure handler.
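The `hosts` option quoted above takes one or more endpoints separated by `;`. Below is a minimal plain-Java sketch of splitting such a value into individual URIs. It assumes each entry must be a full `scheme://host:port` URL. `HostsOptionSketch` is a hypothetical name, and the connector's own validation may differ. Note that `java.net.URI` does not report a host for names containing underscores, which is why the usage below uses `node1`/`node2`.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: parses a "hosts" value such as "http://node1:9200;http://node2:9201". */
final class HostsOptionSketch {

    private HostsOptionSketch() {}

    /** Splits on ';' and validates that every entry has a scheme, host and explicit port. */
    static List<URI> parse(String hostsOption) {
        List<URI> endpoints = new ArrayList<>();
        for (String raw : hostsOption.split(";")) {
            String trimmed = raw.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            URI uri = URI.create(trimmed); // throws IllegalArgumentException on bad syntax
            if (uri.getScheme() == null || uri.getHost() == null || uri.getPort() == -1) {
                throw new IllegalArgumentException(
                        "Expected 'scheme://host:port' but got: " + trimmed);
            }
            endpoints.add(uri);
        }
        return endpoints;
    }
}
```

Under this sketch, a value without a scheme such as `localhost:9200` is rejected rather than silently misparsed.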



##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can send document action requests to an
+[Opensearch](https://opensearch.org/) index. To use this connector, add
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}

Review Comment:
   ditto re python (see below)



##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can send document action requests to an
+[Opensearch](https://opensearch.org/) index. To use this connector, add
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+            (element, context, indexer) ->
+                indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink2.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+import scala.collection.JavaConverters._
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]()
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+      indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: String): IndexRequest = {
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  // Convert the Scala map to a java.util.Map, as expected by IndexRequest.source
+  Requests.indexRequest.index("my-index").id(element).source(json.asJava)
+}
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+Opensearch static index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered
+os_sink = OpensearchSinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(OpensearchEmitter.static('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os sink')
+```
+
+Opensearch dynamic index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+os_sink = OpensearchSinkBuilder() \
+    .set_emitter(OpensearchEmitter.dynamic_index('name', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os dynamic index sink')
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. In general, the `OpensearchEmitter`
+can be used to perform requests of different types (e.g.,
+`DeleteRequest` or `UpdateRequest`).
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+The `BulkProcessor` buffers elements before sending them to the cluster in bulk. It
+executes bulk requests one at a time, i.e., there are never two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This ensures that all requests issued before the
+checkpoint was triggered have been successfully acknowledged by Opensearch
+before the sink proceeds to process more records.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault-tolerant Opensearch sinks, checkpointing must be enabled on the execution environment:
+
+{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+# checkpoint every 5000 msecs
+env.enable_checkpointing(5000)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default, but the default delivery guarantee is `AT_LEAST_ONCE`.
+This causes the sink to buffer requests until it either finishes or the `BulkProcessor` flushes automatically.
+By default, the `BulkProcessor` flushes after `1000` added actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+By using `UpdateRequest`s with deterministic IDs and the upsert method, it is possible to achieve exactly-once semantics in Opensearch when `AT_LEAST_ONCE` delivery is configured for the connector.
+</p>
+
+### Handling Failing Opensearch Requests
+
+Opensearch action requests may fail for a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Opensearch Sink allows the user to retry requests by specifying a backoff policy.
+
+Below is an example:
+
+{{< tabs "ddb958b3-5dd5-476e-b946-ace3335628b2" >}}
+{{< tab "Java" >}}
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+            (element, context, indexer) ->
+                indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
+      indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}

Review Comment:
   The Python API doesn't support this connector. (It needs a compatibility layer that currently has to be added to Flink.)
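The buffering and at-least-once behaviour described in the quoted datastream page has two parts: a flush once enough actions are buffered, and a flush of everything pending when a checkpoint is taken. Both can be modelled with a toy buffer in plain Java. This is a hypothetical sketch. `ToyBulkBuffer` and the `Consumer<List<String>>` standing in for one bulk request are illustrative only, and they are not the `BulkProcessor` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of a bulk buffer; not the Opensearch BulkProcessor API. */
final class ToyBulkBuffer {

    private final int maxActions;                  // flush threshold (the text cites 1000 as the default)
    private final Consumer<List<String>> sendBulk; // stands in for one bulk request to the cluster
    private final List<String> pending = new ArrayList<>();

    ToyBulkBuffer(int maxActions, Consumer<List<String>> sendBulk) {
        this.maxActions = maxActions;
        this.sendBulk = sendBulk;
    }

    /** Buffers one action and flushes once the threshold is reached. */
    void add(String action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /**
     * Models the checkpoint hook: everything buffered so far is sent before the
     * checkpoint completes, mirroring the at-least-once argument in the text.
     */
    void onCheckpoint() {
        flush();
    }

    int pendingCount() {
        return pending.size();
    }

    /** Sends all pending actions as one bulk (synchronously, so never two flushes at once). */
    private void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sendBulk.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```

With `maxActions = 1` (the `setBulkFlushMaxActions(1)` setting in the examples), every `add` triggers an immediate flush. With a larger threshold and checkpointing disabled, actions may sit in `pending` until the threshold is hit, which is the situation the IMPORTANT note warns about.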



##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows writing into an index of the Opensearch engine. This document describes how to set up the Opensearch connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode, exchanging UPDATE/DELETE messages with the external system using the primary key defined in the DDL.
+
+If no primary key is defined in the DDL, the connector can only operate in append mode, exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options

Review Comment:
   Missing `connection.timeout`, `socket.timeout`, and possibly others.
   This must be in sync with `OpensearchConnectorOptions`.
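A small drift check can help keep the documented options in sync with `OpensearchConnectorOptions`. The following is a hypothetical helper sketch, not part of the connector build: it assumes option keys are declared in the Java source as `ConfigOptions.key("...")` (as in the `sink.delivery-guarantee` definition quoted in this thread) and that the Markdown options table lists each key in an `<h5>` cell, as other Flink connector pages do. The inline samples stand in for the real files.

```python
import re

# Assumed conventions: Java declares options via ConfigOptions.key("..."),
# and the docs table shows each key as <h5>key</h5>.
JAVA_KEY = re.compile(r'ConfigOptions\s*\.key\(\s*"([^"]+)"\s*\)')
DOC_KEY = re.compile(r"<h5>\s*([^<]+?)\s*</h5>")


def undocumented_options(java_source: str, markdown: str) -> set:
    """Return option keys declared in the Java source but absent from the docs."""
    declared = set(JAVA_KEY.findall(java_source))
    documented = set(DOC_KEY.findall(markdown))
    return declared - documented


# Sample inputs (illustrative, not copied from the real files).
java_sample = '''
ConfigOptions.key("hosts").stringType().asList().noDefaultValue();
ConfigOptions.key("connection.timeout").durationType().noDefaultValue();
ConfigOptions.key("socket.timeout").durationType().noDefaultValue();
'''
docs_sample = "<tr><td><h5>hosts</h5></td><td>required</td></tr>"

print(sorted(undocumented_options(java_sample, docs_sample)))
# ['connection.timeout', 'socket.timeout']
```

Running a check like this in CI would flag options such as `connection.timeout` and `socket.timeout` before the docs drift from the code.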



##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can send document action requests to an
+[Opensearch](https://opensearch.org/) index. To use this connector, add
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+Opensearch static index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered
+os_sink = OpensearchSinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(OpensearchEmitter.static('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os sink')
+```
+
+Opensearch dynamic index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+os_sink = OpensearchSinkBuilder() \
+    .set_emitter(OpensearchEmitter.dynamic_index('name', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os dynamic index sink')
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (e.g.,
+`DeleteRequest`, `UpdateRequest`, etc.).
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor`
+executes bulk requests one at a time, i.e., there are never two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This ensures that all requests issued before the
+checkpoint was triggered have been successfully acknowledged by Opensearch before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault-tolerant Opensearch sinks, checkpointing of the topology needs to be enabled at the execution environment:
+
+{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}

Review Comment:
   This should use a different id. Can be anything really; just make it different than the ES one.
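Because the `tabs` shortcode id has to differ from the one used on the Elasticsearch page, one quick way to catch such collisions is to compare the tab-group ids of the two pages. This is a hypothetical sketch: the regular expression assumes the `{{< tabs "id" >}}` form used in these docs, the page contents are inline samples, and a random UUID is just one convenient way to mint a fresh id.

```python
import re
import uuid

# Matches the group shortcode {{< tabs "some-id" >}} but not {{< tab "Java" >}}.
TABS_ID = re.compile(r'\{\{<\s*tabs\s+"([^"]+)"\s*>\}\}')


def shared_tab_ids(page_a: str, page_b: str) -> set:
    """Return tab-group ids that occur in both pages."""
    return set(TABS_ID.findall(page_a)) & set(TABS_ID.findall(page_b))


# Inline samples standing in for the two Markdown pages.
opensearch_page = '{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}\n{{< tab "Java" >}}'
elasticsearch_page = '{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}'

for clash in sorted(shared_tab_ids(opensearch_page, elasticsearch_page)):
    # Any unique string works as an id; a random UUID makes collisions unlikely.
    print(f"replace {clash} with {uuid.uuid4()}")
```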




[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044469621


##########
flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java:
##########
@@ -143,7 +143,7 @@ public class OpensearchConnectorOptions {
     public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE_OPTION =
             ConfigOptions.key("sink.delivery-guarantee")
                     .enumType(DeliveryGuarantee.class)
-                    .defaultValue(DeliveryGuarantee.NONE)
+                    .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)

Review Comment:
   Updating to match the right default (https://github.com/apache/flink-connector-opensearch/blob/main/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java#L65)
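The effect of this default can be illustrated with a toy model. Following the fault-tolerance section of the DataStream docs in this PR, with `AT_LEAST_ONCE` the sink waits for all pending bulk actions when a checkpoint is taken; with `NONE` (assumed here to skip that wait) anything still buffered is not covered by the checkpoint. `ToyBulkWriter` is a hypothetical stand-in, not the connector's writer; `max_actions` mirrors the size-triggered flush, which the docs say defaults to 1000 actions.

```python
from enum import Enum


class DeliveryGuarantee(Enum):
    NONE = "none"
    AT_LEAST_ONCE = "at-least-once"


class ToyBulkWriter:
    """Toy buffering writer; a model of the behaviour, not connector code."""

    def __init__(self, guarantee: DeliveryGuarantee, max_actions: int = 1000):
        self.guarantee = guarantee
        self.max_actions = max_actions  # flush once this many actions are buffered
        self.buffer = []                # actions not yet sent to the cluster
        self.acknowledged = []          # actions the cluster has accepted

    def write(self, action: str) -> None:
        self.buffer.append(action)
        if len(self.buffer) >= self.max_actions:
            self._flush()

    def on_checkpoint(self) -> list:
        """Return the actions that a failure right after this checkpoint could lose."""
        if self.guarantee is DeliveryGuarantee.AT_LEAST_ONCE:
            self._flush()  # wait until every pending action is acknowledged
        return list(self.buffer)

    def _flush(self) -> None:
        self.acknowledged.extend(self.buffer)
        self.buffer.clear()


for guarantee in DeliveryGuarantee:
    writer = ToyBulkWriter(guarantee)
    for i in range(3):
        writer.write(f"index-{i}")
    print(guarantee.name, len(writer.on_checkpoint()))
# NONE 3
# AT_LEAST_ONCE 0
```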





[GitHub] [flink-connector-opensearch] reta commented on pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#issuecomment-1344290837

   > In the original PR you mentioned this:
   > 
   > ```
   > any mentions and uses of mapping types have been removed: it is deprecated feature, scheduled for removal (the indices with mapping types cannot be created or migrated to Opensearch 1.x and beyond)
   > any mentions and uses have been removed: it is deprecated feature, scheduled for removal (only HighLevelRestClient is used)
   > ```
   > 
   > Was this documented anywhere?
   
   Correct, it is not mentioned because mapping types are not exposed anywhere anymore (so there is no way to use them). The Elasticsearch connector has an explicit configuration setting for it (https://github.com/apache/flink-connector-elasticsearch/blob/main/docs/content/docs/connectors/table/elasticsearch.md#document-type); it is not present for Opensearch.
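For context on why no type setting is needed: Opensearch bulk requests address documents by index and id only. The sketch below hand-builds a newline-delimited `_bulk` request body in plain Python to show that the action metadata carries `_index` and `_id` but no `_type`. The index name, ids, and fields are illustrative, and the connector itself builds requests through the Java client (e.g. `Requests.indexRequest()`) rather than raw JSON.

```python
import json


def bulk_body(index: str, docs: list) -> str:
    """Build an NDJSON _bulk body of index actions; no "_type" appears anywhere."""
    lines = []
    for doc_id, source in docs:
        lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
        lines.append(json.dumps(source))
    # The _bulk API expects every line, including the last, to end with "\n".
    return "\n".join(lines) + "\n"


body = bulk_body("users", [("u1", {"user_name": "alice", "pv": 3})])
print(body, end="")
# {"index": {"_index": "users", "_id": "u1"}}
# {"user_name": "alice", "pv": 3}
```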



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044467982


##########
docs/content/docs/connectors/table/opensearch.md:
##########
@@ -0,0 +1,286 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+aliases:
+  - /dev/table/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows for writing into an index of the Opensearch engine. This document describes how to set up the Opensearch connector to run SQL queries against Opensearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages with the external system using the primary key defined on the DDL.
+
+If no primary key is defined on the DDL, the connector can only operate in append mode for exchanging INSERT-only messages with the external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options

Review Comment:
   Fixed. I will submit a pull request to the Elasticsearch connector to add those options plus the delivery guarantee.
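The upsert/append distinction described at the top of the SQL connector page can be modelled as below. `ToyIndex` is a hypothetical in-memory stand-in, not connector code. With a primary key, inserts and update-afters become upserts and retractions become deletes on a document id derived from the key, so replaying the same changelog (as may happen under at-least-once delivery) converges to the same state. Without a key, only INSERT rows are accepted and each gets a generated id, so a replay duplicates documents. Joining composite key fields with `_` is an assumption modelled on the `document-id.key-delimiter` option of the Elasticsearch connector.

```python
import itertools


class ToyIndex:
    """In-memory stand-in for an Opensearch index: document id -> source."""

    def __init__(self, key_fields=None, delimiter="_"):
        self.key_fields = key_fields  # None means append mode (no primary key)
        self.delimiter = delimiter
        self.docs = {}
        self._generated_ids = itertools.count()

    def apply(self, row_kind: str, row: dict) -> None:
        if self.key_fields:  # upsert mode: the document id comes from the key
            doc_id = self.delimiter.join(str(row[f]) for f in self.key_fields)
            if row_kind in ("+I", "+U"):
                self.docs[doc_id] = dict(row)
            elif row_kind in ("-U", "-D"):
                self.docs.pop(doc_id, None)
        elif row_kind == "+I":  # append mode: every insert is a new document
            self.docs[f"generated-{next(self._generated_ids)}"] = dict(row)
        else:
            raise ValueError("append mode only accepts INSERT rows")


changelog = [
    ("+I", {"user_id": "u1", "pv": 1}),
    ("+U", {"user_id": "u1", "pv": 2}),
    ("+I", {"user_id": "u2", "pv": 5}),
    ("-D", {"user_id": "u2", "pv": 5}),
]

upsert = ToyIndex(key_fields=["user_id"])
for kind, row in changelog + changelog:  # replay the whole changelog once
    upsert.apply(kind, row)
print(upsert.docs)  # {'u1': {'user_id': 'u1', 'pv': 2}}

append = ToyIndex()
for _ in range(2):  # the same INSERT delivered twice
    append.apply("+I", {"user_id": "u1", "pv": 1})
print(len(append.docs))  # 2
```

The replay in the upsert case is what makes deterministic ids attractive: duplicates overwrite rather than accumulate.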




[GitHub] [flink-connector-opensearch] zentol merged pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
zentol merged PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3



[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044443885


##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can send document action requests to an
+[Opensearch](https://opensearch.org/) index. To use this connector, add
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "51732edd-4218-470e-adad-b1ebb4021ae4" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+Opensearch static index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+# The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered
+os_sink = OpensearchSinkBuilder() \
+    .set_bulk_flush_max_actions(1) \
+    .set_emitter(OpensearchEmitter.static('foo', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os sink')
+```
+
+Opensearch dynamic index:
+```python
+from pyflink.datastream.connectors.opensearch import OpensearchSinkBuilder, OpensearchEmitter
+
+env = StreamExecutionEnvironment.get_execution_environment()
+env.add_jars(OPENSEARCH_SQL_CONNECTOR_PATH)
+
+input = ...
+
+os_sink = OpensearchSinkBuilder() \
+    .set_emitter(OpensearchEmitter.dynamic_index('name', 'id')) \
+    .set_hosts(['localhost:9200']) \
+    .build()
+
+input.sink_to(os_sink).name('os dynamic index sink')
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (e.g.,
+`DeleteRequest`, `UpdateRequest`, etc.).
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The `BulkProcessor`
+executes bulk requests one at a time, i.e., there are never two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This ensures that all requests issued before the
+checkpoint was triggered have been successfully acknowledged by Opensearch before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault-tolerant Opensearch sinks, checkpointing of the topology needs to be enabled at the execution environment:
+
+{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+# checkpoint every 5000 msecs
+env.enable_checkpointing(5000)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default, but the default delivery guarantee is `AT_LEAST_ONCE`.
+This causes the sink to buffer requests until it either finishes or the `BulkProcessor` flushes automatically.
+By default, the `BulkProcessor` will flush after `1000` added actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+Using `UpdateRequest`s with deterministic IDs and the upsert method, it is possible to achieve exactly-once semantics in Opensearch when `AT_LEAST_ONCE` delivery is configured for the connector.
+</p>
+
+### Handling Failing Opensearch Requests
+
+Opensearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Opensearch Sink allows the user to retry requests by specifying a backoff policy.
+
+Below is an example:
+
+{{< tabs "ddb958b3-5dd5-476e-b946-ace3335628b2" >}}
+{{< tab "Java" >}}
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}

Review Comment:
   Removed
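The retry example configures `setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)`: at most 5 retries, starting from a 1000 ms delay. The sketch below shows a generic doubling backoff loop with those numbers. It illustrates the idea only: the Opensearch client's backoff policy may grow the delay with a different formula, and `send_with_retries` is a hypothetical helper. Retrying only helps with transient failures such as a saturated node queue; a malformed document fails on every attempt.

```python
import time


def exponential_delays(max_retries: int, initial_delay_ms: int) -> list:
    """Generic doubling schedule: initial, 2x, 4x, ... (max_retries entries)."""
    return [initial_delay_ms * 2 ** attempt for attempt in range(max_retries)]


def send_with_retries(send, max_retries=5, initial_delay_ms=1000, sleep=time.sleep):
    """Call send(); on ConnectionError wait and retry, at most max_retries times."""
    for delay_ms in exponential_delays(max_retries, initial_delay_ms):
        try:
            return send()
        except ConnectionError:
            sleep(delay_ms / 1000)
    return send()  # last attempt: any error now propagates to the caller


print(exponential_delays(5, 1000))  # [1000, 2000, 4000, 8000, 16000]
```

Passing `sleep` as a parameter keeps the loop testable without real waiting.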



##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can send document action requests to an
+[Opensearch](https://opensearch.org/) index. To use this connector, add
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< py_download_link "opensearch" >}}

Review Comment:
   Removed




[GitHub] [flink-connector-opensearch] reta commented on a diff in pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on code in PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#discussion_r1044440312


##########
docs/content/docs/connectors/datastream/opensearch.md:
##########
@@ -0,0 +1,328 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/opensearch.html
+  - /apis/streaming/connectors/opensearch.html

Review Comment:
   Gone, thanks




[GitHub] [flink-connector-opensearch] reta commented on pull request #3: [FLINK-25756] [connectors/opensearch] Dedicated Opensearch connectors (documentation)

Posted by GitBox <gi...@apache.org>.
reta commented on PR #3:
URL: https://github.com/apache/flink-connector-opensearch/pull/3#issuecomment-1342717671

   @zentol here is the documentation part. I could bundle it into https://github.com/apache/flink-connector-opensearch/pull/1 if that does not complicate the review, thanks.

