Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/21 11:04:08 UTC

[flink] branch release-1.15 updated: [FLINK-26281][connectors/elasticsearch] setting default delivery guarantee to AT_LEAST_ONCE

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e893b70  [FLINK-26281][connectors/elasticsearch] setting default delivery guarantee to AT_LEAST_ONCE
e893b70 is described below

commit e893b700eaf99ef8b278cfa75478098b7674b762
Author: Alexander Preuß <11...@users.noreply.github.com>
AuthorDate: Thu Mar 10 09:48:58 2022 +0100

    [FLINK-26281][connectors/elasticsearch] setting default delivery guarantee to AT_LEAST_ONCE
---
 .../docs/connectors/datastream/elasticsearch.md    | 52 ++++------------------
 .../content/docs/connectors/table/elasticsearch.md |  2 +-
 .../elasticsearch/sink/ElasticsearchSink.java      |  6 +++
 .../sink/ElasticsearchSinkBuilderBase.java         |  2 +-
 .../table/ElasticsearchConnectorOptions.java       |  2 +-
 .../sink/ElasticsearchSinkBuilderBaseTest.java     |  7 +++
 .../sink/Elasticsearch6SinkBuilder.java            |  1 -
 .../sink/Elasticsearch7SinkBuilder.java            |  1 -
 8 files changed, 24 insertions(+), 49 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/elasticsearch.md b/docs/content/docs/connectors/datastream/elasticsearch.md
index c8342ae..eb49392 100644
--- a/docs/content/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content/docs/connectors/datastream/elasticsearch.md
@@ -214,10 +214,7 @@ flushes of the buffered actions in progress.
 
 ### Elasticsearch Sinks and Fault Tolerance
 
-By default, the Flink Elasticsearch Sink will not provide any strong delivery guarantees.
-Users have the option to enable at-least-once semantics for the Elasticsearch sink.
-
-With Flink’s checkpointing enabled, the Flink Elasticsearch Sink can guarantee
+With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
 at-least-once delivery of action requests to Elasticsearch clusters. It does
 so by waiting for all pending action requests in the `BulkProcessor` at the
 time of checkpoints. This effectively assures that all requests before the
@@ -226,34 +223,13 @@ proceeding to process more records sent to the sink.
 
 More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
 
-To use fault tolerant Elasticsearch Sinks, at-least-once delivery has to be configured and checkpointing of the topology needs to be enabled at the execution environment:
+To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
 
 {{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
 {{< tab "Java" >}}
-Elasticsearch 6:
 ```java
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-
-Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>()
-    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-    .setEmitter(
-    (element, context, indexer) -> 
-    indexer.add(createIndexRequest(element)));
-```
-
-Elasticsearch 7:
-```java
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-
-Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>()
-    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-    .setEmitter(
-    (element, context, indexer) -> 
-    indexer.add(createIndexRequest(element)));
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -261,28 +237,16 @@ Elasticsearch 6:
 ```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-
-val sinkBuilder = new Elasticsearch6SinkBuilder[String]
-  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
-  indexer.add(createIndexRequest(element)))
-```
-
-Elasticsearch 7:
-```scala
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-
-val sinkBuilder = new Elasticsearch7SinkBuilder[String]
-  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
-  .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
-  .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
-  indexer.add(createIndexRequest(element)))
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default, but the default delivery guarantee is AT_LEAST_ONCE.
+This causes the sink to buffer requests until it either finishes or the BulkProcessor flushes automatically.
+By default, the BulkProcessor will flush after 1000 added actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
+</p>
+
 ### Handling Failing Elasticsearch Requests
 
 Elasticsearch action requests may fail due to a variety of reasons, including
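The note added above says that, without checkpointing, an at-least-once sink keeps requests buffered until it finishes or the BulkProcessor flushes on its own (by default after 1000 actions). With checkpointing enabled, the sink instead waits for all pending requests at each checkpoint. The plain-Java sketch below models only the count-based part of that behaviour so it can run without Flink or Elasticsearch on the classpath. `CountFlushBuffer` and its methods are hypothetical and are not the Elasticsearch `BulkProcessor` API. The explicit `flush()` stands in for what a checkpoint or sink shutdown would trigger.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a count-based bulk flush, loosely mirroring a
 * BulkProcessor that flushes once a configured number of actions is buffered.
 * This is NOT the Elasticsearch BulkProcessor API.
 */
final class CountFlushBuffer {
    private final int maxActions;                  // flush threshold (1000 by default per the docs note)
    private final List<String> pending = new ArrayList<>();
    private int flushCount = 0;                    // number of simulated bulk requests sent
    private int sentActions = 0;                   // actions that have left the buffer

    public CountFlushBuffer(int maxActions) {
        if (maxActions <= 0) {
            throw new IllegalArgumentException("maxActions must be > 0");
        }
        this.maxActions = maxActions;
    }

    /** Buffers one action and flushes automatically once the threshold is reached. */
    public void add(String action) {
        pending.add(action);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Sends everything buffered; in the real sink a checkpoint or shutdown would force this. */
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sentActions += pending.size();
        pending.clear();
        flushCount++;
    }

    public int pendingActions() { return pending.size(); }
    public int flushCount() { return flushCount; }
    public int sentActions() { return sentActions; }

    public static void main(String[] args) {
        CountFlushBuffer buffer = new CountFlushBuffer(1000);
        for (int i = 0; i < 2500; i++) {
            buffer.add("index-request-" + i);
        }
        // Two automatic flushes of 1000 actions each; 500 stay buffered.
        System.out.println(buffer.flushCount() + " " + buffer.pendingActions()); // prints "2 500"
        // Without checkpointing, nothing forces these 500 out until the sink finishes.
        buffer.flush();
        System.out.println(buffer.flushCount() + " " + buffer.pendingActions()); // prints "3 0"
    }
}
```

The tail of 500 actions is the case the note warns about: on a low-volume stream without checkpointing, those requests could sit in the buffer for a long time, which is why lowering the flush threshold or enabling checkpointing matters.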
diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md
index 78d0ce9..4d92903 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -143,7 +143,7 @@ Connector Options
       <td><h5>sink.delivery-guarantee</h5></td>
       <td>optional</td>
       <td>no</td>
-      <td style="word-wrap: break-word;">NONE</td>
+      <td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
       <td>String</td>
       <td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
     </tr>
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
index 30ee238..efe6dc2 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.elasticsearch.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.DeliveryGuarantee;
@@ -88,4 +89,9 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
                 context.metricGroup(),
                 context.getMailboxExecutor());
     }
+
+    @VisibleForTesting
+    DeliveryGuarantee getDeliveryGuarantee() {
+        return deliveryGuarantee;
+    }
 }
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
index fe64c94..8543255 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
@@ -48,7 +48,7 @@ public abstract class ElasticsearchSinkBuilderBase<
     private FlushBackoffType bulkFlushBackoffType = FlushBackoffType.NONE;
     private int bulkFlushBackoffRetries = -1;
     private long bulkFlushBackOffDelay = -1;
-    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
+    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
     private List<HttpHost> hosts;
     protected ElasticsearchEmitter<? super IN> emitter;
     private String username;
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index 672f0727..10ea0ae 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -143,6 +143,6 @@ public class ElasticsearchConnectorOptions {
     public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE_OPTION =
             ConfigOptions.key("sink.delivery-guarantee")
                     .enumType(DeliveryGuarantee.class)
-                    .defaultValue(DeliveryGuarantee.NONE)
+                    .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
                     .withDescription("Optional delivery guarantee when committing.");
 }
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
index 695f85c..4d1890f 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.stream.Stream;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -57,6 +58,12 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild
     }
 
     @Test
+    void testDefaultDeliveryGuarantee() {
+        assertThat(createMinimalBuilder().build().getDeliveryGuarantee())
+                .isEqualTo(DeliveryGuarantee.AT_LEAST_ONCE);
+    }
+
+    @Test
     void testThrowIfExactlyOnceConfigured() {
         assertThrows(
                 IllegalStateException.class,
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
index 34c098b..c90ccac 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
@@ -46,7 +46,6 @@ import org.elasticsearch.common.unit.TimeValue;
  *              .source(element.f1)
  *          );
  *      })
- *     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  *     .build();
  * }</pre>
  *
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java
index 3a47827..567f774 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java
@@ -46,7 +46,6 @@ import org.elasticsearch.common.unit.TimeValue;
  *              .source(element.f1)
  *          );
  *      })
- *     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  *     .build();
  * }</pre>
  *
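In practice, the builder change means that code which never calls `setDeliveryGuarantee` now gets at-least-once semantics, while opting out to `NONE` remains possible. The sketch below models this in plain Java so it runs without Flink on the classpath. `SinkBuilderModel` and its nested `Guarantee` enum are hypothetical stand-ins, not connector classes. Rejecting `EXACTLY_ONCE` with an `IllegalStateException` is inferred from `testThrowIfExactlyOnceConfigured`, not copied from the builder source, and putting that check in the setter is an assumption.

```java
import java.util.Objects;

/**
 * Hypothetical, dependency-free model of the default changed by this commit.
 * Not the Flink API; names are illustrative only.
 */
final class SinkBuilderModel {
    /** Local stand-in for DeliveryGuarantee, reusing the same constant names. */
    enum Guarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    // Mirrors the new default in ElasticsearchSinkBuilderBase (NONE before this commit).
    private Guarantee deliveryGuarantee = Guarantee.AT_LEAST_ONCE;

    SinkBuilderModel setDeliveryGuarantee(Guarantee guarantee) {
        Objects.requireNonNull(guarantee, "guarantee");
        // Assumption: exactly-once is rejected eagerly with IllegalStateException.
        if (guarantee == Guarantee.EXACTLY_ONCE) {
            throw new IllegalStateException("EXACTLY_ONCE is not supported by this sink");
        }
        this.deliveryGuarantee = guarantee;
        return this;
    }

    /** Returns the effective guarantee, standing in for build().getDeliveryGuarantee(). */
    Guarantee build() {
        return deliveryGuarantee;
    }

    public static void main(String[] args) {
        System.out.println(new SinkBuilderModel().build()); // prints "AT_LEAST_ONCE"
        System.out.println(new SinkBuilderModel().setDeliveryGuarantee(Guarantee.NONE).build()); // prints "NONE"
        try {
            new SinkBuilderModel().setDeliveryGuarantee(Guarantee.EXACTLY_ONCE);
        } catch (IllegalStateException e) {
            System.out.println("rejected"); // prints "rejected"
        }
    }
}
```

In a real job, the equivalent opt-out is calling `setDeliveryGuarantee(DeliveryGuarantee.NONE)` on `Elasticsearch6SinkBuilder` or `Elasticsearch7SinkBuilder`, or setting the table option `sink.delivery-guarantee` to `NONE`.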