You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/10 11:42:51 UTC
[inlong] branch master updated: [INLONG-5205][Sort] Add reporting metrics for Elasticsearch 6 connector (#5374)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7247af95d [INLONG-5205][Sort] Add reporting metrics for Elasticsearch 6 connector (#5374)
7247af95d is described below
commit 7247af95dbe54bce4a86203cc12537374a591c3c
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Wed Aug 10 19:42:46 2022 +0800
[INLONG-5205][Sort] Add reporting metrics for Elasticsearch 6 connector (#5374)
---
.../sort-connectors/elasticsearch-6/pom.xml | 5 +
.../Elasticsearch6ApiCallBridge.java | 147 ++++++
.../Elasticsearch6BulkProcessorIndexer.java | 85 ++++
.../sort/elasticsearch6/ElasticsearchSink.java | 279 ++++++++++
.../table/Elasticsearch6DynamicSink.java | 105 ++--
.../table/Elasticsearch6DynamicSinkFactory.java | 8 +-
.../Elasticsearch7ApiCallBridge.java | 151 ++++++
.../Elasticsearch7BulkProcessorIndexer.java | 86 ++++
.../sort/elasticsearch7/ElasticsearchSink.java | 265 ++++++++++
.../table/Elasticsearch7DynamicSink.java | 91 ++--
.../sort-connectors/elasticsearch-base/pom.xml | 5 +
.../elasticsearch/BufferingNoOpRequestIndexer.java | 76 +++
.../elasticsearch/ElasticsearchApiCallBridge.java | 116 +++++
.../sort/elasticsearch/ElasticsearchSinkBase.java | 559 +++++++++++++++++++++
.../elasticsearch/ElasticsearchSinkFunction.java | 62 +++
.../PreElasticsearch6BulkProcessorIndexer.java | 88 ++++
.../table/ElasticsearchConfiguration.java | 7 +-
.../elasticsearch/table/ElasticsearchOptions.java | 23 +-
.../table/RowElasticsearchSinkFunction.java | 41 +-
19 files changed, 2087 insertions(+), 112 deletions(-)
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
index 1f3c65852..efd2ad903 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
@@ -48,6 +48,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java
new file mode 100644
index 000000000..c8fe3ecbf
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6ApiCallBridge.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link ElasticsearchApiCallBridge} implementation for Elasticsearch 6 and later versions,
+ * talking to the cluster through a {@link RestHighLevelClient}.
+ */
+@Internal
+public class Elasticsearch6ApiCallBridge
+        implements ElasticsearchApiCallBridge<RestHighLevelClient> {
+
+    private static final long serialVersionUID = -5222683870097809633L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+    /** Hosts supplied by the user; verified non-null and non-empty by the constructor. */
+    private final List<HttpHost> httpHosts;
+
+    /** Callback that customizes the {@link RestClientBuilder} before the client is created. */
+    private final RestClientFactory restClientFactory;
+
+    /**
+     * Creates the bridge.
+     *
+     * @param httpHosts hosts to connect to; must be neither null nor empty
+     * @param restClientFactory factory that configures the REST client builder; must not be null
+     */
+    Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
+        Preconditions.checkArgument(!(httpHosts == null || httpHosts.isEmpty()));
+        this.httpHosts = httpHosts;
+        this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+    }
+
+    /**
+     * Builds a {@link RestHighLevelClient} over the configured hosts after handing the builder
+     * to the {@link RestClientFactory}. The {@code clientConfig} argument is not read here.
+     */
+    @Override
+    public RestHighLevelClient createClient(Map<String, String> clientConfig) {
+        HttpHost[] hosts = httpHosts.toArray(new HttpHost[0]);
+        RestClientBuilder restClientBuilder = RestClient.builder(hosts);
+        restClientFactory.configureRestClientBuilder(restClientBuilder);
+        return new RestHighLevelClient(restClientBuilder);
+    }
+
+    /** Creates a bulk processor builder that submits bulks via {@code client.bulkAsync}. */
+    @Override
+    public BulkProcessor.Builder createBulkProcessorBuilder(
+            RestHighLevelClient client, BulkProcessor.Listener listener) {
+        return BulkProcessor.builder(client::bulkAsync, listener);
+    }
+
+    /**
+     * Returns the cause recorded on a failed bulk item.
+     *
+     * @return the failure cause, or {@code null} when the item did not fail
+     */
+    @Override
+    public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+        return bulkItemResponse.isFailed() ? bulkItemResponse.getFailure().getCause() : null;
+    }
+
+    /**
+     * Installs a backoff policy on the bulk processor builder: no backoff when the policy is
+     * {@code null}, constant backoff for {@code CONSTANT}, exponential backoff otherwise.
+     */
+    @Override
+    public void configureBulkProcessorBackoff(
+            BulkProcessor.Builder builder,
+            @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+        if (flushBackoffPolicy == null) {
+            builder.setBackoffPolicy(BackoffPolicy.noBackoff());
+            return;
+        }
+
+        final BackoffPolicy selected;
+        switch (flushBackoffPolicy.getBackoffType()) {
+            case CONSTANT:
+                selected =
+                        BackoffPolicy.constantBackoff(
+                                delayOf(flushBackoffPolicy),
+                                flushBackoffPolicy.getMaxRetryCount());
+                break;
+            case EXPONENTIAL:
+            default:
+                // Any type other than CONSTANT is treated as exponential.
+                selected =
+                        BackoffPolicy.exponentialBackoff(
+                                delayOf(flushBackoffPolicy),
+                                flushBackoffPolicy.getMaxRetryCount());
+                break;
+        }
+        builder.setBackoffPolicy(selected);
+    }
+
+    /** Wraps the policy's delay (milliseconds) in a {@link TimeValue}. */
+    private static TimeValue delayOf(ElasticsearchSinkBase.BulkFlushBackoffPolicy policy) {
+        return new TimeValue(policy.getDelayMillis());
+    }
+
+    /** Creates the {@link RequestIndexer} that feeds requests into the given bulk processor. */
+    @Override
+    public RequestIndexer createBulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        return new Elasticsearch6BulkProcessorIndexer(
+                bulkProcessor, flushOnCheckpoint, numPendingRequestsRef);
+    }
+
+    /**
+     * Pings the cluster through the given client.
+     *
+     * @throws IOException declared for the underlying ping call
+     * @throws RuntimeException if the ping reports that no node is reachable
+     */
+    @Override
+    public void verifyClientConnection(RestHighLevelClient client) throws IOException {
+        // Parameterized logging already skips formatting when INFO is disabled.
+        LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+
+        boolean reachable = client.ping();
+        if (!reachable) {
+            throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+        }
+
+        LOG.info("Elasticsearch RestHighLevelClient is connected to {}", httpHosts);
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 000000000..388dd122c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/Elasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link RequestIndexer} that hands every request to a {@link BulkProcessor}, which buffers
+ * {@link ActionRequest ActionRequests} before sending them to the cluster as a bulk request.
+ *
+ * <p>Note: This class is binary compatible to Elasticsearch 6.
+ */
+@Internal
+class Elasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+    private final BulkProcessor bulkProcessor;
+    private final boolean flushOnCheckpoint;
+    private final AtomicLong numPendingRequestsRef;
+
+    /**
+     * @param bulkProcessor processor that receives the requests; must not be null
+     * @param flushOnCheckpoint when true, every added request increments the pending counter
+     * @param numPendingRequestsRef counter of pending requests; must not be null
+     */
+    Elasticsearch6BulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        this.bulkProcessor = checkNotNull(bulkProcessor);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+        this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+    }
+
+    /** Queues the given delete requests, one by one, on the bulk processor. */
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        for (DeleteRequest request : deleteRequests) {
+            recordPendingRequest();
+            bulkProcessor.add(request);
+        }
+    }
+
+    /** Queues the given index requests, one by one, on the bulk processor. */
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        for (IndexRequest request : indexRequests) {
+            recordPendingRequest();
+            bulkProcessor.add(request);
+        }
+    }
+
+    /** Queues the given update requests, one by one, on the bulk processor. */
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        for (UpdateRequest request : updateRequests) {
+            recordPendingRequest();
+            bulkProcessor.add(request);
+        }
+    }
+
+    /** Bumps the pending-request counter, but only when flush-on-checkpoint is enabled. */
+    private void recordPendingRequest() {
+        if (flushOnCheckpoint) {
+            numPendingRequestsRef.getAndIncrement();
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
new file mode 100644
index 000000000..c30787d7c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Sink for Elasticsearch 6.x that issues one or more {@link ActionRequest ActionRequests}
+ * against a cluster for every incoming element.
+ *
+ * <p>Communication with the cluster goes through a {@link RestHighLevelClient}. The sink fails
+ * if no cluster can be reached via the hosts given to the builder.
+ *
+ * <p>Requests are sent through a {@link BulkProcessor}, which buffers elements before sending
+ * a request to the cluster. Its behaviour is controlled by these config keys:
+ *
+ * <ul>
+ *   <li>{@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ *   <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ *   <li>{@code bulk.flush.interval.ms}: Interval, in milliseconds, at which to flush data
+ *       regardless of the other two settings
+ * </ul>
+ *
+ * <p>An {@link ElasticsearchSinkFunction} must also be supplied; it turns each incoming element
+ * into {@link ActionRequest ActionRequests}. See the class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Instances are created through {@link Builder#build()} only. */
+    private ElasticsearchSink(
+            Map<String, String> bulkRequestsConfig,
+            List<HttpHost> httpHosts,
+            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+            ActionRequestFailureHandler failureHandler,
+            RestClientFactory restClientFactory,
+            String inLongMetric) {
+
+        super(
+                new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
+                bulkRequestsConfig,
+                elasticsearchSinkFunction,
+                failureHandler,
+                inLongMetric);
+    }
+
+    /**
+     * Builder for {@link ElasticsearchSink} instances.
+     *
+     * @param <T> Type of the elements handled by the sink this builder creates.
+     */
+    @PublicEvolving
+    public static class Builder<T> {
+
+        private final List<HttpHost> httpHosts;
+        private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+        private Map<String, String> bulkRequestsConfig = new HashMap<>();
+        private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+        // Default factory leaves the REST client builder untouched.
+        private RestClientFactory restClientFactory = restClientBuilder -> {
+        };
+        private String inLongMetric;
+
+        /**
+         * Creates a builder for an {@code ElasticsearchSink} that connects to the cluster
+         * using a {@link RestHighLevelClient}.
+         *
+         * @param httpHosts The list of {@link HttpHost} the {@link RestHighLevelClient}
+         *     connects to; must not be null.
+         * @param elasticsearchSinkFunction Generates the {@link ActionRequest ActionRequests}
+         *     for each incoming element; must not be null.
+         */
+        public Builder(
+                List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+            this.httpHosts = Preconditions.checkNotNull(httpHosts);
+            this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+        }
+
+        /**
+         * Sets the InLong metric value used for reporting metrics.
+         *
+         * @param inLongMetric value handed unchanged to the sink; may be null.
+         */
+        public void setInLongMetric(String inLongMetric) {
+            this.inLongMetric = inLongMetric;
+        }
+
+        /**
+         * Sets the maximum number of actions to buffer for each bulk request. Pass -1 to
+         * disable it.
+         *
+         * @param numMaxActions the maximum number of actions to buffer per bulk request.
+         */
+        public void setBulkFlushMaxActions(int numMaxActions) {
+            Preconditions.checkArgument(
+                    numMaxActions > 0 || numMaxActions == -1,
+                    "Max number of buffered actions must be larger than 0.");
+            putConfig(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+        }
+
+        /**
+         * Sets the maximum size of buffered actions, in mb, per bulk request. Pass -1 to
+         * disable it.
+         *
+         * @param maxSizeMb the maximum size of buffered actions, in mb.
+         */
+        public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+            Preconditions.checkArgument(
+                    maxSizeMb > 0 || maxSizeMb == -1,
+                    "Max size of buffered actions must be larger than 0.");
+            putConfig(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+        }
+
+        /**
+         * Sets the bulk flush interval, in milliseconds. Pass -1 to disable it.
+         *
+         * @param intervalMillis the bulk flush interval, in milliseconds.
+         */
+        public void setBulkFlushInterval(long intervalMillis) {
+            // -1 (disabled) and every non-negative interval are accepted.
+            Preconditions.checkArgument(
+                    intervalMillis >= -1,
+                    "Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+            putConfig(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+        }
+
+        /**
+         * Enables or disables bulk flush backoff behaviour.
+         *
+         * @param enabled whether or not to enable backoffs.
+         */
+        public void setBulkFlushBackoff(boolean enabled) {
+            putConfig(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+        }
+
+        /**
+         * Sets the type of backoff to use when flushing bulk requests.
+         *
+         * @param flushBackoffType the backoff type to use; must not be null.
+         */
+        public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+            String typeName = Preconditions.checkNotNull(flushBackoffType).toString();
+            putConfig(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, typeName);
+        }
+
+        /**
+         * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+         *
+         * @param maxRetries the maximum number of retries; must be larger than 0.
+         */
+        public void setBulkFlushBackoffRetries(int maxRetries) {
+            Preconditions.checkArgument(
+                    maxRetries > 0, "Max number of backoff attempts must be larger than 0.");
+            putConfig(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+        }
+
+        /**
+         * Sets the delay, in milliseconds, between each backoff attempt when flushing bulk
+         * requests.
+         *
+         * @param delayMillis the delay between backoff attempts, in milliseconds; must be
+         *     larger than or equal to 0.
+         */
+        public void setBulkFlushBackoffDelay(long delayMillis) {
+            Preconditions.checkArgument(
+                    delayMillis >= 0,
+                    "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+            putConfig(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+        }
+
+        /**
+         * Sets a failure handler for action requests.
+         *
+         * @param failureHandler Handles failed {@link ActionRequest ActionRequests}; must not
+         *     be null.
+         */
+        public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+            this.failureHandler = Preconditions.checkNotNull(failureHandler);
+        }
+
+        /**
+         * Sets a REST client factory for custom client configuration.
+         *
+         * @param restClientFactory the factory that configures the rest client; must not be
+         *     null.
+         */
+        public void setRestClientFactory(RestClientFactory restClientFactory) {
+            this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+        }
+
+        /**
+         * Creates the Elasticsearch sink from the current builder state.
+         *
+         * @return the created Elasticsearch sink.
+         */
+        public ElasticsearchSink<T> build() {
+            return new ElasticsearchSink<>(
+                    bulkRequestsConfig,
+                    httpHosts,
+                    elasticsearchSinkFunction,
+                    failureHandler,
+                    restClientFactory,
+                    inLongMetric);
+        }
+
+        /** Records one entry of the bulk request configuration. */
+        private void putConfig(String key, String value) {
+            this.bulkRequestsConfig.put(key, value);
+        }
+
+        /** Two builders are equal when they are of the same class and all fields are equal. */
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null) {
+                return false;
+            }
+            if (o.getClass() != getClass()) {
+                return false;
+            }
+            Builder<?> other = (Builder<?>) o;
+            return Objects.equals(httpHosts, other.httpHosts)
+                    && Objects.equals(elasticsearchSinkFunction, other.elasticsearchSinkFunction)
+                    && Objects.equals(bulkRequestsConfig, other.bulkRequestsConfig)
+                    && Objects.equals(failureHandler, other.failureHandler)
+                    && Objects.equals(restClientFactory, other.restClientFactory)
+                    && Objects.equals(inLongMetric, other.inLongMetric);
+        }
+
+        /** Hashes the same fields, in the same order, that {@link #equals(Object)} compares. */
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    httpHosts,
+                    elasticsearchSinkFunction,
+                    bulkRequestsConfig,
+                    failureHandler,
+                    restClientFactory,
+                    inLongMetric);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index af2ec05d8..af9841148 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -21,10 +21,6 @@ package org.apache.inlong.sort.elasticsearch6.table;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
-import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
-import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
-import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -34,14 +30,17 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.StringUtils;
-
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
+import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch6.ElasticsearchSink;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@@ -49,7 +48,6 @@ import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import javax.annotation.Nullable;
-
import java.util.List;
import java.util.Objects;
@@ -59,19 +57,15 @@ import java.util.Objects;
*/
@PublicEvolving
final class Elasticsearch6DynamicSink implements DynamicTableSink {
+
@VisibleForTesting
static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
private final EncodingFormat<SerializationSchema<RowData>> format;
private final TableSchema schema;
private final Elasticsearch6Configuration config;
-
- public Elasticsearch6DynamicSink(
- EncodingFormat<SerializationSchema<RowData>> format,
- Elasticsearch6Configuration config,
- TableSchema schema) {
- this(format, config, schema, (ElasticsearchSink.Builder::new));
- }
+ private final String inLongMetric;
+ private final ElasticSearchBuilderProvider builderProvider;
// --------------------------------------------------------------
// Hack to make configuration testing possible.
@@ -83,29 +77,27 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
// on the sink itself.
// --------------------------------------------------------------
- private final ElasticSearchBuilderProvider builderProvider;
-
- @FunctionalInterface
- interface ElasticSearchBuilderProvider {
- ElasticsearchSink.Builder<RowData> createBuilder(
- List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+ public Elasticsearch6DynamicSink(
+ EncodingFormat<SerializationSchema<RowData>> format,
+ Elasticsearch6Configuration config,
+ TableSchema schema,
+ String inLongMetric) {
+ this(format, config, schema, (ElasticsearchSink.Builder::new), inLongMetric);
}
Elasticsearch6DynamicSink(
EncodingFormat<SerializationSchema<RowData>> format,
Elasticsearch6Configuration config,
TableSchema schema,
- ElasticSearchBuilderProvider builderProvider) {
+ ElasticSearchBuilderProvider builderProvider,
+ String inLongMetric) {
this.format = format;
this.schema = schema;
this.config = config;
this.builderProvider = builderProvider;
+ this.inLongMetric = inLongMetric;
}
- // --------------------------------------------------------------
- // End of hack to make configuration testing possible
- // --------------------------------------------------------------
-
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
@@ -117,6 +109,10 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
return builder.build();
}
+ // --------------------------------------------------------------
+ // End of hack to make configuration testing possible
+ // --------------------------------------------------------------
+
@Override
public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
return () -> {
@@ -132,7 +128,8 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
RoutingExtractor.createRoutingExtractor(
- schema, config.getRoutingField().orElse(null)));
+ schema, config.getRoutingField().orElse(null)),
+ inLongMetric);
final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -142,6 +139,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+ builder.setInLongMetric(inLongMetric);
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
@@ -182,7 +180,37 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
return "Elasticsearch6";
}
- /** Serializable {@link RestClientFactory} used by the sink. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+ return Objects.equals(format, that.format)
+ && Objects.equals(schema, that.schema)
+ && Objects.equals(config, that.config)
+ && Objects.equals(builderProvider, that.builderProvider)
+ && Objects.equals(inLongMetric, that.inLongMetric);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(format, schema, config, builderProvider, inLongMetric);
+ }
+
+ @FunctionalInterface
+ interface ElasticSearchBuilderProvider {
+
+ ElasticsearchSink.Builder<RowData> createBuilder(
+ List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+ }
+
+ /**
+ * Serializable {@link RestClientFactory} used by the sink.
+ */
@VisibleForTesting
static class DefaultRestClientFactory implements RestClientFactory {
@@ -217,7 +245,9 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
}
}
- /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */
+ /**
+ * Serializable {@link RestClientFactory} used by the sink which enable authentication.
+ */
@VisibleForTesting
static class AuthRestClientFactory implements RestClientFactory {
@@ -274,6 +304,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
* sink.
*/
private static class Elasticsearch6RequestFactory implements RequestFactory {
+
@Override
public UpdateRequest createUpdateRequest(
String index,
@@ -301,24 +332,4 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
return new DeleteRequest(index, docType, key);
}
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
- return Objects.equals(format, that.format)
- && Objects.equals(schema, that.schema)
- && Objects.equals(config, that.config)
- && Objects.equals(builderProvider, that.builderProvider);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(format, schema, config, builderProvider);
- }
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
index 4a49939ec..61be6506b 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -53,6 +53,7 @@ import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FL
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.ROUTING_FIELD_NAME;
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
@@ -79,7 +80,8 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
CONNECTION_PATH_PREFIX,
FORMAT_OPTION,
PASSWORD_OPTION,
- USERNAME_OPTION)
+ USERNAME_OPTION,
+ INLONG_METRIC)
.collect(Collectors.toSet());
@Override
@@ -100,8 +102,10 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
validate(config, configuration);
+ String inLongMetric = helper.getOptions().get(INLONG_METRIC);
+
return new Elasticsearch6DynamicSink(
- format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
+ format, config, TableSchemaUtils.getPhysicalSchema(tableSchema), inLongMetric);
}
private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java
new file mode 100644
index 000000000..f43708245
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7ApiCallBridge.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchApiCallBridge;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 7 and later versions.
+ */
+@Internal
+public class Elasticsearch7ApiCallBridge
+ implements ElasticsearchApiCallBridge<RestHighLevelClient> {
+
+ private static final long serialVersionUID = -5222683870097809633L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7ApiCallBridge.class);
+
+ /**
+ * User-provided HTTP hosts.
+ */
+ private final List<HttpHost> httpHosts;
+
+ /**
+ * The factory to configure the rest client.
+ */
+ private final RestClientFactory restClientFactory;
+
+ Elasticsearch7ApiCallBridge(List<HttpHost> httpHosts, RestClientFactory restClientFactory) {
+ Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty());
+ this.httpHosts = httpHosts;
+ this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+ }
+
+ @Override
+ public RestHighLevelClient createClient(Map<String, String> clientConfig) {
+ RestClientBuilder builder =
+ RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+ restClientFactory.configureRestClientBuilder(builder);
+
+ RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
+
+ return rhlClient;
+ }
+
+ @Override
+ public BulkProcessor.Builder createBulkProcessorBuilder(
+ RestHighLevelClient client, BulkProcessor.Listener listener) {
+ return BulkProcessor.builder(
+ (request, bulkListener) ->
+ client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
+ listener);
+ }
+
+ @Override
+ public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
+ if (!bulkItemResponse.isFailed()) {
+ return null;
+ } else {
+ return bulkItemResponse.getFailure().getCause();
+ }
+ }
+
+ @Override
+ public void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
+
+ BackoffPolicy backoffPolicy;
+ if (flushBackoffPolicy != null) {
+ switch (flushBackoffPolicy.getBackoffType()) {
+ case CONSTANT:
+ backoffPolicy =
+ BackoffPolicy.constantBackoff(
+ new TimeValue(flushBackoffPolicy.getDelayMillis()),
+ flushBackoffPolicy.getMaxRetryCount());
+ break;
+ case EXPONENTIAL:
+ default:
+ backoffPolicy =
+ BackoffPolicy.exponentialBackoff(
+ new TimeValue(flushBackoffPolicy.getDelayMillis()),
+ flushBackoffPolicy.getMaxRetryCount());
+ }
+ } else {
+ backoffPolicy = BackoffPolicy.noBackoff();
+ }
+
+ builder.setBackoffPolicy(backoffPolicy);
+ }
+
+ @Override
+ public RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ return new Elasticsearch7BulkProcessorIndexer(
+ bulkProcessor, flushOnCheckpoint, numPendingRequestsRef);
+ }
+
+ @Override
+ public void verifyClientConnection(RestHighLevelClient client) throws IOException {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
+ }
+
+ if (!client.ping(RequestOptions.DEFAULT)) {
+ throw new RuntimeException("There are no reachable Elasticsearch nodes!");
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Elasticsearch RestHighLevelClient is connected to {}", httpHosts.toString());
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java
new file mode 100644
index 000000000..9094048a1
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/Elasticsearch7BulkProcessorIndexer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}. {@link ActionRequest
+ * ActionRequests} will be buffered before sending a bulk request to the Elasticsearch cluster.
+ *
+ * <p>Note: This class is binary compatible with Elasticsearch 7.
+ */
+@Internal
+class Elasticsearch7BulkProcessorIndexer implements RequestIndexer {
+
+ private final BulkProcessor bulkProcessor;
+ private final boolean flushOnCheckpoint;
+ private final AtomicLong numPendingRequestsRef;
+
+ Elasticsearch7BulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ this.bulkProcessor = checkNotNull(bulkProcessor);
+ this.flushOnCheckpoint = flushOnCheckpoint;
+ this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ for (DeleteRequest deleteRequest : deleteRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(deleteRequest);
+ }
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ for (IndexRequest indexRequest : indexRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(indexRequest);
+ }
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ for (UpdateRequest updateRequest : updateRequests) {
+ if (flushOnCheckpoint) {
+ numPendingRequestsRef.getAndIncrement();
+ }
+ this.bulkProcessor.add(updateRequest);
+ }
+ }
+}
+
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
new file mode 100644
index 000000000..595d6da1c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Elasticsearch 7.x sink that requests multiple {@link ActionRequest ActionRequests} against a
+ * cluster for each incoming element.
+ *
+ * <p>The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch
+ * cluster. The sink will fail if no cluster can be connected to using the provided transport
+ * addresses passed to the constructor.
+ *
+ * <p>Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest
+ * ActionRequests}. This will buffer elements before sending a request to the cluster. The behaviour
+ * of the {@code BulkProcessor} can be configured using these config keys:
+ *
+ * <ul>
+ * <li>{@code bulk.flush.max.actions}: Maximum amount of elements to buffer
+ * <li>{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer
+ * <li>{@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two
+ * settings in milliseconds
+ * </ul>
+ *
+ * <p>You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation
+ * of {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param <T> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class ElasticsearchSink<T> extends ElasticsearchSinkBase<T, RestHighLevelClient> {
+
+ private static final long serialVersionUID = 1L;
+
+ private ElasticsearchSink(
+ Map<String, String> bulkRequestsConfig,
+ List<HttpHost> httpHosts,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler,
+ RestClientFactory restClientFactory) {
+
+ super(
+ new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
+ bulkRequestsConfig,
+ elasticsearchSinkFunction,
+ failureHandler,
+ null);
+ }
+
+ /**
+ * A builder for creating an {@link ElasticsearchSink}.
+ *
+ * @param <T> Type of the elements handled by the sink this builder creates.
+ */
+ @PublicEvolving
+ public static class Builder<T> {
+
+ private final List<HttpHost> httpHosts;
+ private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+ private Map<String, String> bulkRequestsConfig = new HashMap<>();
+ private ActionRequestFailureHandler failureHandler = new NoOpFailureHandler();
+ private RestClientFactory restClientFactory = restClientBuilder -> {};
+
+ /**
+ * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link
+ * RestHighLevelClient}.
+ *
+ * @param httpHosts The list of {@link HttpHost} to which the {@link RestHighLevelClient}
+ * connects.
+ * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest}
+ * from the incoming element.
+ */
+ public Builder(
+ List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+ this.httpHosts = Preconditions.checkNotNull(httpHosts);
+ this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
+ }
+
+ /**
+ * Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
+ * disable it.
+ *
+ * @param numMaxActions the maximum number of actions to buffer per bulk request.
+ */
+ public void setBulkFlushMaxActions(int numMaxActions) {
+ Preconditions.checkArgument(
+ numMaxActions == -1 || numMaxActions > 0,
+ "Max number of buffered actions must be larger than 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
+ }
+
+ /**
+ * Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
+ * disable it.
+ *
+ * @param maxSizeMb the maximum size of buffered actions, in mb.
+ */
+ public void setBulkFlushMaxSizeMb(int maxSizeMb) {
+ Preconditions.checkArgument(
+ maxSizeMb == -1 || maxSizeMb > 0,
+ "Max size of buffered actions must be larger than 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
+ }
+
+ /**
+ * Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
+ *
+ * @param intervalMillis the bulk flush interval, in milliseconds.
+ */
+ public void setBulkFlushInterval(long intervalMillis) {
+ Preconditions.checkArgument(
+ intervalMillis == -1 || intervalMillis >= 0,
+ "Interval (in milliseconds) between each flush must be larger than or equal to 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
+ }
+
+ /**
+ * Sets whether or not to enable bulk flush backoff behaviour.
+ *
+ * @param enabled whether or not to enable backoffs.
+ */
+ public void setBulkFlushBackoff(boolean enabled) {
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
+ }
+
+ /**
+ * Sets the type of backoff to use when flushing bulk requests.
+ *
+ * @param flushBackoffType the backoff type to use.
+ */
+ public void setBulkFlushBackoffType(FlushBackoffType flushBackoffType) {
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE,
+ Preconditions.checkNotNull(flushBackoffType).toString());
+ }
+
+ /**
+ * Sets the maximum number of retries for a backoff attempt when flushing bulk requests.
+ *
+ * @param maxRetries the maximum number of retries for a backoff attempt when flushing bulk
+ * requests
+ */
+ public void setBulkFlushBackoffRetries(int maxRetries) {
+ Preconditions.checkArgument(
+ maxRetries > 0, "Max number of backoff attempts must be larger than 0.");
+
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
+ }
+
+ /**
+ * Sets the amount of delay between each backoff attempt when flushing bulk requests, in
+ * milliseconds.
+ *
+ * @param delayMillis the amount of delay between each backoff attempt when flushing bulk
+ * requests, in milliseconds.
+ */
+ public void setBulkFlushBackoffDelay(long delayMillis) {
+ Preconditions.checkArgument(
+ delayMillis >= 0,
+ "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
+ this.bulkRequestsConfig.put(
+ CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
+ }
+
+ /**
+ * Sets a failure handler for action requests.
+ *
+ * @param failureHandler This is used to handle failed {@link ActionRequest}.
+ */
+ public void setFailureHandler(ActionRequestFailureHandler failureHandler) {
+ this.failureHandler = Preconditions.checkNotNull(failureHandler);
+ }
+
+ /**
+ * Sets a REST client factory for custom client configuration.
+ *
+ * @param restClientFactory the factory that configures the rest client.
+ */
+ public void setRestClientFactory(RestClientFactory restClientFactory) {
+ this.restClientFactory = Preconditions.checkNotNull(restClientFactory);
+ }
+
+ /**
+ * Creates the Elasticsearch sink.
+ *
+ * @return the created Elasticsearch sink.
+ */
+ public ElasticsearchSink<T> build() {
+ return new ElasticsearchSink<>(
+ bulkRequestsConfig,
+ httpHosts,
+ elasticsearchSinkFunction,
+ failureHandler,
+ restClientFactory);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Builder<?> builder = (Builder<?>) o;
+ return Objects.equals(httpHosts, builder.httpHosts)
+ && Objects.equals(elasticsearchSinkFunction, builder.elasticsearchSinkFunction)
+ && Objects.equals(bulkRequestsConfig, builder.bulkRequestsConfig)
+ && Objects.equals(failureHandler, builder.failureHandler)
+ && Objects.equals(restClientFactory, builder.restClientFactory);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ httpHosts,
+ elasticsearchSinkFunction,
+ bulkRequestsConfig,
+ failureHandler,
+ restClientFactory);
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index be09fa99a..bddfa3ebe 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.elasticsearch7.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -31,7 +30,6 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.StringUtils;
-
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -42,6 +40,7 @@ import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch7.ElasticsearchSink;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@@ -49,7 +48,6 @@ import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import javax.annotation.Nullable;
-
import java.util.List;
import java.util.Objects;
@@ -59,6 +57,7 @@ import java.util.Objects;
*/
@Internal
final class Elasticsearch7DynamicSink implements DynamicTableSink {
+
@VisibleForTesting
static final Elasticsearch7RequestFactory REQUEST_FACTORY =
new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
@@ -66,13 +65,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
private final EncodingFormat<SerializationSchema<RowData>> format;
private final TableSchema schema;
private final Elasticsearch7Configuration config;
-
- public Elasticsearch7DynamicSink(
- EncodingFormat<SerializationSchema<RowData>> format,
- Elasticsearch7Configuration config,
- TableSchema schema) {
- this(format, config, schema, (ElasticsearchSink.Builder::new));
- }
+ private final ElasticSearchBuilderProvider builderProvider;
// --------------------------------------------------------------
// Hack to make configuration testing possible.
@@ -84,12 +77,11 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
// on the sink itself.
// --------------------------------------------------------------
- private final ElasticSearchBuilderProvider builderProvider;
-
- @FunctionalInterface
- interface ElasticSearchBuilderProvider {
- ElasticsearchSink.Builder<RowData> createBuilder(
- List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+ public Elasticsearch7DynamicSink(
+ EncodingFormat<SerializationSchema<RowData>> format,
+ Elasticsearch7Configuration config,
+ TableSchema schema) {
+ this(format, config, schema, (ElasticsearchSink.Builder::new));
}
Elasticsearch7DynamicSink(
@@ -103,10 +95,6 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
this.builderProvider = builderProvider;
}
- // --------------------------------------------------------------
- // End of hack to make configuration testing possible
- // --------------------------------------------------------------
-
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
@@ -118,6 +106,10 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
return builder.build();
}
+ // --------------------------------------------------------------
+ // End of hack to make configuration testing possible
+ // --------------------------------------------------------------
+
@Override
public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
return () -> {
@@ -133,7 +125,8 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
RoutingExtractor.createRoutingExtractor(
- schema, config.getRoutingField().orElse(null)));
+ schema, config.getRoutingField().orElse(null)),
+ null);
final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -183,7 +176,36 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
return "Elasticsearch7";
}
- /** Serializable {@link RestClientFactory} used by the sink. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
+ return Objects.equals(format, that.format)
+ && Objects.equals(schema, that.schema)
+ && Objects.equals(config, that.config)
+ && Objects.equals(builderProvider, that.builderProvider);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(format, schema, config, builderProvider);
+ }
+
+ @FunctionalInterface
+ interface ElasticSearchBuilderProvider {
+
+ ElasticsearchSink.Builder<RowData> createBuilder(
+ List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+ }
+
+ /**
+ * Serializable {@link RestClientFactory} used by the sink.
+ */
@VisibleForTesting
static class DefaultRestClientFactory implements RestClientFactory {
@@ -218,7 +240,9 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
}
}
- /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */
+ /**
+ * Serializable {@link RestClientFactory} used by the sink which enables authentication.
+ */
@VisibleForTesting
static class AuthRestClientFactory implements RestClientFactory {
@@ -275,6 +299,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
* sink.
*/
private static class Elasticsearch7RequestFactory implements RequestFactory {
+
@Override
public UpdateRequest createUpdateRequest(
String index,
@@ -302,24 +327,4 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
return new DeleteRequest(index, key);
}
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
- return Objects.equals(format, that.format)
- && Objects.equals(schema, that.schema)
- && Objects.equals(config, that.config)
- && Objects.equals(builderProvider, that.builderProvider);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(format, schema, config, builderProvider);
- }
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
index eaadca70e..146a76e9d 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
@@ -31,6 +31,11 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
new file mode 100644
index 000000000..12b6e29ee
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/BufferingNoOpRequestIndexer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Implementation of a {@link RequestIndexer} that buffers {@link ActionRequest ActionRequests}
+ * before re-sending them to the Elasticsearch cluster upon request.
+ */
+@Internal
+@NotThreadSafe
+class BufferingNoOpRequestIndexer implements RequestIndexer {
+
+ private ConcurrentLinkedQueue<ActionRequest> bufferedRequests;
+
+ BufferingNoOpRequestIndexer() {
+ this.bufferedRequests = new ConcurrentLinkedQueue<ActionRequest>();
+ }
+
+ @Override
+ public void add(DeleteRequest... deleteRequests) {
+ Collections.addAll(bufferedRequests, deleteRequests);
+ }
+
+ @Override
+ public void add(IndexRequest... indexRequests) {
+ Collections.addAll(bufferedRequests, indexRequests);
+ }
+
+ @Override
+ public void add(UpdateRequest... updateRequests) {
+ Collections.addAll(bufferedRequests, updateRequests);
+ }
+
+ void processBufferedRequests(RequestIndexer actualIndexer) {
+ for (ActionRequest request : bufferedRequests) {
+ if (request instanceof IndexRequest) {
+ actualIndexer.add((IndexRequest) request);
+ } else if (request instanceof DeleteRequest) {
+ actualIndexer.add((DeleteRequest) request);
+ } else if (request instanceof UpdateRequest) {
+ actualIndexer.add((UpdateRequest) request);
+ }
+ }
+
+ bufferedRequests.clear();
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
new file mode 100644
index 000000000..34c448518
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchApiCallBridge.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls
+ * across different versions. This includes calls to create Elasticsearch clients, handle failed
+ * item responses, etc. Any incompatible Elasticsearch Java APIs should be bridged using this
+ * interface.
+ *
+ * <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since
+ * connecting via an embedded node is allowed, the call bridge will hold reference to the created
+ * embedded node. Each instance of the sink will hold exactly one instance of the call bridge, and
+ * state cleanup is performed when the sink is closed.
+ *
+ * @param <C> The Elasticsearch client, that implements {@link AutoCloseable}.
+ */
+@Internal
+public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {
+
+ /**
+ * Creates an Elasticsearch client implementing {@link AutoCloseable}.
+ *
+ * @param clientConfig The configuration to use when constructing the client.
+ * @return The created client.
+ */
+ C createClient(Map<String, String> clientConfig);
+
+ /**
+ * Creates a {@link BulkProcessor.Builder} for creating the bulk processor.
+ *
+ * @param client the Elasticsearch client.
+ * @param listener the bulk processor listener.
+ * @return the bulk processor builder.
+ */
+ BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
+
+ /**
+ * Extracts the cause of failure of a bulk item action.
+ *
+ * @param bulkItemResponse the bulk item response to extract the cause of failure from
+ * @return the extracted {@link Throwable} from the response ({@code null} if the response is
+ * successful).
+ */
+ @Nullable
+ Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+
+ /**
+ * Set backoff-related configurations on the provided {@link BulkProcessor.Builder}. The builder
+ * will be later on used to instantiate the actual {@link BulkProcessor}.
+ *
+ * @param builder the {@link BulkProcessor.Builder} to configure.
+ * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user
+ * disabled backoff retries).
+ */
+ void configureBulkProcessorBackoff(
+ BulkProcessor.Builder builder,
+ @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy);
+
+ /**
+ * Verify the client connection by making a test request/ping to the Elasticsearch cluster.
+ *
+ * <p>Called by {@link ElasticsearchSinkBase#open(org.apache.flink.configuration.Configuration)}
+ * after creating the client. This makes sure the underlying client is closed if the connection
+ * is not successful, preventing thread leaks.
+ *
+ * @param client the Elasticsearch client.
+ * @throws IOException may be thrown by the implementation's test request.
+ */
+ void verifyClientConnection(C client) throws IOException;
+
+ /**
+ * Creates a {@link RequestIndexer} that is able to work with {@link BulkProcessor} binary
+ * compatible.
+ *
+ * <p>The default implementation returns a {@link PreElasticsearch6BulkProcessorIndexer}.
+ *
+ * @param bulkProcessor the bulk processor that the indexer adds requests to.
+ * @param flushOnCheckpoint whether the indexer should count each added request in {@code
+ * numPendingRequestsRef}.
+ * @param numPendingRequestsRef counter of pending requests shared with the sink.
+ * @return the request indexer.
+ */
+ default RequestIndexer createBulkProcessorIndexer(
+ BulkProcessor bulkProcessor,
+ boolean flushOnCheckpoint,
+ AtomicLong numPendingRequestsRef) {
+ return new PreElasticsearch6BulkProcessorIndexer(
+ bulkProcessor, flushOnCheckpoint, numPendingRequestsRef);
+ }
+
+ /** Perform any necessary state cleanup. */
+ default void cleanup() {
+ // nothing to cleanup by default
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
new file mode 100644
index 000000000..4dd35d060
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all Flink Elasticsearch Sinks.
+ *
+ * <p>This class implements the common behaviour across Elasticsearch versions, such as the use of
+ * an internal {@link BulkProcessor} to buffer multiple {@link ActionRequest}s before sending the
+ * requests to the cluster, as well as passing input records to the user provided {@link
+ * ElasticsearchSinkFunction} for processing.
+ *
+ * <p>The version specific API calls for different Elasticsearch versions should be defined by a
+ * concrete implementation of a {@link ElasticsearchApiCallBridge}, which is provided to the
+ * constructor of this class. This call bridge is used, for example, to create an Elasticsearch
+ * {@link Client}, handle failed item responses, etc.
+ *
+ * @param <T> Type of the elements handled by this sink
+ * @param <C> Type of the Elasticsearch client, which implements {@link AutoCloseable}
+ */
+@Internal
+public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T>
+ implements CheckpointedFunction {
+
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+
+ // ------------------------------------------------------------------------
+ // Internal bulk processor configuration
+ // ------------------------------------------------------------------------
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+ public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+ private static final long serialVersionUID = -1007596293618451942L;
+ private final Integer bulkProcessorFlushMaxActions;
+ private final Integer bulkProcessorFlushMaxSizeMb;
+ private final Long bulkProcessorFlushIntervalMillis;
+ private final BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
+ /**
+ * The config map that contains configuration for the bulk flushing behaviours.
+ *
+ * <p>For {@link org.elasticsearch.client.transport.TransportClient} based implementations, this
+ * config map would also contain Elasticsearch-shipped configuration, and therefore this config
+ * map would also be forwarded when creating the Elasticsearch client.
+ */
+ private final Map<String, String> userConfig;
+ /**
+ * The function that is used to construct multiple {@link ActionRequest ActionRequests} from
+ * each incoming element.
+ */
+ private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
+
+ // ------------------------------------------------------------------------
+ // User-facing API and configuration
+ // ------------------------------------------------------------------------
+ /**
+ * User-provided handler for failed {@link ActionRequest ActionRequests}.
+ */
+ private final ActionRequestFailureHandler failureHandler;
+ /**
+ * Call bridge for version-specific Elasticsearch API calls.
+ */
+ private final ElasticsearchApiCallBridge<C> callBridge;
+ /**
+ * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown
+ * in callbacks and the user considered it should fail the sink via the {@link
+ * ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
+ *
+ * <p>Errors will be checked and rethrown before processing each input element, and when the
+ * sink is closed.
+ */
+ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+ private final String inLongMetric;
+ /**
+ * If true, the producer will wait until all outstanding action requests have been sent to
+ * Elasticsearch.
+ */
+ private boolean flushOnCheckpoint = true;
+ /**
+ * Provided to the user via the {@link ElasticsearchSinkFunction} to add {@link ActionRequest
+ * ActionRequests}.
+ */
+ private transient RequestIndexer requestIndexer;
+
+ // ------------------------------------------------------------------------
+ // Internals for the Flink Elasticsearch Sink
+ // ------------------------------------------------------------------------
+ /**
+ * Provided to the {@link ActionRequestFailureHandler} to allow users to re-index failed
+ * requests.
+ */
+ private transient BufferingNoOpRequestIndexer failureRequestIndexer;
+ /**
+ * Number of pending action requests not yet acknowledged by Elasticsearch. This value is
+ * maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+ *
+ * <p>This is incremented whenever the user adds (or re-adds through the {@link
+ * ActionRequestFailureHandler}) requests to the {@link RequestIndexer}. It is decremented for
+ * each completed request of a bulk request, in {@link BulkProcessor.Listener#afterBulk(long,
+ * BulkRequest, BulkResponse)} and {@link BulkProcessor.Listener#afterBulk(long, BulkRequest,
+ * Throwable)}.
+ */
+ private AtomicLong numPendingRequests = new AtomicLong(0);
+ /**
+ * Elasticsearch client created using the call bridge.
+ */
+ private transient C client;
+ /**
+ * Bulk processor to buffer and send requests to Elasticsearch, created using the client.
+ */
+ private transient BulkProcessor bulkProcessor;
+    /**
+     * Metric handles for this sink instance. Created in {@link #open(Configuration)}, so it is
+     * runtime-only state and is excluded from serialization like the other fields set there.
+     */
+    private transient SinkMetricData sinkMetricData;
+
+ public ElasticsearchSinkBase(
+ ElasticsearchApiCallBridge<C> callBridge,
+ Map<String, String> userConfig,
+ ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+ ActionRequestFailureHandler failureHandler,
+ String inLongMetric) {
+ this.inLongMetric = inLongMetric;
+ this.callBridge = checkNotNull(callBridge);
+ this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+ this.failureHandler = checkNotNull(failureHandler);
+ // we eagerly check if the user-provided sink function and failure handler is serializable;
+ // otherwise, if they aren't serializable, users will merely get a non-informative error
+ // message
+ // "ElasticsearchSinkBase is not serializable"
+
+ checkArgument(
+ InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+ "The implementation of the provided ElasticsearchSinkFunction is not serializable. "
+ + "The object probably contains or references non-serializable fields.");
+
+ checkArgument(
+ InstantiationUtil.isSerializable(failureHandler),
+ "The implementation of the provided ActionRequestFailureHandler is not serializable. "
+ + "The object probably contains or references non-serializable fields.");
+
+ // extract and remove bulk processor related configuration from the user-provided config,
+ // so that the resulting user config only contains configuration related to the
+ // Elasticsearch client.
+
+ checkNotNull(userConfig);
+
+ // copy config so we can remove entries without side-effects
+ userConfig = new HashMap<>(userConfig);
+
+ ParameterTool params = ParameterTool.fromMap(userConfig);
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+ bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
+ } else {
+ bulkProcessorFlushMaxActions = null;
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+ bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
+ } else {
+ bulkProcessorFlushMaxSizeMb = null;
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
+ bulkProcessorFlushIntervalMillis = params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
+ } else {
+ bulkProcessorFlushIntervalMillis = null;
+ }
+
+ boolean bulkProcessorFlushBackoffEnable =
+ params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);
+
+ if (bulkProcessorFlushBackoffEnable) {
+ this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
+ bulkProcessorFlushBackoffPolicy.setBackoffType(
+ FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
+ bulkProcessorFlushBackoffPolicy.setMaxRetryCount(
+ params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
+ }
+
+ if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
+ bulkProcessorFlushBackoffPolicy.setDelayMillis(
+ params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
+ userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);
+ }
+
+ } else {
+ bulkProcessorFlushBackoffPolicy = null;
+ }
+
+ this.userConfig = userConfig;
+ }
+
+    /**
+     * Turns off flushing on checkpoint. Afterwards {@link
+     * #snapshotState(FunctionSnapshotContext)} no longer waits for pending action requests to be
+     * acknowledged by Elasticsearch.
+     *
+     * <p>NOTE: with flushing on checkpoint disabled, the Flink Elasticsearch Sink does NOT provide
+     * any strong guarantees for at-least-once delivery of action requests.
+     */
+    public void disableFlushOnCheckpoint() {
+        flushOnCheckpoint = false;
+    }
+
+    /**
+     * Registers the sink metrics, creates and verifies the Elasticsearch client, and builds the
+     * bulk processor and the request indexers.
+     *
+     * <p>Metrics are set up before the client is created so that a malformed metric label fails
+     * the sink before any client is opened.
+     *
+     * @param parameters the Flink configuration passed to the function
+     * @throws IllegalArgumentException if the InLong metric label does not contain a group id, a
+     *     stream id and a node id separated by {@code &}
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        sinkMetricData = new SinkMetricData(getRuntimeContext().getMetricGroup());
+        registerInLongMetrics();
+
+        client = callBridge.createClient(userConfig);
+        callBridge.verifyClientConnection(client);
+        bulkProcessor = buildBulkProcessor(new BulkProcessorListener(sinkMetricData));
+        requestIndexer =
+                callBridge.createBulkProcessorIndexer(
+                        bulkProcessor, flushOnCheckpoint, numPendingRequests);
+        failureRequestIndexer = new BufferingNoOpRequestIndexer();
+        elasticsearchSinkFunction.open(getRuntimeContext());
+    }
+
+    /** Registers the InLong metrics when a metric label was configured; otherwise does nothing. */
+    private void registerInLongMetrics() {
+        if (inLongMetric == null || inLongMetric.isEmpty()) {
+            return;
+        }
+        String[] inLongMetricArray = inLongMetric.split("&");
+        // Without this check a malformed label surfaces as a bare ArrayIndexOutOfBoundsException.
+        checkArgument(
+                inLongMetricArray.length >= 3,
+                "Invalid InLong metric label '"
+                        + inLongMetric
+                        + "', expected the format 'groupId&streamId&nodeId'.");
+        String groupId = inLongMetricArray[0];
+        String streamId = inLongMetricArray[1];
+        String nodeId = inLongMetricArray[2];
+        sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+        sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+        sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+        sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+        sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId,
+                "numBytesOutPerSecond");
+        sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+                "numRecordsOutPerSecond");
+    }
+
+    /**
+     * Rethrows any failure recorded by the bulk listener, re-submits buffered retry requests, and
+     * then lets the sink function turn {@code value} into requests on the request indexer.
+     */
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        checkAsyncErrorsAndRequests();
+        elasticsearchSinkFunction.process(
+                value,
+                getRuntimeContext(),
+                requestIndexer);
+    }
+
+    /** Does nothing: this sink restores no state. */
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        // intentionally empty - there is nothing to initialize
+    }
+
+    /**
+     * On checkpoint, surfaces asynchronous failures and, if flushing on checkpoint is enabled,
+     * keeps flushing the bulk processor until no pending requests remain.
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkAsyncErrorsAndRequests();
+
+        if (!flushOnCheckpoint) {
+            return;
+        }
+
+        // Re-check after every flush: a flush may record a failure or buffer retry requests.
+        while (numPendingRequests.get() != 0) {
+            bulkProcessor.flush();
+            checkAsyncErrorsAndRequests();
+        }
+    }
+
+ @Override
+ public void close() throws Exception {
+ elasticsearchSinkFunction.close();
+ if (bulkProcessor != null) {
+ bulkProcessor.close();
+ bulkProcessor = null;
+ }
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ callBridge.cleanup();
+
+ // make sure any errors from callbacks are rethrown
+ checkErrorAndRethrow();
+ }
+
+ /**
+ * Build the {@link BulkProcessor}.
+ *
+ * <p>Note: this is exposed for testing purposes.
+ */
+ @VisibleForTesting
+ protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
+ checkNotNull(listener);
+
+ BulkProcessor.Builder bulkProcessorBuilder =
+ callBridge.createBulkProcessorBuilder(client, listener);
+
+ // This makes flush() blocking
+ bulkProcessorBuilder.setConcurrentRequests(0);
+
+ if (bulkProcessorFlushMaxActions != null) {
+ bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
+ }
+
+ if (bulkProcessorFlushMaxSizeMb != null) {
+ configureBulkSize(bulkProcessorBuilder);
+ }
+
+ if (bulkProcessorFlushIntervalMillis != null) {
+ configureFlushInterval(bulkProcessorBuilder);
+ }
+
+ // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
+ callBridge.configureBulkProcessorBackoff(
+ bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);
+
+ return bulkProcessorBuilder.build();
+ }
+
+    /** Applies the configured maximum bulk size to the builder. */
+    private void configureBulkSize(BulkProcessor.Builder bulkProcessorBuilder) {
+        // bulk size can be disabled with -1, however the ByteSizeValue constructor accepts -1
+        // only with BYTES as the size unit
+        final ByteSizeUnit sizeUnit =
+                bulkProcessorFlushMaxSizeMb == -1 ? ByteSizeUnit.BYTES : ByteSizeUnit.MB;
+        final ByteSizeValue bulkSize = new ByteSizeValue(bulkProcessorFlushMaxSizeMb, sizeUnit);
+        bulkProcessorBuilder.setBulkSize(bulkSize);
+    }
+
+ private void configureFlushInterval(BulkProcessor.Builder bulkProcessorBuilder) {
+ if (bulkProcessorFlushIntervalMillis == -1) {
+ bulkProcessorBuilder.setFlushInterval(null);
+ } else {
+ bulkProcessorBuilder.setFlushInterval(
+ TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
+ }
+ }
+
+ private void checkErrorAndRethrow() {
+ Throwable cause = failureThrowable.get();
+ if (cause != null) {
+ throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
+ }
+ }
+
+    /**
+     * Rethrows a recorded asynchronous failure, then hands the requests buffered by the failure
+     * handler back to the real request indexer.
+     */
+    private void checkAsyncErrorsAndRequests() {
+        // fail first: nothing is re-submitted once the sink has been marked as failed
+        checkErrorAndRethrow();
+        failureRequestIndexer.processBufferedRequests(requestIndexer);
+    }
+
+    /**
+     * Returns the number of pending requests.
+     *
+     * @throws UnsupportedOperationException if flushing on checkpoint is disabled, because the
+     *     counter is not maintained in that case
+     */
+    @VisibleForTesting
+    long getNumPendingRequests() {
+        if (!flushOnCheckpoint) {
+            throw new UnsupportedOperationException(
+                    "The number of pending requests is not maintained when flushing on checkpoint is disabled.");
+        }
+        return numPendingRequests.get();
+    }
+
+    /**
+     * Selects how the retry delay evolves between attempts: it either remains constant or
+     * increases exponentially.
+     */
+    @PublicEvolving
+    public enum FlushBackoffType {
+        /** The retry delay remains constant. */
+        CONSTANT,
+        /** The retry delay increases exponentially. */
+        EXPONENTIAL
+    }
+
+    /**
+     * Backoff settings for bulk requests. Whenever a bulk request is rejected due to resource
+     * constraints (i.e. the client's internal thread pool is full), the backoff policy decides how
+     * long the bulk processor will wait before the operation is retried internally.
+     *
+     * <p>This is a proxy for version specific backoff policies.
+     */
+    public static class BulkFlushBackoffPolicy implements Serializable {
+
+        private static final long serialVersionUID = -6022851996101826049L;
+
+        // the default values follow the Elasticsearch default settings for BulkProcessor
+
+        /** How the delay evolves between retries. */
+        private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+
+        /** Maximum number of retries; never negative. */
+        private int maxRetryCount = 8;
+
+        /** Delay in milliseconds; never negative. */
+        private long delayMillis = 50;
+
+        /** @return the configured backoff type */
+        public FlushBackoffType getBackoffType() {
+            return this.backoffType;
+        }
+
+        /** @param backoffType the backoff type to use; must not be null */
+        public void setBackoffType(FlushBackoffType backoffType) {
+            this.backoffType = checkNotNull(backoffType);
+        }
+
+        /** @return the maximum number of retries */
+        public int getMaxRetryCount() {
+            return this.maxRetryCount;
+        }
+
+        /** @param maxRetryCount the maximum number of retries; must not be negative */
+        public void setMaxRetryCount(int maxRetryCount) {
+            checkArgument(maxRetryCount >= 0);
+            this.maxRetryCount = maxRetryCount;
+        }
+
+        /** @return the delay in milliseconds */
+        public long getDelayMillis() {
+            return this.delayMillis;
+        }
+
+        /** @param delayMillis the delay in milliseconds; must not be negative */
+        public void setDelayMillis(long delayMillis) {
+            checkArgument(delayMillis >= 0);
+            this.delayMillis = delayMillis;
+        }
+    }
+
+ private class BulkProcessorListener implements BulkProcessor.Listener {
+
+ private SinkMetricData sinkMetricData;
+
+ public BulkProcessorListener(SinkMetricData sinkMetricData) {
+ this.sinkMetricData = sinkMetricData;
+ }
+
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ if (response.hasFailures()) {
+ BulkItemResponse itemResponse;
+ Throwable failure;
+ RestStatus restStatus;
+ DocWriteRequest actionRequest;
+
+ try {
+ for (int i = 0; i < response.getItems().length; i++) {
+ itemResponse = response.getItems()[i];
+ failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
+ if (failure != null) {
+ restStatus = itemResponse.getFailure().getStatus();
+ actionRequest = request.requests().get(i);
+ if (sinkMetricData.getDirtyRecords() != null) {
+ sinkMetricData.getDirtyRecords().inc();
+ }
+ if (restStatus == null) {
+ if (actionRequest instanceof ActionRequest) {
+ failureHandler.onFailure(
+ (ActionRequest) actionRequest,
+ failure,
+ -1,
+ failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException(
+ "The sink currently only supports ActionRequests");
+ }
+ } else {
+ if (actionRequest instanceof ActionRequest) {
+ failureHandler.onFailure(
+ (ActionRequest) actionRequest,
+ failure,
+ restStatus.getStatus(),
+ failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException(
+ "The sink currently only supports ActionRequests");
+ }
+ }
+ }
+ if (sinkMetricData.getNumRecordsOut() != null) {
+ sinkMetricData.getNumRecordsOut().inc();
+ }
+ }
+ } catch (Throwable t) {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, t);
+ }
+ }
+
+ if (flushOnCheckpoint) {
+ numPendingRequests.getAndAdd(-request.numberOfActions());
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ try {
+ for (DocWriteRequest writeRequest : request.requests()) {
+ if (sinkMetricData.getDirtyRecords() != null) {
+ sinkMetricData.getDirtyRecords().inc();
+ }
+ if (writeRequest instanceof ActionRequest) {
+ failureHandler.onFailure(
+ (ActionRequest) writeRequest, failure, -1, failureRequestIndexer);
+ } else {
+ throw new UnsupportedOperationException(
+ "The sink currently only supports ActionRequests");
+ }
+ }
+ } catch (Throwable t) {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, t);
+ }
+
+ if (flushOnCheckpoint) {
+ numPendingRequests.getAndAdd(-request.numberOfActions());
+ }
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
new file mode 100644
index 000000000..dc4bf5af1
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from an element in a stream.
+ *
+ * <p>This is used by sinks to prepare elements for sending them to Elasticsearch.
+ *
+ * @param <T> The type of the element handled by this {@code ElasticsearchSinkFunction}
+ */
+@PublicEvolving
+public interface ElasticsearchSinkFunction<T> extends Serializable, Function {
+
+ /**
+ * Initialization method for the function. It is called once before the actual working process
+ * methods. The default implementation does nothing.
+ *
+ * @param ctx runtime context containing information about the sink instance
+ */
+ default void open(RuntimeContext ctx) throws Exception {
+ }
+
+ /**
+ * Tear-down method for the function. It is called when the sink closes. The default
+ * implementation does nothing.
+ */
+ default void close() throws Exception {
+ }
+
+ /**
+ * Process the incoming element to produce multiple {@link ActionRequest ActionRequests}. The
+ * produced requests should be added to the provided {@link RequestIndexer}.
+ *
+ * @param element incoming element to process
+ * @param ctx runtime context containing information about the sink instance
+ * @param indexer request indexer that {@code ActionRequest} should be added to
+ */
+ void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
new file mode 100644
index 000000000..be7b9668c
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/PreElasticsearch6BulkProcessorIndexer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RequestIndexer} backed by a {@link BulkProcessor}: every {@link ActionRequest} added
+ * here is handed to the bulk processor, which buffers requests before sending a bulk request to
+ * the Elasticsearch cluster.
+ *
+ * @deprecated This class is not binary compatible with newer Elasticsearch 6+ versions (i.e. the
+ * {@link #add(UpdateRequest...)} ). However, this module is currently compiled against a very
+ * old Elasticsearch version.
+ */
+@Deprecated
+@Internal
+class PreElasticsearch6BulkProcessorIndexer implements RequestIndexer {
+
+    /** Receives every added request. */
+    private final BulkProcessor bulkProcessor;
+
+    /** Whether added requests are counted as pending. */
+    private final boolean flushOnCheckpoint;
+
+    /** Pending-request counter shared with the sink. */
+    private final AtomicLong numPendingRequestsRef;
+
+    PreElasticsearch6BulkProcessorIndexer(
+            BulkProcessor bulkProcessor,
+            boolean flushOnCheckpoint,
+            AtomicLong numPendingRequestsRef) {
+        this.bulkProcessor = checkNotNull(bulkProcessor);
+        this.flushOnCheckpoint = flushOnCheckpoint;
+        this.numPendingRequestsRef = checkNotNull(numPendingRequestsRef);
+    }
+
+    @Override
+    public void add(DeleteRequest... deleteRequests) {
+        for (DeleteRequest request : deleteRequests) {
+            countAsPending();
+            bulkProcessor.add(request);
+        }
+    }
+
+    @Override
+    public void add(IndexRequest... indexRequests) {
+        for (IndexRequest request : indexRequests) {
+            countAsPending();
+            bulkProcessor.add(request);
+        }
+    }
+
+    @Override
+    public void add(UpdateRequest... updateRequests) {
+        for (UpdateRequest request : updateRequests) {
+            countAsPending();
+            bulkProcessor.add(request);
+        }
+    }
+
+    /** Increments the pending-request counter when flushing on checkpoint is enabled. */
+    private void countAsPending() {
+        if (flushOnCheckpoint) {
+            numPendingRequestsRef.getAndIncrement();
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
index 86ed88875..07d8adbf9 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.elasticsearch.table;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
import java.time.Duration;
import java.util.Objects;
@@ -39,8 +39,11 @@ import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FA
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
-/** Accessor methods to elasticsearch options. */
+/**
+ * Accessor methods to elasticsearch options.
+ */
public class ElasticsearchConfiguration {
+
protected final ReadableConfig config;
private final ClassLoader classLoader;
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
index 0d74c2c3b..6954bd36a 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.description.Description;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
import java.time.Duration;
import java.util.List;
@@ -33,15 +33,6 @@ import static org.apache.flink.configuration.description.TextElement.text;
* Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
*/
public class ElasticsearchOptions {
- /**
- * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
- * DISABLED} option.
- */
- public enum BackOffType {
- DISABLED,
- CONSTANT,
- EXPONENTIAL
- }
public static final ConfigOption<List<String>> HOSTS_OPTION =
ConfigOptions.key("hosts")
@@ -75,13 +66,11 @@ public class ElasticsearchOptions {
.defaultValue("_")
.withDescription(
"Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
-
+ /**
+ * Option {@code routing.field-name}: a string option without a default value. Judging by its
+ * key and description it names the field used for Elasticsearch routing.
+ */
public static final ConfigOption<String> ROUTING_FIELD_NAME =
ConfigOptions.key("routing.field-name")
.stringType()
.noDefaultValue()
-.withDescription("Elasticsearch routing filed.");
+.withDescription("Elasticsearch routing field.");
-
public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
ConfigOptions.key("failure-handler")
.stringType()
@@ -159,4 +148,14 @@ public class ElasticsearchOptions {
private ElasticsearchOptions() {
}
+
+ /**
+ * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+ * DISABLED} option.
+ *
+ * <p>NOTE(review): {@code CONSTANT} and {@code EXPONENTIAL} presumably mirror the constants of
+ * {@link ElasticsearchSinkBase.FlushBackoffType}; confirm against that enum.
+ */
+ public enum BackOffType {
+ DISABLED, // the option this enum adds on top of FlushBackoffType, per the Javadoc above
+ CONSTANT,
+ EXPONENTIAL
+ }
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 2e227ad55..8af55faaf 100644
--- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,12 +20,12 @@ package org.apache.inlong.sort.elasticsearch.table;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
-
+import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
@@ -49,9 +49,16 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
private final XContentType contentType;
private final RequestFactory requestFactory;
private final Function<RowData, String> createKey;
+ private final String inLongMetric;
private final Function<RowData, String> createRouting;
+ /** Runtime context received in {@code open(RuntimeContext)}; excluded from serialization. */
+ private transient RuntimeContext runtimeContext;
+
+ /**
+ * Metric holder created in {@code open(RuntimeContext)}. It is runtime-only state, so it is
+ * marked transient like {@code runtimeContext} and is not shipped with the serialized function.
+ */
+ private transient SinkMetricData sinkMetricData;
+
public RowElasticsearchSinkFunction(
IndexGenerator indexGenerator,
@Nullable String docType, // this is deprecated in es 7+
@@ -59,7 +66,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
XContentType contentType,
RequestFactory requestFactory,
Function<RowData, String> createKey,
- @Nullable Function<RowData, String> createRouting) {
+ @Nullable Function<RowData, String> createRouting,
+ String inLongMetric) {
this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
this.docType = docType;
this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
@@ -67,11 +75,27 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
this.requestFactory = Preconditions.checkNotNull(requestFactory);
this.createKey = Preconditions.checkNotNull(createKey);
this.createRouting = createRouting;
+ this.inLongMetric = inLongMetric;
}
+ /**
+ * Opens the index generator and creates the sink metric holder from the runtime context.
+ *
+ * <p>When {@code inLongMetric} is non-empty it is split on {@code '&'} into group id, stream id
+ * and node id (in that order), and the dirtyBytes, dirtyRecords, numBytesOut, numRecordsOut,
+ * numBytesOutPerSecond and numRecordsOutPerSecond metrics are registered under those labels.
+ *
+ * @param ctx runtime context whose metric group backs the sink metrics
+ * @throws IllegalArgumentException if {@code inLongMetric} is non-empty but has fewer than
+ * three {@code '&'}-separated parts
+ */
@Override
- public void open() {
+ public void open(RuntimeContext ctx) {
indexGenerator.open();
+ this.runtimeContext = ctx;
+ sinkMetricData = new SinkMetricData(runtimeContext.getMetricGroup());
+ if (inLongMetric != null && !inLongMetric.isEmpty()) {
+ String[] inLongMetricArray = inLongMetric.split("&");
+ // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException below.
+ if (inLongMetricArray.length < 3) {
+ throw new IllegalArgumentException(
+ "Expected inLongMetric in the form 'groupId&streamId&nodeId' but got: " + inLongMetric);
+ }
+ String groupId = inLongMetricArray[0];
+ String streamId = inLongMetricArray[1];
+ String nodeId = inLongMetricArray[2];
+ sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, nodeId, "dirtyBytes");
+ sinkMetricData.registerMetricsForDirtyRecords(groupId, streamId, nodeId, "dirtyRecords");
+ sinkMetricData.registerMetricsForNumBytesOut(groupId, streamId, nodeId, "numBytesOut");
+ sinkMetricData.registerMetricsForNumRecordsOut(groupId, streamId, nodeId, "numRecordsOut");
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, streamId, nodeId, "numBytesOutPerSecond");
+ sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, nodeId,
+ "numRecordsOutPerSecond");
+ }
}
@Override
@@ -93,6 +117,9 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
private void processUpsert(RowData row, RequestIndexer indexer) {
final byte[] document = serializationSchema.serialize(row);
final String key = createKey.apply(row);
+ if (sinkMetricData.getNumBytesOut() != null) {
+ sinkMetricData.getNumBytesOut().inc(document.length);
+ }
if (key != null) {
final UpdateRequest updateRequest =
requestFactory.createUpdateRequest(
@@ -137,7 +164,8 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
&& Objects.equals(serializationSchema, that.serializationSchema)
&& contentType == that.contentType
&& Objects.equals(requestFactory, that.requestFactory)
- && Objects.equals(createKey, that.createKey);
+ && Objects.equals(createKey, that.createKey)
+ && Objects.equals(inLongMetric, that.inLongMetric);
}
@Override
@@ -148,6 +176,7 @@ public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<R
serializationSchema,
contentType,
requestFactory,
- createKey);
+ createKey,
+ inLongMetric);
}
}