You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/07 11:00:52 UTC
[inlong] branch master updated: [INLONG-4659][Sort] Support field routing for Elasticsearch connector (#4849)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2fac488d9 [INLONG-4659][Sort] Support field routing for Elasticsearch connector (#4849)
2fac488d9 is described below
commit 2fac488d9d279022a12a9b5a1737b537e616b986
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Jul 7 19:00:47 2022 +0800
[INLONG-4659][Sort] Support field routing for Elasticsearch connector (#4849)
---
.idea/vcs.xml | 4 +-
.../protocol/node/load/ElasticsearchLoadNode.java | 8 +-
.../sort-connectors/elasticsearch-6/pom.xml | 109 ++++-
.../table/Elasticsearch6Configuration.java | 80 ++++
.../table/Elasticsearch6DynamicSink.java | 324 ++++++++++++++
.../table/Elasticsearch6DynamicSinkFactory.java | 173 ++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../table/Elasticsearch6DynamicSinkITCase.java | 483 +++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 ++
.../sort-connectors/elasticsearch-7/pom.xml | 108 ++++-
.../table/Elasticsearch7Configuration.java | 71 +++
.../table/Elasticsearch7DynamicSink.java | 325 ++++++++++++++
.../table/Elasticsearch7DynamicSinkFactory.java | 173 ++++++++
.../org.apache.flink.table.factories.Factory | 16 +
.../table/Elasticsearch7DynamicSinkITCase.java | 451 +++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 ++
.../sort-connectors/elasticsearch-base/pom.xml | 175 ++++++++
.../table/AbstractTimeIndexGenerator.java | 38 ++
.../table/ElasticsearchConfiguration.java | 170 ++++++++
.../elasticsearch/table/ElasticsearchOptions.java | 162 +++++++
.../table/ElasticsearchValidationUtils.java | 99 +++++
.../sort/elasticsearch/table/IndexGenerator.java | 39 ++
.../elasticsearch/table/IndexGeneratorBase.java | 49 +++
.../elasticsearch/table/IndexGeneratorFactory.java | 275 ++++++++++++
.../sort/elasticsearch/table/KeyExtractor.java | 128 ++++++
.../sort/elasticsearch/table/RequestFactory.java | 52 +++
.../sort/elasticsearch/table/RoutingExtractor.java | 65 +++
.../table/RowElasticsearchSinkFunction.java | 153 +++++++
.../elasticsearch/table/StaticIndexGenerator.java | 34 ++
.../sort/elasticsearch/table/TestContext.java | 71 +++
.../src/test/resources/log4j2-test.properties | 28 ++
inlong-sort/sort-connectors/pom.xml | 1 +
.../sort/parser/ElasticsearchSqlParseTest.java | 4 +-
licenses/inlong-sort-connectors/LICENSE | 23 +
pom.xml | 11 +-
35 files changed, 3948 insertions(+), 26 deletions(-)
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index e8d2bb62c..cf5cd1de8 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -19,7 +19,7 @@
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
- <component name="IssueNavigationConfiguration">
+ <component name="IssueNavigationConfiguration">
<option name="links">
<list>
<IssueNavigationLink>
@@ -33,4 +33,4 @@
</list>
</option>
</component>
-</project>
+</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java
index c3d400b73..8cfc4ccaf 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ElasticsearchLoadNode.java
@@ -97,18 +97,22 @@ public class ElasticsearchLoadNode extends LoadNode implements Serializable {
this.version = version;
}
+ /**
+ * if you want to set field routing, set the routing.field-name
+ */
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "elasticsearch-7");
+ options.put("connector", "elasticsearch-7-inlong");
if (version == 6) {
- options.put("connector", "elasticsearch-6");
+ options.put("connector", "elasticsearch-6-inlong");
options.put("document-type", documentType);
}
options.put("hosts", hosts);
options.put("index", index);
options.put("password", password);
options.put("username", username);
+ options.put("routing.field-name", primaryKey);
return options;
}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
index c6039ed70..1360dca65 100644
--- a/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-6/pom.xml
@@ -31,32 +31,125 @@
<name>Apache InLong - Sort-connector-elasticsearch6</name>
<packaging>jar</packaging>
-
- <properties>
- <elasticsearch.version>6.8.17</elasticsearch.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-elasticsearch-base</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <exclusions>
+ <!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch6.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch6.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch6.version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>1.15.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-elasticsearch-base</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ Including elasticsearch transport dependency for tests. Netty3 is not here anymore in 6.x
+ -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${elasticsearch6.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch.plugin</groupId>
+ <artifactId>transport-netty4-client</artifactId>
+ <version>${elasticsearch6.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Elasticsearch table descriptor testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.12</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Table API integration tests -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Elasticsearch table sink factory testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
new file mode 100644
index 000000000..3d8212b16
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchConfiguration;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/** Elasticsearch 6 specific configuration. */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+ Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+ super(config, classLoader);
+ }
+
+ public List<HttpHost> getHosts() {
+ return config.get(HOSTS_OPTION).stream()
+ .map(Elasticsearch6Configuration::validateAndParseHostsString)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Parse Hosts String to list.
+ *
+ * <p>Hosts String format was given as following:
+ *
+ * <pre>
+ * connector.hosts = http://host_name:9092;http://host_name:9093
+ * </pre>
+ */
+ private static HttpHost validateAndParseHostsString(String host) {
+ try {
+ HttpHost httpHost = HttpHost.create(host);
+ if (httpHost.getPort() < 0) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+ host, HOSTS_OPTION.key()));
+ }
+
+ if (httpHost.getSchemeName() == null) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+ host, HOSTS_OPTION.key()));
+ }
+ return httpHost;
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+ host, HOSTS_OPTION.key()),
+ e);
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
new file mode 100644
index 000000000..af2ec05d8
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
+import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
+import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a
+ * logical description.
+ */
+@PublicEvolving
+final class Elasticsearch6DynamicSink implements DynamicTableSink {
+ @VisibleForTesting
+ static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
+
+ private final EncodingFormat<SerializationSchema<RowData>> format;
+ private final TableSchema schema;
+ private final Elasticsearch6Configuration config;
+
+ public Elasticsearch6DynamicSink(
+ EncodingFormat<SerializationSchema<RowData>> format,
+ Elasticsearch6Configuration config,
+ TableSchema schema) {
+ this(format, config, schema, (ElasticsearchSink.Builder::new));
+ }
+
+ // --------------------------------------------------------------
+ // Hack to make configuration testing possible.
+ //
+ // The code in this block should never be used outside of tests.
+ // Having a way to inject a builder we can assert the builder in
+ // the test. We can not assert everything though, e.g. it is not
+ // possible to assert flushing on checkpoint, as it is configured
+ // on the sink itself.
+ // --------------------------------------------------------------
+
+ private final ElasticSearchBuilderProvider builderProvider;
+
+ @FunctionalInterface
+ interface ElasticSearchBuilderProvider {
+ ElasticsearchSink.Builder<RowData> createBuilder(
+ List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+ }
+
+ Elasticsearch6DynamicSink(
+ EncodingFormat<SerializationSchema<RowData>> format,
+ Elasticsearch6Configuration config,
+ TableSchema schema,
+ ElasticSearchBuilderProvider builderProvider) {
+ this.format = format;
+ this.schema = schema;
+ this.config = config;
+ this.builderProvider = builderProvider;
+ }
+
+ // --------------------------------------------------------------
+ // End of hack to make configuration testing possible
+ // --------------------------------------------------------------
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (RowKind kind : requestedMode.getContainedKinds()) {
+ if (kind != RowKind.UPDATE_BEFORE) {
+ builder.addContainedKind(kind);
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+ return () -> {
+ SerializationSchema<RowData> format =
+ this.format.createRuntimeEncoder(context, schema.toRowDataType());
+
+ final RowElasticsearchSinkFunction upsertFunction =
+ new RowElasticsearchSinkFunction(
+ IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+ config.getDocumentType(),
+ format,
+ XContentType.JSON,
+ REQUEST_FACTORY,
+ KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
+ RoutingExtractor.createRoutingExtractor(
+ schema, config.getRoutingField().orElse(null)));
+
+ final ElasticsearchSink.Builder<RowData> builder =
+ builderProvider.createBuilder(config.getHosts(), upsertFunction);
+
+ builder.setFailureHandler(config.getFailureHandler());
+ builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+ builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+ builder.setBulkFlushInterval(config.getBulkFlushInterval());
+ builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+ config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+ config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+ config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+ // we must overwrite the default factory which is defined with a lambda because of a bug
+ // in shading lambda serialization shading see FLINK-18006
+ if (config.getUsername().isPresent()
+ && config.getPassword().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+ && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+ builder.setRestClientFactory(
+ new AuthRestClientFactory(
+ config.getPathPrefix().orElse(null),
+ config.getUsername().get(),
+ config.getPassword().get()));
+ } else {
+ builder.setRestClientFactory(
+ new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
+ }
+
+ final ElasticsearchSink<RowData> sink = builder.build();
+
+ if (config.isDisableFlushOnCheckpoint()) {
+ sink.disableFlushOnCheckpoint();
+ }
+
+ return sink;
+ };
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return this;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Elasticsearch6";
+ }
+
+ /** Serializable {@link RestClientFactory} used by the sink. */
+ @VisibleForTesting
+ static class DefaultRestClientFactory implements RestClientFactory {
+
+ private final String pathPrefix;
+
+ public DefaultRestClientFactory(@Nullable String pathPrefix) {
+ this.pathPrefix = pathPrefix;
+ }
+
+ @Override
+ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+ if (pathPrefix != null) {
+ restClientBuilder.setPathPrefix(pathPrefix);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+ return Objects.equals(pathPrefix, that.pathPrefix);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pathPrefix);
+ }
+ }
+
+ /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */
+ @VisibleForTesting
+ static class AuthRestClientFactory implements RestClientFactory {
+
+ private final String pathPrefix;
+ private final String username;
+ private final String password;
+ private transient CredentialsProvider credentialsProvider;
+
+ public AuthRestClientFactory(
+ @Nullable String pathPrefix, String username, String password) {
+ this.pathPrefix = pathPrefix;
+ this.password = password;
+ this.username = username;
+ }
+
+ @Override
+ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+ if (pathPrefix != null) {
+ restClientBuilder.setPathPrefix(pathPrefix);
+ }
+ if (credentialsProvider == null) {
+ credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ }
+ restClientBuilder.setHttpClientConfigCallback(
+ httpAsyncClientBuilder ->
+ httpAsyncClientBuilder.setDefaultCredentialsProvider(
+ credentialsProvider));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AuthRestClientFactory that = (AuthRestClientFactory) o;
+ return Objects.equals(pathPrefix, that.pathPrefix)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pathPrefix, username, password);
+ }
+ }
+
+ /**
+ * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the
+ * sink.
+ */
+ private static class Elasticsearch6RequestFactory implements RequestFactory {
+ @Override
+ public UpdateRequest createUpdateRequest(
+ String index,
+ String docType,
+ String key,
+ XContentType contentType,
+ byte[] document) {
+ return new UpdateRequest(index, docType, key)
+ .doc(document, contentType)
+ .upsert(document, contentType);
+ }
+
+ @Override
+ public IndexRequest createIndexRequest(
+ String index,
+ String docType,
+ String key,
+ XContentType contentType,
+ byte[] document) {
+ return new IndexRequest(index, docType, key).source(document, contentType);
+ }
+
+ @Override
+ public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+ return new DeleteRequest(index, docType, key);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+ return Objects.equals(format, that.format)
+ && Objects.equals(schema, that.schema)
+ && Objects.equals(config, that.config)
+ && Objects.equals(builderProvider, that.builderProvider);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(format, schema, config, builderProvider);
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
new file mode 100644
index 000000000..4a49939ec
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.ROUTING_FIELD_NAME;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
+
+/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}. */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+ private static final Set<ConfigOption<?>> requiredOptions =
+ Stream.of(HOSTS_OPTION, INDEX_OPTION, DOCUMENT_TYPE_OPTION).collect(Collectors.toSet());
+ private static final Set<ConfigOption<?>> optionalOptions =
+ Stream.of(
+ KEY_DELIMITER_OPTION,
+ ROUTING_FIELD_NAME,
+ FAILURE_HANDLER_OPTION,
+ FLUSH_ON_CHECKPOINT_OPTION,
+ BULK_FLASH_MAX_SIZE_OPTION,
+ BULK_FLUSH_MAX_ACTIONS_OPTION,
+ BULK_FLUSH_INTERVAL_OPTION,
+ BULK_FLUSH_BACKOFF_TYPE_OPTION,
+ BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+ BULK_FLUSH_BACKOFF_DELAY_OPTION,
+ CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+ CONNECTION_PATH_PREFIX,
+ FORMAT_OPTION,
+ PASSWORD_OPTION,
+ USERNAME_OPTION)
+ .collect(Collectors.toSet());
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ TableSchema tableSchema = context.getCatalogTable().getSchema();
+ ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+
+ final EncodingFormat<SerializationSchema<RowData>> format =
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
+
+ helper.validate();
+ Configuration configuration = new Configuration();
+ context.getCatalogTable().getOptions().forEach(configuration::setString);
+ Elasticsearch6Configuration config =
+ new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+ validate(config, configuration);
+
+ return new Elasticsearch6DynamicSink(
+ format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
+ }
+
+ private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
+ config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+ config.getHosts(); // validate hosts
+ validate(
+ config.getIndex().length() >= 1,
+ () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+ int maxActions = config.getBulkFlushMaxActions();
+ validate(
+ maxActions == -1 || maxActions >= 1,
+ () ->
+ String.format(
+ "'%s' must be at least 1. Got: %s",
+ BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions));
+ long maxSize = config.getBulkFlushMaxByteSize();
+ long mb1 = 1024 * 1024;
+ validate(
+ maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
+ () ->
+ String.format(
+ "'%s' must be in MB granularity. Got: %s",
+ BULK_FLASH_MAX_SIZE_OPTION.key(),
+ originalConfiguration
+ .get(BULK_FLASH_MAX_SIZE_OPTION)
+ .toHumanReadableString()));
+ validate(
+ config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+ () ->
+ String.format(
+ "'%s' must be at least 1. Got: %s",
+ BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+ config.getBulkFlushBackoffRetries().get()));
+ if (config.getUsername().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
+ validate(
+ config.getPassword().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
+ () ->
+ String.format(
+ "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
+ USERNAME_OPTION.key(),
+ PASSWORD_OPTION.key(),
+ config.getUsername().get(),
+ config.getPassword().orElse("")));
+ }
+ }
+
+ private static void validate(boolean condition, Supplier<String> message) {
+ if (!condition) {
+ throw new ValidationException(message.get());
+ }
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "elasticsearch-6-inlong";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return optionalOptions;
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/elasticsearch-6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..944dbf0cf
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.elasticsearch6.table.Elasticsearch6DynamicSinkFactory
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java b/inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java
new file mode 100644
index 000000000..970edbcc8
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch6.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.inlong.sort.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT tests for {@link Elasticsearch6DynamicSink}. */
+public class Elasticsearch6DynamicSinkITCase {
+
+ @ClassRule
+ public static ElasticsearchContainer elasticsearchContainer =
+ new ElasticsearchContainer(
+ DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch-oss")
+ .withTag("6.3.1"));
+
+ @SuppressWarnings("deprecation")
+ protected final Client getClient() {
+ TransportAddress transportAddress =
+ new TransportAddress(elasticsearchContainer.getTcpHost());
+ String expectedClusterName = "docker-cluster";
+ Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
+ return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+ }
+
+ @Test
+ public void testWritingDocuments() throws Exception {
+ ResolvedSchema schema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT().notNull()),
+ Column.physical("b", DataTypes.TIME()),
+ Column.physical("c", DataTypes.STRING().notNull()),
+ Column.physical("d", DataTypes.FLOAT()),
+ Column.physical("e", DataTypes.TINYINT().notNull()),
+ Column.physical("f", DataTypes.DATE()),
+ Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+ GenericRowData rowData =
+ GenericRowData.of(
+ 1L,
+ 12345,
+ StringData.fromString("ABCDE"),
+ 12.12f,
+ (byte) 2,
+ 12345,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2012-12-12T12:12:12")));
+
+ String index = "writing-documents";
+ String myType = "MyType";
+ Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+ SinkFunctionProvider sinkRuntimeProvider =
+ (SinkFunctionProvider)
+ sinkFactory
+ .createDynamicTableSink(
+ context()
+ .withSchema(schema)
+ .withOption(
+ ElasticsearchOptions.INDEX_OPTION.key(),
+ index)
+ .withOption(
+ ElasticsearchOptions.DOCUMENT_TYPE_OPTION
+ .key(),
+ myType)
+ .withOption(
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ .withOption(
+ ElasticsearchOptions
+ .FLUSH_ON_CHECKPOINT_OPTION
+ .key(),
+ "false")
+ .build())
+ .getSinkRuntimeProvider(new MockContext());
+
+ SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+ StreamExecutionEnvironment environment =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+ rowData.setRowKind(RowKind.UPDATE_AFTER);
+ environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+ environment.execute();
+
+ Client client = getClient();
+
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "00:00:12");
+ expectedMap.put("c", "ABCDE");
+ expectedMap.put("d", 12.12d);
+ expectedMap.put("e", 2);
+ expectedMap.put("f", "2003-10-20");
+ expectedMap.put("g", "2012-12-12 12:12:12");
+ Map<String, Object> response =
+ client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+ .actionGet()
+ .getSource();
+ assertThat(response, equalTo(expectedMap));
+ }
+
+ @Test
+ public void testWritingDocumentsFromTableApi() throws Exception {
+ TableEnvironment tableEnvironment =
+ TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "table-api";
+ String myType = "MyType";
+ tableEnvironment.executeSql(
+ "CREATE TABLE esTable ("
+ + "a BIGINT NOT NULL,\n"
+ + "b TIME,\n"
+ + "c STRING NOT NULL,\n"
+ + "d FLOAT,\n"
+ + "e TINYINT NOT NULL,\n"
+ + "f DATE,\n"
+ + "g TIMESTAMP NOT NULL,\n"
+ + "h as a + 2,\n"
+ + "PRIMARY KEY (a, g) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector", "elasticsearch-6-inlong")
+ + String.format(
+ "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s'\n",
+ ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+ + ")");
+
+ tableEnvironment
+ .fromValues(
+ row(
+ 1L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "ABCDE",
+ 12.12f,
+ (byte) 2,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2012-12-12T12:12:12")))
+ .executeInsert("esTable")
+ .await();
+
+ Client client = getClient();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "00:00:12");
+ expectedMap.put("c", "ABCDE");
+ expectedMap.put("d", 12.12d);
+ expectedMap.put("e", 2);
+ expectedMap.put("f", "2003-10-20");
+ expectedMap.put("g", "2012-12-12 12:12:12");
+ Map<String, Object> response =
+ client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+ .actionGet()
+ .getSource();
+ assertThat(response, equalTo(expectedMap));
+ }
+
+ @Test
+ public void testWritingDocumentsNoPrimaryKey() throws Exception {
+ TableEnvironment tableEnvironment =
+ TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "no-primary-key";
+ String myType = "MyType";
+ tableEnvironment.executeSql(
+ "CREATE TABLE esTable ("
+ + "a BIGINT NOT NULL,\n"
+ + "b TIME,\n"
+ + "c STRING NOT NULL,\n"
+ + "d FLOAT,\n"
+ + "e TINYINT NOT NULL,\n"
+ + "f DATE,\n"
+ + "g TIMESTAMP NOT NULL\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector", "elasticsearch-6-inlong")
+ + String.format(
+ "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s'\n",
+ ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+ + ")");
+
+ tableEnvironment
+ .fromValues(
+ row(
+ 1L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "ABCDE",
+ 12.12f,
+ (byte) 2,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2012-12-12T12:12:12")),
+ row(
+ 2L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "FGHIJK",
+ 13.13f,
+ (byte) 4,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2013-12-12T13:13:13")))
+ .executeInsert("esTable")
+ .await();
+
+ Client client = getClient();
+
+ // search API does not return documents that were not indexed, we might need to query
+ // the index a few times
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+ SearchHits hits;
+ do {
+ hits = client.prepareSearch(index).execute().actionGet().getHits();
+ if (hits.getTotalHits() < 2) {
+ Thread.sleep(200);
+ }
+ } while (hits.getTotalHits() < 2 && deadline.hasTimeLeft());
+
+ if (hits.getTotalHits() < 2) {
+ throw new AssertionError("Could not retrieve results from Elasticsearch.");
+ }
+
+ HashSet<Map<String, Object>> resultSet = new HashSet<>();
+ resultSet.add(hits.getAt(0).getSourceAsMap());
+ resultSet.add(hits.getAt(1).getSourceAsMap());
+ Map<Object, Object> expectedMap1 = new HashMap<>();
+ expectedMap1.put("a", 1);
+ expectedMap1.put("b", "00:00:12");
+ expectedMap1.put("c", "ABCDE");
+ expectedMap1.put("d", 12.12d);
+ expectedMap1.put("e", 2);
+ expectedMap1.put("f", "2003-10-20");
+ expectedMap1.put("g", "2012-12-12 12:12:12");
+ Map<Object, Object> expectedMap2 = new HashMap<>();
+ expectedMap2.put("a", 2);
+ expectedMap2.put("b", "00:00:12");
+ expectedMap2.put("c", "FGHIJK");
+ expectedMap2.put("d", 13.13d);
+ expectedMap2.put("e", 4);
+ expectedMap2.put("f", "2003-10-20");
+ expectedMap2.put("g", "2013-12-12 13:13:13");
+ HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+ expectedSet.add(expectedMap1);
+ expectedSet.add(expectedMap2);
+ assertThat(resultSet, equalTo(expectedSet));
+ }
+
+ @Test
+ public void testWritingDocumentsWithDynamicIndex() throws Exception {
+ TableEnvironment tableEnvironment =
+ TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "dynamic-index-{b|yyyy-MM-dd}";
+ String myType = "MyType";
+ tableEnvironment.executeSql(
+ "CREATE TABLE esTable ("
+ + "a BIGINT NOT NULL,\n"
+ + "b TIMESTAMP NOT NULL,\n"
+ + "PRIMARY KEY (a) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector", "elasticsearch-6-inlong")
+ + String.format(
+ "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s'\n",
+ ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+ + ")");
+
+ tableEnvironment
+ .fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+ .executeInsert("esTable")
+ .await();
+
+ Client client = getClient();
+ Map<String, Object> response =
+ client.get(new GetRequest("dynamic-index-2012-12-12", myType, "1"))
+ .actionGet()
+ .getSource();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "2012-12-12 12:12:12");
+ assertThat(response, equalTo(expectedMap));
+ }
+
+ /**
+ * testing for ES field routing
+ * according to <a href="https://github.com/apache/flink/pull/14493/files">MR</a>
+ * and <a href="https://github.com/apache/flink/tree/release-1.13.2-rc2">flink release-1.13.2-rc2</a>
+ * @throws Exception
+ */
+ @Test
+ public void testWritingDocumentsWithRouting() throws Exception {
+ ResolvedSchema schema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT().notNull()),
+ Column.physical("b", DataTypes.TIME()),
+ Column.physical("c", DataTypes.STRING().notNull()),
+ Column.physical("d", DataTypes.FLOAT()),
+ Column.physical("e", DataTypes.TINYINT().notNull()),
+ Column.physical("f", DataTypes.DATE()),
+ Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+
+ GenericRowData rowData =
+ GenericRowData.of(
+ 1L,
+ 12345,
+ StringData.fromString("ABCDE"),
+ 12.12f,
+ (byte) 2,
+ 12345,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2012-12-12T12:12:12")));
+
+ String index = "writing-documents";
+ String myType = "MyType";
+ Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+ SinkFunctionProvider sinkRuntimeProvider =
+ (SinkFunctionProvider)
+ sinkFactory
+ .createDynamicTableSink(
+ context()
+ .withSchema(schema)
+ .withOption(
+ ElasticsearchOptions.INDEX_OPTION.key(),
+ index)
+ .withOption(
+ ElasticsearchOptions.DOCUMENT_TYPE_OPTION
+ .key(),
+ myType)
+ .withOption(
+ ElasticsearchOptions.ROUTING_FIELD_NAME
+ .key(),
+ "c")
+ .withOption(
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ .withOption(
+ ElasticsearchOptions
+ .FLUSH_ON_CHECKPOINT_OPTION
+ .key(),
+ "false")
+ .build())
+ .getSinkRuntimeProvider(new MockContext());
+
+ SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+ StreamExecutionEnvironment environment =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+ rowData.setRowKind(RowKind.UPDATE_AFTER);
+ environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+ environment.execute();
+
+ Client client = getClient();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "00:00:12");
+ expectedMap.put("c", "ABCDE");
+ expectedMap.put("d", 12.12d);
+ expectedMap.put("e", 2);
+ expectedMap.put("f", "2003-10-20");
+ expectedMap.put("g", "2012-12-12 12:12:12");
+ GetResponse response =
+ client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12").routing("ABCDE"))
+ .actionGet();
+ assertThat(response.getSource(), equalTo(expectedMap));
+ }
+
+ private static class MockContext implements DynamicTableSink.Context {
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+ return null;
+ }
+
+ @Override
+ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+ DataType consumedDataType) {
+ return null;
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-6/src/test/resources/log4j2-test.properties b/inlong-sort/sort-connectors/elasticsearch-6/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-6/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/pom.xml b/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
index b8ef3cc9c..c807f1e97 100644
--- a/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
+++ b/inlong-sort/sort-connectors/elasticsearch-7/pom.xml
@@ -31,31 +31,125 @@
<name>Apache InLong - Sort-connector-elasticsearch7</name>
<packaging>jar</packaging>
- <properties>
- <elasticsearch.version>7.9.2</elasticsearch.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-elasticsearch-base</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <exclusions>
+ <!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch7.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch7.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch7.version}</version>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>1.15.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-elasticsearch-base</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ </exclusion>
+ </exclusions>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ Including elasticsearch transport dependency for tests. Netty3 is not here anymore in 7.x
+ -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${elasticsearch7.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch.plugin</groupId>
+ <artifactId>transport-netty4-client</artifactId>
+ <version>${elasticsearch7.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Elasticsearch table descriptor testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.12</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
+
+ <!-- Table API integration tests -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Elasticsearch table sink factory testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>7.5.1</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java
new file mode 100644
index 000000000..64b93a732
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchConfiguration;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/** Elasticsearch 7 specific configuration. */
+@Internal
+final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+ Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) {
+ super(config, classLoader);
+ }
+
+ public List<HttpHost> getHosts() {
+ return config.get(HOSTS_OPTION).stream()
+ .map(Elasticsearch7Configuration::validateAndParseHostsString)
+ .collect(Collectors.toList());
+ }
+
+ private static HttpHost validateAndParseHostsString(String host) {
+ try {
+ HttpHost httpHost = HttpHost.create(host);
+ if (httpHost.getPort() < 0) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+ host, HOSTS_OPTION.key()));
+ }
+
+ if (httpHost.getSchemeName() == null) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+ host, HOSTS_OPTION.key()));
+ }
+ return httpHost;
+ } catch (Exception e) {
+ throw new ValidationException(
+ String.format(
+ "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+ host, HOSTS_OPTION.key()),
+ e);
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
new file mode 100644
index 000000000..be09fa99a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
+import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
+import org.apache.inlong.sort.elasticsearch.table.RoutingExtractor;
+import org.apache.inlong.sort.elasticsearch.table.RowElasticsearchSinkFunction;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a
+ * logical description.
+ */
+@Internal
+final class Elasticsearch7DynamicSink implements DynamicTableSink {
+ @VisibleForTesting
+ static final Elasticsearch7RequestFactory REQUEST_FACTORY =
+ new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
+
+ private final EncodingFormat<SerializationSchema<RowData>> format;
+ private final TableSchema schema;
+ private final Elasticsearch7Configuration config;
+
+ public Elasticsearch7DynamicSink(
+ EncodingFormat<SerializationSchema<RowData>> format,
+ Elasticsearch7Configuration config,
+ TableSchema schema) {
+ this(format, config, schema, (ElasticsearchSink.Builder::new));
+ }
+
+ // --------------------------------------------------------------
+ // Hack to make configuration testing possible.
+ //
+ // The code in this block should never be used outside of tests.
+ // Having a way to inject a builder we can assert the builder in
+ // the test. We can not assert everything though, e.g. it is not
+ // possible to assert flushing on checkpoint, as it is configured
+ // on the sink itself.
+ // --------------------------------------------------------------
+
+ private final ElasticSearchBuilderProvider builderProvider;
+
+ @FunctionalInterface
+ interface ElasticSearchBuilderProvider {
+ ElasticsearchSink.Builder<RowData> createBuilder(
+ List<HttpHost> httpHosts, RowElasticsearchSinkFunction upsertSinkFunction);
+ }
+
+ Elasticsearch7DynamicSink(
+ EncodingFormat<SerializationSchema<RowData>> format,
+ Elasticsearch7Configuration config,
+ TableSchema schema,
+ ElasticSearchBuilderProvider builderProvider) {
+ this.format = format;
+ this.schema = schema;
+ this.config = config;
+ this.builderProvider = builderProvider;
+ }
+
+ // --------------------------------------------------------------
+ // End of hack to make configuration testing possible
+ // --------------------------------------------------------------
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (RowKind kind : requestedMode.getContainedKinds()) {
+ if (kind != RowKind.UPDATE_BEFORE) {
+ builder.addContainedKind(kind);
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+ return () -> {
+ SerializationSchema<RowData> format =
+ this.format.createRuntimeEncoder(context, schema.toRowDataType());
+
+ final RowElasticsearchSinkFunction upsertFunction =
+ new RowElasticsearchSinkFunction(
+ IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+ null, // this is deprecated in es 7+
+ format,
+ XContentType.JSON,
+ REQUEST_FACTORY,
+ KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
+ RoutingExtractor.createRoutingExtractor(
+ schema, config.getRoutingField().orElse(null)));
+
+ final ElasticsearchSink.Builder<RowData> builder =
+ builderProvider.createBuilder(config.getHosts(), upsertFunction);
+
+ builder.setFailureHandler(config.getFailureHandler());
+ builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
+ builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
+ builder.setBulkFlushInterval(config.getBulkFlushInterval());
+ builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+ config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+ config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+ config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+ // we must overwrite the default factory which is defined with a lambda because of a bug
+ // in shading lambda serialization shading see FLINK-18006
+ if (config.getUsername().isPresent()
+ && config.getPassword().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
+ && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
+ builder.setRestClientFactory(
+ new AuthRestClientFactory(
+ config.getPathPrefix().orElse(null),
+ config.getUsername().get(),
+ config.getPassword().get()));
+ } else {
+ builder.setRestClientFactory(
+ new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
+ }
+
+ final ElasticsearchSink<RowData> sink = builder.build();
+
+ if (config.isDisableFlushOnCheckpoint()) {
+ sink.disableFlushOnCheckpoint();
+ }
+
+ return sink;
+ };
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return this;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Elasticsearch7";
+ }
+
+ /** Serializable {@link RestClientFactory} used by the sink. */
+ @VisibleForTesting
+ static class DefaultRestClientFactory implements RestClientFactory {
+
+ private final String pathPrefix;
+
+ public DefaultRestClientFactory(@Nullable String pathPrefix) {
+ this.pathPrefix = pathPrefix;
+ }
+
+ @Override
+ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+ if (pathPrefix != null) {
+ restClientBuilder.setPathPrefix(pathPrefix);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+ return Objects.equals(pathPrefix, that.pathPrefix);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pathPrefix);
+ }
+ }
+
+ /** Serializable {@link RestClientFactory} used by the sink which enable authentication. */
+ @VisibleForTesting
+ static class AuthRestClientFactory implements RestClientFactory {
+
+ private final String pathPrefix;
+ private final String username;
+ private final String password;
+ private transient CredentialsProvider credentialsProvider;
+
+ public AuthRestClientFactory(
+ @Nullable String pathPrefix, String username, String password) {
+ this.pathPrefix = pathPrefix;
+ this.password = password;
+ this.username = username;
+ }
+
+ @Override
+ public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+ if (pathPrefix != null) {
+ restClientBuilder.setPathPrefix(pathPrefix);
+ }
+ if (credentialsProvider == null) {
+ credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ }
+ restClientBuilder.setHttpClientConfigCallback(
+ httpAsyncClientBuilder ->
+ httpAsyncClientBuilder.setDefaultCredentialsProvider(
+ credentialsProvider));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ AuthRestClientFactory that = (AuthRestClientFactory) o;
+ return Objects.equals(pathPrefix, that.pathPrefix)
+ && Objects.equals(username, that.username)
+ && Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(pathPrefix, password, username);
+ }
+ }
+
+ /**
+ * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the
+ * sink.
+ */
+ private static class Elasticsearch7RequestFactory implements RequestFactory {
+ @Override
+ public UpdateRequest createUpdateRequest(
+ String index,
+ String docType,
+ String key,
+ XContentType contentType,
+ byte[] document) {
+ return new UpdateRequest(index, key)
+ .doc(document, contentType)
+ .upsert(document, contentType);
+ }
+
+ @Override
+ public IndexRequest createIndexRequest(
+ String index,
+ String docType,
+ String key,
+ XContentType contentType,
+ byte[] document) {
+ return new IndexRequest(index).id(key).source(document, contentType);
+ }
+
+ @Override
+ public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+ return new DeleteRequest(index, key);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
+ return Objects.equals(format, that.format)
+ && Objects.equals(schema, that.schema)
+ && Objects.equals(config, that.config)
+ && Objects.equals(builderProvider, that.builderProvider);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(format, schema, config, builderProvider);
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
new file mode 100644
index 000000000..8bbdca6af
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.ROUTING_FIELD_NAME;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
+
+/** A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}. */
+@Internal
+public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory {
+ private static final Set<ConfigOption<?>> requiredOptions =
+ Stream.of(HOSTS_OPTION, INDEX_OPTION).collect(Collectors.toSet());
+ private static final Set<ConfigOption<?>> optionalOptions =
+ Stream.of(
+ KEY_DELIMITER_OPTION,
+ ROUTING_FIELD_NAME,
+ FAILURE_HANDLER_OPTION,
+ FLUSH_ON_CHECKPOINT_OPTION,
+ BULK_FLASH_MAX_SIZE_OPTION,
+ BULK_FLUSH_MAX_ACTIONS_OPTION,
+ BULK_FLUSH_INTERVAL_OPTION,
+ BULK_FLUSH_BACKOFF_TYPE_OPTION,
+ BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+ BULK_FLUSH_BACKOFF_DELAY_OPTION,
+ CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+ CONNECTION_PATH_PREFIX,
+ FORMAT_OPTION,
+ PASSWORD_OPTION,
+ USERNAME_OPTION)
+ .collect(Collectors.toSet());
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ TableSchema tableSchema = context.getCatalogTable().getSchema();
+ ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+
+ final EncodingFormat<SerializationSchema<RowData>> format =
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
+
+ helper.validate();
+ Configuration configuration = new Configuration();
+ context.getCatalogTable().getOptions().forEach(configuration::setString);
+ Elasticsearch7Configuration config =
+ new Elasticsearch7Configuration(configuration, context.getClassLoader());
+
+ validate(config, configuration);
+
+ return new Elasticsearch7DynamicSink(
+ format, config, TableSchemaUtils.getPhysicalSchema(tableSchema));
+ }
+
+ private void validate(Elasticsearch7Configuration config, Configuration originalConfiguration) {
+ config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+ config.getHosts(); // validate hosts
+ validate(
+ config.getIndex().length() >= 1,
+ () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+ int maxActions = config.getBulkFlushMaxActions();
+ validate(
+ maxActions == -1 || maxActions >= 1,
+ () ->
+ String.format(
+ "'%s' must be at least 1. Got: %s",
+ BULK_FLUSH_MAX_ACTIONS_OPTION.key(), maxActions));
+ long maxSize = config.getBulkFlushMaxByteSize();
+ long mb1 = 1024 * 1024;
+ validate(
+ maxSize == -1 || (maxSize >= mb1 && maxSize % mb1 == 0),
+ () ->
+ String.format(
+ "'%s' must be in MB granularity. Got: %s",
+ BULK_FLASH_MAX_SIZE_OPTION.key(),
+ originalConfiguration
+ .get(BULK_FLASH_MAX_SIZE_OPTION)
+ .toHumanReadableString()));
+ validate(
+ config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+ () ->
+ String.format(
+ "'%s' must be at least 1. Got: %s",
+ BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+ config.getBulkFlushBackoffRetries().get()));
+ if (config.getUsername().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())) {
+ validate(
+ config.getPassword().isPresent()
+ && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get()),
+ () ->
+ String.format(
+ "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
+ USERNAME_OPTION.key(),
+ PASSWORD_OPTION.key(),
+ config.getUsername().get(),
+ config.getPassword().orElse("")));
+ }
+ }
+
+ private static void validate(boolean condition, Supplier<String> message) {
+ if (!condition) {
+ throw new ValidationException(message.get());
+ }
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return "elasticsearch-7-inlong";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return optionalOptions;
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/elasticsearch-7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..54507bd7f
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.elasticsearch7.table.Elasticsearch7DynamicSinkFactory
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java b/inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java
new file mode 100644
index 000000000..9cf07a127
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch7.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.inlong.sort.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** IT tests for {@link Elasticsearch7DynamicSink}. */
+public class Elasticsearch7DynamicSinkITCase {
+
+ @ClassRule
+ public static ElasticsearchContainer elasticsearchContainer =
+ new ElasticsearchContainer(
+ DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch-oss")
+ .withTag("7.5.1"));
+
+ @SuppressWarnings("deprecation")
+ protected final Client getClient() {
+ TransportAddress transportAddress =
+ new TransportAddress(elasticsearchContainer.getTcpHost());
+ String expectedClusterName = "docker-cluster";
+ Settings settings = Settings.builder().put("cluster.name", expectedClusterName).build();
+ return new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);
+ }
+
+ @Test
+ public void testWritingDocuments() throws Exception {
+ ResolvedSchema schema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT().notNull()),
+ Column.physical("b", DataTypes.TIME()),
+ Column.physical("c", DataTypes.STRING().notNull()),
+ Column.physical("d", DataTypes.FLOAT()),
+ Column.physical("e", DataTypes.TINYINT().notNull()),
+ Column.physical("f", DataTypes.DATE()),
+ Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+
+ GenericRowData rowData =
+ GenericRowData.of(
+ 1L,
+ 12345,
+ StringData.fromString("ABCDE"),
+ 12.12f,
+ (byte) 2,
+ 12345,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2012-12-12T12:12:12")));
+
+ String index = "writing-documents";
+ Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+ SinkFunctionProvider sinkRuntimeProvider =
+ (SinkFunctionProvider)
+ sinkFactory
+ .createDynamicTableSink(
+ context()
+ .withSchema(schema)
+ .withOption(
+ ElasticsearchOptions.INDEX_OPTION.key(),
+ index)
+ .withOption(
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ .withOption(
+ ElasticsearchOptions
+ .FLUSH_ON_CHECKPOINT_OPTION
+ .key(),
+ "false")
+ .build())
+ .getSinkRuntimeProvider(new MockContext());
+
+ SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+ StreamExecutionEnvironment environment =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+ rowData.setRowKind(RowKind.UPDATE_AFTER);
+ environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+ environment.execute();
+
+ Client client = getClient();
+
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "00:00:12");
+ expectedMap.put("c", "ABCDE");
+ expectedMap.put("d", 12.12d);
+ expectedMap.put("e", 2);
+ expectedMap.put("f", "2003-10-20");
+ expectedMap.put("g", "2012-12-12 12:12:12");
+ Map<String, Object> response =
+ client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+ assertThat(response, equalTo(expectedMap));
+ }
+
+ @Test
+ public void testWritingDocumentsFromTableApi() throws Exception {
+ TableEnvironment tableEnvironment =
+ TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "table-api";
+ tableEnvironment.executeSql(
+ "CREATE TABLE esTable ("
+ + "a BIGINT NOT NULL,\n"
+ + "b TIME,\n"
+ + "c STRING NOT NULL,\n"
+ + "d FLOAT,\n"
+ + "e TINYINT NOT NULL,\n"
+ + "f DATE,\n"
+ + "g TIMESTAMP NOT NULL,"
+ + "h as a + 2,\n"
+ + "PRIMARY KEY (a, g) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector", "elasticsearch-7-inlong")
+ + String.format(
+ "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s'\n",
+ ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+ + ")");
+
+ tableEnvironment
+ .fromValues(
+ row(
+ 1L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "ABCDE",
+ 12.12f,
+ (byte) 2,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2012-12-12T12:12:12")))
+ .executeInsert("esTable")
+ .await();
+
+ Client client = getClient();
+
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "00:00:12");
+ expectedMap.put("c", "ABCDE");
+ expectedMap.put("d", 12.12d);
+ expectedMap.put("e", 2);
+ expectedMap.put("f", "2003-10-20");
+ expectedMap.put("g", "2012-12-12 12:12:12");
+ Map<String, Object> response =
+ client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+ assertThat(response, equalTo(expectedMap));
+ }
+
+ @Test
+ public void testWritingDocumentsNoPrimaryKey() throws Exception {
+ TableEnvironment tableEnvironment =
+ TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "no-primary-key";
+ tableEnvironment.executeSql(
+ "CREATE TABLE esTable ("
+ + "a BIGINT NOT NULL,\n"
+ + "b TIME,\n"
+ + "c STRING NOT NULL,\n"
+ + "d FLOAT,\n"
+ + "e TINYINT NOT NULL,\n"
+ + "f DATE,\n"
+ + "g TIMESTAMP NOT NULL\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector", "elasticsearch-7-inlong")
+ + String.format(
+ "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s'\n",
+ ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+ + ")");
+
+ tableEnvironment
+ .fromValues(
+ row(
+ 1L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "ABCDE",
+ 12.12f,
+ (byte) 2,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2012-12-12T12:12:12")),
+ row(
+ 2L,
+ LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+ "FGHIJK",
+ 13.13f,
+ (byte) 4,
+ LocalDate.ofEpochDay(12345),
+ LocalDateTime.parse("2013-12-12T13:13:13")))
+ .executeInsert("esTable")
+ .await();
+
+ Client client = getClient();
+
+ // search API does not return documents that were not indexed, we might need to query
+ // the index a few times
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+ SearchHits hits;
+ do {
+ hits = client.prepareSearch(index).execute().actionGet().getHits();
+ if (hits.getTotalHits().value < 2) {
+ Thread.sleep(200);
+ }
+ } while (hits.getTotalHits().value < 2 && deadline.hasTimeLeft());
+
+ if (hits.getTotalHits().value < 2) {
+ throw new AssertionError("Could not retrieve results from Elasticsearch.");
+ }
+
+ HashSet<Map<String, Object>> resultSet = new HashSet<>();
+ resultSet.add(hits.getAt(0).getSourceAsMap());
+ resultSet.add(hits.getAt(1).getSourceAsMap());
+ Map<Object, Object> expectedMap1 = new HashMap<>();
+ expectedMap1.put("a", 1);
+ expectedMap1.put("b", "00:00:12");
+ expectedMap1.put("c", "ABCDE");
+ expectedMap1.put("d", 12.12d);
+ expectedMap1.put("e", 2);
+ expectedMap1.put("f", "2003-10-20");
+ expectedMap1.put("g", "2012-12-12 12:12:12");
+ Map<Object, Object> expectedMap2 = new HashMap<>();
+ expectedMap2.put("a", 2);
+ expectedMap2.put("b", "00:00:12");
+ expectedMap2.put("c", "FGHIJK");
+ expectedMap2.put("d", 13.13d);
+ expectedMap2.put("e", 4);
+ expectedMap2.put("f", "2003-10-20");
+ expectedMap2.put("g", "2013-12-12 13:13:13");
+ HashSet<Map<Object, Object>> expectedSet = new HashSet<>();
+ expectedSet.add(expectedMap1);
+ expectedSet.add(expectedMap2);
+ assertThat(resultSet, equalTo(expectedSet));
+ }
+
+ @Test
+ public void testWritingDocumentsWithDynamicIndex() throws Exception {
+ TableEnvironment tableEnvironment =
+ TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build());
+
+ String index = "dynamic-index-{b|yyyy-MM-dd}";
+ tableEnvironment.executeSql(
+ "CREATE TABLE esTable ("
+ + "a BIGINT NOT NULL,\n"
+ + "b TIMESTAMP NOT NULL,\n"
+ + "PRIMARY KEY (a) NOT ENFORCED\n"
+ + ")\n"
+ + "WITH (\n"
+ + String.format("'%s'='%s',\n", "connector", "elasticsearch-7-inlong")
+ + String.format(
+ "'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index)
+ + String.format(
+ "'%s'='%s',\n",
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ + String.format(
+ "'%s'='%s'\n",
+ ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+ + ")");
+
+ tableEnvironment
+ .fromValues(row(1L, LocalDateTime.parse("2012-12-12T12:12:12")))
+ .executeInsert("esTable")
+ .await();
+
+ Client client = getClient();
+ Map<String, Object> response =
+ client.get(new GetRequest("dynamic-index-2012-12-12", "1")).actionGet().getSource();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "2012-12-12 12:12:12");
+ assertThat(response, equalTo(expectedMap));
+ }
+
+ @Test
+ public void testWritingDocumentsWithRouting() throws Exception {
+ ResolvedSchema schema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT().notNull()),
+ Column.physical("b", DataTypes.TIME()),
+ Column.physical("c", DataTypes.STRING().notNull()),
+ Column.physical("d", DataTypes.FLOAT()),
+ Column.physical("e", DataTypes.TINYINT().notNull()),
+ Column.physical("f", DataTypes.DATE()),
+ Column.physical("g", DataTypes.TIMESTAMP().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("name", Arrays.asList("a", "g")));
+
+ GenericRowData rowData =
+ GenericRowData.of(
+ 1L,
+ 12345,
+ StringData.fromString("ABCDE"),
+ 12.12f,
+ (byte) 2,
+ 12345,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2012-12-12T12:12:12")));
+
+ String index = "writing-documents";
+ Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+ SinkFunctionProvider sinkRuntimeProvider =
+ (SinkFunctionProvider)
+ sinkFactory
+ .createDynamicTableSink(
+ context()
+ .withSchema(schema)
+ .withOption(
+ ElasticsearchOptions.INDEX_OPTION.key(),
+ index)
+ .withOption(
+ ElasticsearchOptions.ROUTING_FIELD_NAME
+ .key(),
+ "c")
+ .withOption(
+ ElasticsearchOptions.HOSTS_OPTION.key(),
+ elasticsearchContainer.getHttpHostAddress())
+ .withOption(
+ ElasticsearchOptions
+ .FLUSH_ON_CHECKPOINT_OPTION
+ .key(),
+ "false")
+ .build())
+ .getSinkRuntimeProvider(new MockContext());
+
+ SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+ StreamExecutionEnvironment environment =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+ rowData.setRowKind(RowKind.UPDATE_AFTER);
+ environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+ environment.execute();
+
+ Client client = getClient();
+ Map<Object, Object> expectedMap = new HashMap<>();
+ expectedMap.put("a", 1);
+ expectedMap.put("b", "00:00:12");
+ expectedMap.put("c", "ABCDE");
+ expectedMap.put("d", 12.12d);
+ expectedMap.put("e", 2);
+ expectedMap.put("f", "2003-10-20");
+ expectedMap.put("g", "2012-12-12 12:12:12");
+ GetResponse response =
+ client.get(new GetRequest(index, "1_2012-12-12T12:12:12").routing("ABCDE"))
+ .actionGet();
+ assertThat(response.getSource(), equalTo(expectedMap));
+ }
+
+ private static class MockContext implements DynamicTableSink.Context {
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+ return null;
+ }
+
+ @Override
+ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+ DataType consumedDataType) {
+ return null;
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-7/src/test/resources/log4j2-test.properties b/inlong-sort/sort-connectors/elasticsearch-7/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-7/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/pom.xml b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
new file mode 100644
index 000000000..36b6cfeb4
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>sort-connectors</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>sort-connector-elasticsearch-base</artifactId>
+ <name>Apache InLong - Sort-connector-elasticsearch-base</name>
+ <packaging>jar</packaging>
+
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-elasticsearch-base_2.12</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch6.version}</version>
+ <!--
+ FLINK-7133: Excluding all org.ow2.asm from elasticsearch dependencies because
+ 1. from the POV of client they are optional,
+ 2. the version configured by default at the time of writing this comment (1.7.1) depends on asm 4.1
+ and when it is shaded into elasticsearch-base artifact it conflicts with newer shaded versions of asm
+ resulting in errors at the runtime when application is executed locally, e.g. from IDE.
+ -->
+ <exclusions>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Table ecosystem -->
+ <!-- Projects depending on this project won't depend on flink-table-*. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+ <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_2.12</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <!-- Elasticsearch table descriptor testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.12</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Elasticsearch table descriptor testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Elasticsearch table sink factory testing -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!--
+ Including Log4j2 dependencies for tests is required for the
+ embedded Elasticsearch nodes used in tests to run correctly.
+ -->
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
new file mode 100644
index 000000000..2f7eebd88
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import java.time.format.DateTimeFormatter;
+
+/** Abstract class for time related {@link IndexGenerator}. */
+public abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {
+
+ private final String dateTimeFormat;
+ protected transient DateTimeFormatter dateTimeFormatter;
+
+ public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
+ super(index);
+ this.dateTimeFormat = dateTimeFormat;
+ }
+
+ @Override
+ public void open() {
+ this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
new file mode 100644
index 000000000..86ed88875
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.PASSWORD_OPTION;
+import static org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.USERNAME_OPTION;
+
+/** Accessor methods to elasticsearch options. */
+public class ElasticsearchConfiguration {
+ protected final ReadableConfig config;
+ private final ClassLoader classLoader;
+
+ public ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) {
+ this.config = config;
+ this.classLoader = classLoader;
+ }
+
+ public ActionRequestFailureHandler getFailureHandler() {
+ final ActionRequestFailureHandler failureHandler;
+ String value = config.get(FAILURE_HANDLER_OPTION);
+ switch (value.toUpperCase()) {
+ case "FAIL":
+ failureHandler = new NoOpFailureHandler();
+ break;
+ case "IGNORE":
+ failureHandler = new IgnoringFailureHandler();
+ break;
+ case "RETRY-REJECTED":
+ failureHandler = new RetryRejectedExecutionFailureHandler();
+ break;
+ default:
+ try {
+ Class<?> failureHandlerClass = Class.forName(value, false, classLoader);
+ failureHandler =
+ (ActionRequestFailureHandler)
+ InstantiationUtil.instantiate(failureHandlerClass);
+ } catch (ClassNotFoundException e) {
+ throw new ValidationException(
+ "Could not instantiate the failure handler class: " + value, e);
+ }
+ break;
+ }
+ return failureHandler;
+ }
+
+ public String getDocumentType() {
+ return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
+ }
+
+ public int getBulkFlushMaxActions() {
+ int maxActions = config.get(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+ // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+ return maxActions == 0 ? -1 : maxActions;
+ }
+
+ public long getBulkFlushMaxByteSize() {
+ long maxSize = config.get(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION).getBytes();
+ // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+ return maxSize == 0 ? -1 : maxSize;
+ }
+
+ public long getBulkFlushInterval() {
+ long interval = config.get(BULK_FLUSH_INTERVAL_OPTION).toMillis();
+ // convert 0 to -1, because Elasticsearch client use -1 to disable this configuration.
+ return interval == 0 ? -1 : interval;
+ }
+
+ public Optional<String> getUsername() {
+ return config.getOptional(USERNAME_OPTION);
+ }
+
+ public Optional<String> getPassword() {
+ return config.getOptional(PASSWORD_OPTION);
+ }
+
+ public boolean isBulkFlushBackoffEnabled() {
+ return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)
+ != ElasticsearchOptions.BackOffType.DISABLED;
+ }
+
+ public Optional<ElasticsearchSinkBase.FlushBackoffType> getBulkFlushBackoffType() {
+ switch (config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)) {
+ case CONSTANT:
+ return Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
+ case EXPONENTIAL:
+ return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+ default:
+ return Optional.empty();
+ }
+ }
+
+ public Optional<Integer> getBulkFlushBackoffRetries() {
+ return config.getOptional(BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+ }
+
+ public Optional<Long> getBulkFlushBackoffDelay() {
+ return config.getOptional(BULK_FLUSH_BACKOFF_DELAY_OPTION).map(Duration::toMillis);
+ }
+
+ public boolean isDisableFlushOnCheckpoint() {
+ return !config.get(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION);
+ }
+
+ public String getIndex() {
+ return config.get(ElasticsearchOptions.INDEX_OPTION);
+ }
+
+ public String getKeyDelimiter() {
+ return config.get(ElasticsearchOptions.KEY_DELIMITER_OPTION);
+ }
+
+ public Optional<String> getRoutingField() {
+ return config.getOptional(ElasticsearchOptions.ROUTING_FIELD_NAME);
+ }
+
+ public Optional<String> getPathPrefix() {
+ return config.getOptional(ElasticsearchOptions.CONNECTION_PATH_PREFIX);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ElasticsearchConfiguration that = (ElasticsearchConfiguration) o;
+ return Objects.equals(config, that.config) && Objects.equals(classLoader, that.classLoader);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(config, classLoader);
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
new file mode 100644
index 000000000..0d74c2c3b
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+ /**
+ * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with {@code
+ * DISABLED} option.
+ */
+ public enum BackOffType {
+ DISABLED,
+ CONSTANT,
+ EXPONENTIAL
+ }
+
+ public static final ConfigOption<List<String>> HOSTS_OPTION =
+ ConfigOptions.key("hosts")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription("Elasticsearch hosts to connect to.");
+ public static final ConfigOption<String> INDEX_OPTION =
+ ConfigOptions.key("index")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch index for every record.");
+ public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+ ConfigOptions.key("document-type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch document type.");
+ public static final ConfigOption<String> PASSWORD_OPTION =
+ ConfigOptions.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password used to connect to Elasticsearch instance.");
+ public static final ConfigOption<String> USERNAME_OPTION =
+ ConfigOptions.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Username used to connect to Elasticsearch instance.");
+ public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+ ConfigOptions.key("document-id.key-delimiter")
+ .stringType()
+ .defaultValue("_")
+ .withDescription(
+ "Delimiter for composite keys e.g., \"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+
+ public static final ConfigOption<String> ROUTING_FIELD_NAME =
+ ConfigOptions.key("routing.field-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Elasticsearch routing filed.");
+
+ public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+ ConfigOptions.key("failure-handler")
+ .stringType()
+ .defaultValue("fail")
+ .withDescription(
+ Description.builder()
+ .text(
+ "Failure handling strategy in case a request to Elasticsearch fails")
+ .list(
+ text(
+ "\"fail\" (throws an exception if a request fails "
+ + "and thus causes a job failure)"),
+ text(
+ "\"ignore\" (ignores failures and drops the request)"),
+ text(
+ "\"retry-rejected\" (re-adds requests that have failed "
+ + "due to queue capacity saturation)"),
+ text(
+ "\"class name\" for failure handling with "
+ + "a ActionRequestFailureHandler subclass"))
+ .build());
+ public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+ ConfigOptions.key("sink.flush-on-checkpoint")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Disables flushing on checkpoint");
+ public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-actions")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Maximum number of actions to buffer for each bulk request.");
+ public static final ConfigOption<MemorySize> BULK_FLASH_MAX_SIZE_OPTION =
+ ConfigOptions.key("sink.bulk-flush.max-size")
+ .memoryType()
+ .defaultValue(MemorySize.parse("2mb"))
+ .withDescription("Maximum size of buffered actions per bulk request");
+ public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+ ConfigOptions.key("sink.bulk-flush.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("Bulk flush interval");
+ public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+ ConfigOptions.key("sink.bulk-flush.backoff.strategy")
+ .enumType(BackOffType.class)
+ .defaultValue(BackOffType.DISABLED)
+ .withDescription("Backoff strategy");
+ public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+ ConfigOptions.key("sink.bulk-flush.backoff.max-retries")
+ .intType()
+ .noDefaultValue()
+ .withDescription("Maximum number of retries.");
+ public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+ ConfigOptions.key("sink.bulk-flush.backoff.delay")
+ .durationType()
+ .noDefaultValue()
+ .withDescription("Delay between each backoff attempt.");
+ public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+ ConfigOptions.key("connection.max-retry-timeout")
+ .durationType()
+ .noDefaultValue()
+ .withDescription("Maximum timeout between retries.");
+ public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+ ConfigOptions.key("connection.path-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Prefix string to be added to every REST communication.");
+ public static final ConfigOption<String> FORMAT_OPTION =
+ ConfigOptions.key("format")
+ .stringType()
+ .defaultValue("json")
+ .withDescription(
+ "The format must produce a valid JSON document. "
+ + "Please refer to the documentation on formats for more details.");
+
+ private ElasticsearchOptions() {
+
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
new file mode 100644
index 000000000..d4564e33a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
@@ -0,0 +1,99 @@
+/*
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/** Utility methods for validating Elasticsearch properties. */
+public class ElasticsearchValidationUtils {
+
+ private static final Set<LogicalTypeRoot> ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+ static {
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY);
+ ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY);
+ }
+
+ /**
+ * Checks that the table does not have primary key defined on illegal types. In Elasticsearch
+ * the primary key is used to calculate the Elasticsearch document id, which is a string of up
+ * to 512 bytes. It cannot have whitespaces. As of now it is calculated by concatenating the
+ * fields. Certain types do not have a good string representation to be used in this scenario.
+ * The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and {@link
+ * LogicalTypeRoot#RAW} type.
+ */
+ public static void validatePrimaryKey(TableSchema schema) {
+ schema.getPrimaryKey()
+ .ifPresent(
+ key -> {
+ List<LogicalTypeRoot> illegalTypes =
+ key.getColumns().stream()
+ .map(
+ fieldName -> {
+ LogicalType logicalType =
+ schema.getFieldDataType(fieldName)
+ .get()
+ .getLogicalType();
+ if (hasRoot(
+ logicalType,
+ LogicalTypeRoot.DISTINCT_TYPE)) {
+ return ((DistinctType) logicalType)
+ .getSourceType()
+ .getTypeRoot();
+ } else {
+ return logicalType.getTypeRoot();
+ }
+ })
+ .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
+ .collect(Collectors.toList());
+
+ if (!illegalTypes.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "The table has a primary key on columns of illegal types: %s.\n"
+ + " Elasticsearch sink does not support "
+ + "primary keys on columns of types: %s.",
+ illegalTypes, ILLEGAL_PRIMARY_KEY_TYPES));
+ }
+ });
+ }
+
+ private ElasticsearchValidationUtils() {
+
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
new file mode 100644
index 000000000..8e175fe51
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+
+/** This interface is responsible to generate index name from given {@link Row} record. */
+public interface IndexGenerator extends Serializable {
+
+ /**
+ * Initialize the index generator, this will be called only once before {@link
+ * #generate(RowData)} is called.
+ */
+ default void open() {
+
+ }
+
+ /** Generate index name according the the given row. */
+ String generate(RowData row);
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 000000000..39fbe2d65
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import java.util.Objects;
+
+/** Base class for {@link IndexGenerator}. */
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+ private static final long serialVersionUID = 1L;
+ protected final String index;
+
+ public IndexGeneratorBase(String index) {
+ this.index = index;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof IndexGeneratorBase)) {
+ return false;
+ }
+ IndexGeneratorBase that = (IndexGeneratorBase) o;
+ return index.equals(that.index);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(index);
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 000000000..968fe2706
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain string, e.g.
+ * 'myusers', all the records will be consistently written into "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in
+ * the record to dynamically generate a target index. You can also use
+ * '{field_name|date_format_string}' to convert a field value of TIMESTAMP/DATE/TIME type into the
+ * format specified by date_format_string. The date_format_string is compatible with {@link
+ * java.text.SimpleDateFormat}. For example, if the option value is 'myusers_{log_ts|yyyy-MM-dd}',
+ * then a record with log_ts field value 2020-03-27 12:25:55 will be written into
+ * "myusers_2020-03-27" index.
+ */
+public final class IndexGeneratorFactory {
+
+ private IndexGeneratorFactory() {
+
+ }
+
+ public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {
+ final IndexHelper indexHelper = new IndexHelper();
+ if (indexHelper.checkIsDynamicIndex(index)) {
+ return createRuntimeIndexGenerator(
+ index, schema.getFieldNames(), schema.getFieldDataTypes(), indexHelper);
+ } else {
+ return new StaticIndexGenerator(index);
+ }
+ }
+
+ interface DynamicFormatter extends Serializable {
+ String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+ }
+
+ private static IndexGenerator createRuntimeIndexGenerator(
+ String index, String[] fieldNames, DataType[] fieldTypes, IndexHelper indexHelper) {
+ final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+ final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+ final String indexSuffix =
+ index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+ final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+ final int indexFieldPos =
+ indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+ final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+ final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+ // validate index field type
+ indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+ // time extract dynamic index pattern
+ final RowData.FieldGetter fieldGetter =
+ RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+ if (isDynamicIndexWithFormat) {
+ final String dateTimeFormat =
+ indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+ DynamicFormatter formatFunction =
+ createFormatFunction(indexFieldType, indexFieldLogicalTypeRoot);
+
+ return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+ @Override
+ public String generate(RowData row) {
+ Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+ final String formattedField;
+ // TODO we can possibly optimize it to use the nullability of the field
+ if (fieldOrNull != null) {
+ formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+ } else {
+ formattedField = "null";
+ }
+ return indexPrefix.concat(formattedField).concat(indexSuffix);
+ }
+ };
+ }
+ // general dynamic index pattern
+ return new IndexGeneratorBase(index) {
+ @Override
+ public String generate(RowData row) {
+ Object indexField = fieldGetter.getFieldOrNull(row);
+ return indexPrefix
+ .concat(indexField == null ? "null" : indexField.toString())
+ .concat(indexSuffix);
+ }
+ };
+ }
+
+ private static DynamicFormatter createFormatFunction(
+ LogicalType indexFieldType, LogicalTypeRoot indexFieldLogicalTypeRoot) {
+ switch (indexFieldLogicalTypeRoot) {
+ case DATE:
+ return (value, dateTimeFormatter) -> {
+ Integer indexField = (Integer) value;
+ return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+ };
+ case TIME_WITHOUT_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ Integer indexField = (Integer) value;
+ return LocalTime.ofNanoOfDay(indexField * 1_000_000L).format(dateTimeFormatter);
+ };
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ TimestampData indexField = (TimestampData) value;
+ return indexField.toLocalDateTime().format(dateTimeFormatter);
+ };
+ case TIMESTAMP_WITH_TIME_ZONE:
+ throw new UnsupportedOperationException(
+ "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return (value, dateTimeFormatter) -> {
+ TimestampData indexField = (TimestampData) value;
+ return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+ };
+ default:
+ throw new TableException(
+ String.format(
+ "Unsupported type '%s' found in Elasticsearch dynamic index field, "
+ + "time-related pattern only support types are: DATE,TIME,TIMESTAMP.",
+ indexFieldType));
+ }
+ }
+
+ /**
+ * Helper class for {@link IndexGeneratorFactory}, this helper can use to validate index field
+ * type ans parse index format from pattern.
+ */
+ private static class IndexHelper {
+ private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+ private static final Pattern dynamicIndexTimeExtractPattern =
+ Pattern.compile(".*\\{.+\\|.*\\}.*");
+ private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+ private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+ static {
+ // time related types
+ supportedTypes.add(LogicalTypeRoot.DATE);
+ supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+ supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ // general types
+ supportedTypes.add(LogicalTypeRoot.VARCHAR);
+ supportedTypes.add(LogicalTypeRoot.CHAR);
+ supportedTypes.add(LogicalTypeRoot.TINYINT);
+ supportedTypes.add(LogicalTypeRoot.INTEGER);
+ supportedTypes.add(LogicalTypeRoot.BIGINT);
+ }
+
+ static {
+ defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+ defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+ defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+ defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+ defaultFormats.put(
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+ }
+
+ /** Validate the index field Type. */
+ void validateIndexFieldType(LogicalTypeRoot logicalType) {
+ if (!supportedTypes.contains(logicalType)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported type %s of index field, " + "Supported types are: %s",
+ logicalType, supportedTypes));
+ }
+ }
+
+ /** Get the default date format. */
+ String getDefaultFormat(LogicalTypeRoot logicalType) {
+ return defaultFormats.get(logicalType);
+ }
+
+ /** Check general dynamic index is enabled or not by index pattern. */
+ boolean checkIsDynamicIndex(String index) {
+ final Matcher matcher = dynamicIndexPattern.matcher(index);
+ int count = 0;
+ while (matcher.find()) {
+ count++;
+ }
+ if (count > 1) {
+ throw new TableException(
+ String.format(
+ "Chaining dynamic index pattern %s is not supported,"
+ + " only support single dynamic index pattern.",
+ index));
+ }
+ return count == 1;
+ }
+
+ /** Check time extract dynamic index is enabled or not by index pattern. */
+ boolean checkIsDynamicIndexWithFormat(String index) {
+ return dynamicIndexTimeExtractPattern.matcher(index).matches();
+ }
+
+ /** Extract dynamic index pattern string from index pattern string. */
+ String extractDynamicIndexPatternStr(String index) {
+ int start = index.indexOf("{");
+ int end = index.lastIndexOf("}");
+ return index.substring(start, end + 1);
+ }
+
+ /** Extract index field position in a fieldNames, return the field position. */
+ int extractIndexFieldPos(
+ String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+ List<String> fieldList = Arrays.asList(fieldNames);
+ String indexFieldName;
+ if (isDynamicIndexWithFormat) {
+ indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("|"));
+ } else {
+ indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("}"));
+ }
+ if (!fieldList.contains(indexFieldName)) {
+ throw new TableException(
+ String.format(
+ "Unknown field '%s' in index pattern '%s', please check the field name.",
+ indexFieldName, index));
+ }
+ return fieldList.indexOf(indexFieldName);
+ }
+
+ /** Extract dateTime format by the date format that extracted from index pattern string. */
+ private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+ String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+ if ("".equals(format)) {
+ format = getDefaultFormat(logicalType);
+ }
+ return format;
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 000000000..a1b74eb1a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** An extractor for a Elasticsearch key from a {@link RowData}. */
+public class KeyExtractor implements Function<RowData, String>, Serializable {
+ private final FieldFormatter[] fieldFormatters;
+ private final String keyDelimiter;
+
+ private interface FieldFormatter extends Serializable {
+ String format(RowData rowData);
+ }
+
+ private KeyExtractor(FieldFormatter[] fieldFormatters, String keyDelimiter) {
+ this.fieldFormatters = fieldFormatters;
+ this.keyDelimiter = keyDelimiter;
+ }
+
+ @Override
+ public String apply(RowData rowData) {
+ final StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < fieldFormatters.length; i++) {
+ if (i > 0) {
+ builder.append(keyDelimiter);
+ }
+ final String value = fieldFormatters[i].format(rowData);
+ builder.append(value);
+ }
+ return builder.toString();
+ }
+
+ private static class ColumnWithIndex {
+ public TableColumn column;
+ public int index;
+
+ public ColumnWithIndex(TableColumn column, int index) {
+ this.column = column;
+ this.index = index;
+ }
+
+ public LogicalType getType() {
+ return column.getType().getLogicalType();
+ }
+
+ public int getIndex() {
+ return index;
+ }
+ }
+
+ public static Function<RowData, String> createKeyExtractor(
+ TableSchema schema, String keyDelimiter) {
+ return schema.getPrimaryKey()
+ .map(
+ key -> {
+ Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
+ List<TableColumn> tableColumns = schema.getTableColumns();
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ TableColumn column = tableColumns.get(i);
+ namesToColumns.put(
+ column.getName(), new ColumnWithIndex(column, i));
+ }
+
+ FieldFormatter[] fieldFormatters =
+ key.getColumns().stream()
+ .map(namesToColumns::get)
+ .map(
+ column ->
+ toFormatter(
+ column.index, column.getType()))
+ .toArray(FieldFormatter[]::new);
+
+ return (Function<RowData, String>)
+ new KeyExtractor(fieldFormatters, keyDelimiter);
+ })
+ .orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
+ }
+
+ private static FieldFormatter toFormatter(int index, LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case DATE:
+ return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+ case TIME_WITHOUT_TIME_ZONE:
+ return (row) ->
+ LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+ case INTERVAL_YEAR_MONTH:
+ return (row) -> Period.ofDays(row.getInt(index)).toString();
+ case INTERVAL_DAY_TIME:
+ return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+ case DISTINCT_TYPE:
+ return toFormatter(index, ((DistinctType) type).getSourceType());
+ default:
+ RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
+ return (row) -> fieldGetter.getFieldOrNull(row).toString();
+ }
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
new file mode 100644
index 000000000..70df59792
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+
+/** For version-agnostic creating of {@link ActionRequest}s. */
+public interface RequestFactory extends Serializable {
+ /**
+ * Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
+ * been deprecated since Elasticsearch 7.x and it would not take any effort.
+ */
+ UpdateRequest createUpdateRequest(
+ String index, String docType, String key, XContentType contentType, byte[] document);
+
+ /**
+ * Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
+ * been deprecated since Elasticsearch 7.x and it would not take any effort.
+ */
+ IndexRequest createIndexRequest(
+ String index, String docType, String key, XContentType contentType, byte[] document);
+
+ /**
+ * Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
+ * been deprecated since Elasticsearch 7.x and it would not take any effort.
+ */
+ DeleteRequest createDeleteRequest(String index, String docType, String key);
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RoutingExtractor.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RoutingExtractor.java
new file mode 100644
index 000000000..4ddcd544a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RoutingExtractor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * An extractor for a Elasticsearch routing from a {@link RowData}.
+ */
+@Internal
+public class RoutingExtractor {
+ private RoutingExtractor() {
+ }
+
+ public static Function<RowData, String> createRoutingExtractor(
+ TableSchema schema,
+ @Nullable String filedName) {
+ if (filedName == null) {
+ return null;
+ }
+ List<TableColumn> tableColumns = schema.getTableColumns();
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ TableColumn column = tableColumns.get(i);
+ if (column.getName().equals(filedName)) {
+ RowData.FieldGetter fieldGetter = RowData.createFieldGetter(
+ column.getType().getLogicalType(),
+ i);
+ return (Function<RowData, String> & Serializable) (row) -> {
+ Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+ if (fieldOrNull != null) {
+ return fieldOrNull.toString();
+ } else {
+ return null;
+ }
+ };
+ }
+ }
+ throw new IllegalArgumentException("Filed " + filedName + " not exist in table schema.");
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
new file mode 100644
index 000000000..2e227ad55
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/** Sink function for converting upserts into Elasticsearch {@link ActionRequest}s. */
+public class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IndexGenerator indexGenerator;
+ private final String docType;
+ private final SerializationSchema<RowData> serializationSchema;
+ private final XContentType contentType;
+ private final RequestFactory requestFactory;
+ private final Function<RowData, String> createKey;
+
+ private final Function<RowData, String> createRouting;
+
+ public RowElasticsearchSinkFunction(
+ IndexGenerator indexGenerator,
+ @Nullable String docType, // this is deprecated in es 7+
+ SerializationSchema<RowData> serializationSchema,
+ XContentType contentType,
+ RequestFactory requestFactory,
+ Function<RowData, String> createKey,
+ @Nullable Function<RowData, String> createRouting) {
+ this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+ this.docType = docType;
+ this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+ this.contentType = Preconditions.checkNotNull(contentType);
+ this.requestFactory = Preconditions.checkNotNull(requestFactory);
+ this.createKey = Preconditions.checkNotNull(createKey);
+ this.createRouting = createRouting;
+ }
+
+ @Override
+ public void open() {
+ indexGenerator.open();
+ }
+
+ @Override
+ public void process(RowData element, RuntimeContext ctx, RequestIndexer indexer) {
+ switch (element.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ processUpsert(element, indexer);
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ processDelete(element, indexer);
+ break;
+ default:
+ throw new TableException("Unsupported message kind: " + element.getRowKind());
+ }
+ }
+
+ private void processUpsert(RowData row, RequestIndexer indexer) {
+ final byte[] document = serializationSchema.serialize(row);
+ final String key = createKey.apply(row);
+ if (key != null) {
+ final UpdateRequest updateRequest =
+ requestFactory.createUpdateRequest(
+ indexGenerator.generate(row), docType, key, contentType, document);
+ addRouting(updateRequest, row);
+ indexer.add(updateRequest);
+ } else {
+ final IndexRequest indexRequest =
+ requestFactory.createIndexRequest(
+ indexGenerator.generate(row), docType, key, contentType, document);
+ addRouting(indexRequest, row);
+ indexer.add(indexRequest);
+ }
+ }
+
+ private void processDelete(RowData row, RequestIndexer indexer) {
+ final String key = createKey.apply(row);
+ final DeleteRequest deleteRequest =
+ requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
+ addRouting(deleteRequest, row);
+ indexer.add(deleteRequest);
+ }
+
+ private void addRouting(DocWriteRequest<?> request, RowData row) {
+ if (null != createRouting) {
+ String routing = createRouting.apply(row);
+ request.routing(routing);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowElasticsearchSinkFunction that = (RowElasticsearchSinkFunction) o;
+ return Objects.equals(indexGenerator, that.indexGenerator)
+ && Objects.equals(docType, that.docType)
+ && Objects.equals(serializationSchema, that.serializationSchema)
+ && contentType == that.contentType
+ && Objects.equals(requestFactory, that.requestFactory)
+ && Objects.equals(createKey, that.createKey);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ indexGenerator,
+ docType,
+ serializationSchema,
+ contentType,
+ requestFactory,
+ createKey);
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 000000000..fdacd5626
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.table.data.RowData;
+
+/** A static {@link IndexGenerator} which generate fixed index name. */
+
+public final class StaticIndexGenerator extends IndexGeneratorBase {
+
+ public StaticIndexGenerator(String index) {
+ super(index);
+ }
+
+ public String generate(RowData row) {
+ return index;
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java b/inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
new file mode 100644
index 000000000..e10b9b0c3
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.elasticsearch.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A utility class for mocking {@link DynamicTableFactory.Context}. */
+public class TestContext {
+
+ private ResolvedSchema schema = ResolvedSchema.of(Column.physical("a", DataTypes.TIME()));
+
+ private final Map<String, String> options = new HashMap<>();
+
+ public static TestContext context() {
+ return new TestContext();
+ }
+
+ public TestContext withSchema(ResolvedSchema schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public DynamicTableFactory.Context build() {
+ return new FactoryUtil.DefaultDynamicTableContext(
+ ObjectIdentifier.of("default", "default", "t1"),
+ new ResolvedCatalogTable(
+ CatalogTable.of(
+ Schema.newBuilder().fromResolvedSchema(schema).build(),
+ "mock context",
+ Collections.emptyList(),
+ options),
+ schema),
+ new Configuration(),
+ TestContext.class.getClassLoader(),
+ false);
+ }
+
+ public TestContext withOption(String key, String value) {
+ options.put(key, value);
+ return this;
+ }
+}
diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/test/resources/log4j2-test.properties b/inlong-sort/sort-connectors/elasticsearch-base/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/inlong-sort/sort-connectors/elasticsearch-base/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index e687ba679..2a4e4902e 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -48,6 +48,7 @@
<module>mongodb-cdc</module>
<module>sqlserver-cdc</module>
<module>oracle-cdc</module>
+ <module>elasticsearch-base</module>
<module>elasticsearch-6</module>
<module>elasticsearch-7</module>
</modules>
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
index e965cc42f..4ed00fb72 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
@@ -54,7 +54,7 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
Map<String, String> map = new HashMap<>();
return new MySqlExtractNode("1", "mysql_input", fields,
null, map, "age",
- Collections.singletonList("user"), "localhost", "root", "Eminem@123456",
+ Collections.singletonList("user"), "localhost", "root", "888888",
"test", null, null,
true, null);
}
@@ -72,7 +72,7 @@ public class ElasticsearchSqlParseTest extends AbstractTestBase {
return new ElasticsearchLoadNode("2", "kafka_output", fields, relations, null, null,
2, null,
"test", "http://localhost:9200",
- "my_admin", "my_password", null, "age", 7);
+ "elastic", "my_password", null, "age", 7);
}
private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 23c2037d5..7f2471260 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -480,6 +480,29 @@
Source : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the software have been modified.)
License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE
+ 1.3.8 inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6Configuration.java
+ inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+ inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+ inlong-sort/sort-connectors/elasticsearch-6/src/test/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkITCase.java
+ inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7Configuration.java
+ inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+ inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
+ inlong-sort/sort-connectors/elasticsearch-7/src/test/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkITCase.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/AbstractTimeIndexGenerator.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorFactory.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchConfiguration.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/KeyExtractor.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchOptions.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RequestFactory.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/ElasticsearchValidationUtils.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGenerator.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/IndexGeneratorBase.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/StaticIndexGenerator.java
+ inlong-sort/sort-connectors/elasticsearch-base/src/test/java/org/apache/inlong/sort/elasticsearch/table/TestContext.java
+ source : flink-connector-elasticsearch 1.13.2 (Please note that the software have been modified.)
+ License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
+
=======================================================================
Apache InLong Subcomponents:
diff --git a/pom.xml b/pom.xml
index 3aff01ba1..6a30758f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,7 +155,8 @@
<mybatis.starter.version>2.1.3</mybatis.starter.version>
<mybatis.version>3.5.9</mybatis.version>
<druid.version>1.2.6</druid.version>
- <elasticsearch.version>6.8.23</elasticsearch.version>
+ <elasticsearch6.version>6.8.17</elasticsearch6.version>
+ <elasticsearch7.version>7.9.2</elasticsearch7.version>
<shiro.version>1.8.0</shiro.version>
<snappy.version>1.1.8.4</snappy.version>
@@ -845,22 +846,22 @@
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch6.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch6.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch6.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
- <version>${elasticsearch.version}</version>
+ <version>${elasticsearch6.version}</version>
</dependency>
<dependency>