Posted to commits@flink.apache.org by fp...@apache.org on 2022/03/21 11:04:08 UTC
[flink] branch release-1.15 updated: [FLINK-26281][connectors/elasticsearch] setting default delivery guarantee to AT_LEAST_ONCE
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new e893b70 [FLINK-26281][connectors/elasticsearch] setting default delivery guarantee to AT_LEAST_ONCE
e893b70 is described below
commit e893b700eaf99ef8b278cfa75478098b7674b762
Author: Alexander Preuß <11...@users.noreply.github.com>
AuthorDate: Thu Mar 10 09:48:58 2022 +0100
[FLINK-26281][connectors/elasticsearch] setting default delivery guarantee to AT_LEAST_ONCE
---
.../docs/connectors/datastream/elasticsearch.md | 52 ++++------------------
.../content/docs/connectors/table/elasticsearch.md | 2 +-
.../elasticsearch/sink/ElasticsearchSink.java | 6 +++
.../sink/ElasticsearchSinkBuilderBase.java | 2 +-
.../table/ElasticsearchConnectorOptions.java | 2 +-
.../sink/ElasticsearchSinkBuilderBaseTest.java | 7 +++
.../sink/Elasticsearch6SinkBuilder.java | 1 -
.../sink/Elasticsearch7SinkBuilder.java | 1 -
8 files changed, 24 insertions(+), 49 deletions(-)
diff --git a/docs/content/docs/connectors/datastream/elasticsearch.md b/docs/content/docs/connectors/datastream/elasticsearch.md
index c8342ae..eb49392 100644
--- a/docs/content/docs/connectors/datastream/elasticsearch.md
+++ b/docs/content/docs/connectors/datastream/elasticsearch.md
@@ -214,10 +214,7 @@ flushes of the buffered actions in progress.
### Elasticsearch Sinks and Fault Tolerance
-By default, the Flink Elasticsearch Sink will not provide any strong delivery guarantees.
-Users have the option to enable at-least-once semantics for the Elasticsearch sink.
-
-With Flink’s checkpointing enabled, the Flink Elasticsearch Sink can guarantee
+With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
at-least-once delivery of action requests to Elasticsearch clusters. It does
so by waiting for all pending action requests in the `BulkProcessor` at the
time of checkpoints. This effectively assures that all requests before the
@@ -226,34 +223,13 @@ proceeding to process more records sent to the sink.
More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
-To use fault tolerant Elasticsearch Sinks, at-least-once delivery has to be configured and checkpointing of the topology needs to be enabled at the execution environment:
+To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
{{< tab "Java" >}}
-Elasticsearch 6:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-
-Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>()
- .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
- .setEmitter(
- (element, context, indexer) ->
- indexer.add(createIndexRequest(element)));
-```
-
-Elasticsearch 7:
-```java
-final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // checkpoint every 5000 msecs
-
-Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>()
- .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
- .setEmitter(
- (element, context, indexer) ->
- indexer.add(createIndexRequest(element)));
```
{{< /tab >}}
{{< tab "Scala" >}}
@@ -261,28 +237,16 @@ Elasticsearch 6:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-
-val sinkBuilder = new Elasticsearch6SinkBuilder[String]
- .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
- .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
- indexer.add(createIndexRequest(element)))
-```
-
-Elasticsearch 7:
-```scala
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // checkpoint every 5000 msecs
-
-val sinkBuilder = new Elasticsearch7SinkBuilder[String]
- .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
- .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
- .setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
- indexer.add(createIndexRequest(element)))
```
{{< /tab >}}
{{< /tabs >}}
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default but the default delivery guarantee is AT_LEAST_ONCE.
+This causes the sink to buffer requests until it either finishes or the BulkProcessor flushes automatically.
+By default, the BulkProcessor will flush after 1000 added Actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
+</p>
+
### Handling Failing Elasticsearch Requests
Elasticsearch action requests may fail due to a variety of reasons, including
diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md
index 78d0ce9..4d92903 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -143,7 +143,7 @@ Connector Options
<td><h5>sink.delivery-guarantee</h5></td>
<td>optional</td>
<td>no</td>
- <td style="word-wrap: break-word;">NONE</td>
+ <td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
<td>String</td>
<td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
</tr>
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
index 30ee238..efe6dc2 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.elasticsearch.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
@@ -88,4 +89,9 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
context.metricGroup(),
context.getMailboxExecutor());
}
+
+ @VisibleForTesting
+ DeliveryGuarantee getDeliveryGuarantee() {
+ return deliveryGuarantee;
+ }
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
index fe64c94..8543255 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
@@ -48,7 +48,7 @@ public abstract class ElasticsearchSinkBuilderBase<
private FlushBackoffType bulkFlushBackoffType = FlushBackoffType.NONE;
private int bulkFlushBackoffRetries = -1;
private long bulkFlushBackOffDelay = -1;
- private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
+ private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
private List<HttpHost> hosts;
protected ElasticsearchEmitter<? super IN> emitter;
private String username;
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index 672f0727..10ea0ae 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -143,6 +143,6 @@ public class ElasticsearchConnectorOptions {
public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE_OPTION =
ConfigOptions.key("sink.delivery-guarantee")
.enumType(DeliveryGuarantee.class)
- .defaultValue(DeliveryGuarantee.NONE)
+ .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");
}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
index 695f85c..4d1890f 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import java.util.stream.Stream;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -57,6 +58,12 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild
}
@Test
+ void testDefaultDeliveryGuarantee() {
+ assertThat(createMinimalBuilder().build().getDeliveryGuarantee())
+ .isEqualTo(DeliveryGuarantee.AT_LEAST_ONCE);
+ }
+
+ @Test
void testThrowIfExactlyOnceConfigured() {
assertThrows(
IllegalStateException.class,
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
index 34c098b..c90ccac 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java
@@ -46,7 +46,6 @@ import org.elasticsearch.common.unit.TimeValue;
* .source(element.f1)
* );
* })
- * .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
* .build();
* }</pre>
*
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java
index 3a47827..567f774 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java
@@ -46,7 +46,6 @@ import org.elasticsearch.common.unit.TimeValue;
* .source(element.f1)
* );
* })
- * .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
* .build();
* }</pre>
*
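
Note on the change above: the commit flips the builder's default from `DeliveryGuarantee.NONE` to `DeliveryGuarantee.AT_LEAST_ONCE` and adds `testDefaultDeliveryGuarantee` to pin that default. The behaviour can be illustrated without Flink on the classpath by a minimal sketch. Everything in it is a hypothetical stand-in rather than the connector's code: `DeliveryGuaranteeDefaultSketch`, `Sink`, and `SinkBuilder` are invented names, and rejecting `EXACTLY_ONCE` inside the setter is an assumption (the diff only shows that `testThrowIfExactlyOnceConfigured` expects an `IllegalStateException`).

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the builder default changed in this commit.
 * None of these types are the real Flink classes; they only mirror the
 * field default and the getter that the diff adds.
 */
public class DeliveryGuaranteeDefaultSketch {

    /** Local copy of the three guarantee levels, for illustration only. */
    enum DeliveryGuarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Minimal sink that only remembers its configured guarantee. */
    static final class Sink {
        private final DeliveryGuarantee deliveryGuarantee;

        Sink(DeliveryGuarantee deliveryGuarantee) {
            this.deliveryGuarantee = deliveryGuarantee;
        }

        DeliveryGuarantee getDeliveryGuarantee() {
            return deliveryGuarantee;
        }
    }

    /** Minimal builder; the default below is the value this commit introduces. */
    static final class SinkBuilder {
        private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;

        SinkBuilder setDeliveryGuarantee(DeliveryGuarantee guarantee) {
            Objects.requireNonNull(guarantee, "guarantee");
            // Assumption: EXACTLY_ONCE is rejected here, at configuration time.
            if (guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
                throw new IllegalStateException("EXACTLY_ONCE is not supported in this sketch");
            }
            this.deliveryGuarantee = guarantee;
            return this;
        }

        Sink build() {
            return new Sink(deliveryGuarantee);
        }
    }

    public static void main(String[] args) {
        // No explicit setting: the builder default applies.
        System.out.println(new SinkBuilder().build().getDeliveryGuarantee());

        // Opting back into the previous default stays possible.
        System.out.println(
                new SinkBuilder()
                        .setDeliveryGuarantee(DeliveryGuarantee.NONE)
                        .build()
                        .getDeliveryGuarantee());
    }
}
```

In this sketch a caller who never touches the setter gets `AT_LEAST_ONCE`, which is why the docs and Javadoc examples in the diff can drop their explicit `.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)` call.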