You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/05/17 14:37:59 UTC
[flink] 02/02: [FLINK-17027] Introduce a new Elasticsearch 6
connector with new property keys
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ccd2d531d1cb577113d5021efd6277031eeef9d1
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri May 15 20:06:46 2020 +0200
[FLINK-17027] Introduce a new Elasticsearch 6 connector with new property keys
This closes #12184
---
.../flink-connector-elasticsearch6/pom.xml | 22 ++
.../table/Elasticsearch6Configuration.java | 80 +++++++
.../table/Elasticsearch6DynamicSink.java | 251 ++++++++++++++++++++
.../table/Elasticsearch6DynamicSinkFactory.java | 155 ++++++++++++
.../org.apache.flink.table.factories.Factory | 16 ++
.../Elasticsearch6DynamicSinkFactoryTest.java | 207 ++++++++++++++++
.../table/Elasticsearch6DynamicSinkITCase.java | 262 +++++++++++++++++++++
.../table/Elasticsearch6DynamicSinkTest.java | 195 +++++++++++++++
8 files changed, 1188 insertions(+)
diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index 25a9f5a..272f9af 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -145,6 +145,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <!-- Table API integration tests -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- Elasticsearch table sink factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -154,4 +162,18 @@ under the License.
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!-- Enforce single fork execution because of spawning
+ Elasticsearch cluster multiple times -->
+ <forkCount>1</forkCount>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
new file mode 100644
index 0000000..c06898e
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/**
+ * Elasticsearch 6 specific configuration.
+ */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+	Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+		super(config, classLoader);
+	}
+
+	/**
+	 * Returns the hosts configured via {@code HOSTS_OPTION}, each entry validated and parsed
+	 * into an {@link HttpHost}.
+	 *
+	 * @throws ValidationException if an entry cannot be parsed or has no port or scheme
+	 */
+	public List<HttpHost> getHosts() {
+		return config.get(HOSTS_OPTION).stream()
+			.map(Elasticsearch6Configuration::validateAndParseHostsString)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Validates a single host entry and parses it into an {@link HttpHost}.
+	 *
+	 * <p>Each entry of the hosts option is expected to look like {@code http://host_name:port}, e.g.:
+	 *
+	 * <pre>
+	 *     hosts = http://host_name:9092;http://host_name:9093
+	 * </pre>
+	 *
+	 * @param host a single entry of the hosts option
+	 * @return the parsed host
+	 * @throws ValidationException if the entry cannot be parsed, or has no port or no scheme
+	 */
+	private static HttpHost validateAndParseHostsString(String host) {
+		final HttpHost httpHost;
+		try {
+			httpHost = HttpHost.create(host);
+		} catch (Exception e) {
+			throw new ValidationException(String.format(
+				"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+				host,
+				HOSTS_OPTION.key()), e);
+		}
+
+		// These checks live outside the try block on purpose: otherwise the specific
+		// "Missing port." / "Missing scheme." errors would be caught and re-wrapped
+		// in the generic parse error above, hiding the actual reason from the user.
+		if (httpHost.getPort() < 0) {
+			throw new ValidationException(String.format(
+				"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+				host,
+				HOSTS_OPTION.key()));
+		}
+
+		if (httpHost.getSchemeName() == null) {
+			throw new ValidationException(String.format(
+				"Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+				host,
+				HOSTS_OPTION.key()));
+		}
+		return httpHost;
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
new file mode 100644
index 0000000..eadf659
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a logical
+ * description.
+ */
+@PublicEvolving
+final class Elasticsearch6DynamicSink implements DynamicTableSink {
+	/** Creates the Elasticsearch 6 index/update/delete requests issued by the sink function. */
+	@VisibleForTesting
+	static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
+
+	private final SinkFormat<SerializationSchema<RowData>> format;
+	private final TableSchema schema;
+	private final Elasticsearch6Configuration config;
+
+	/**
+	 * Creates a sink whose runtime {@link ElasticsearchSink} is built with a regular
+	 * {@link ElasticsearchSink.Builder}.
+	 *
+	 * @param format format used to serialize rows into documents
+	 * @param config connector configuration
+	 * @param schema schema of the rows written by the sink
+	 */
+	public Elasticsearch6DynamicSink(
+			SinkFormat<SerializationSchema<RowData>> format,
+			Elasticsearch6Configuration config,
+			TableSchema schema) {
+		this(format, config, schema, (ElasticsearchSink.Builder::new));
+	}
+
+	//--------------------------------------------------------------
+	// Hack to make configuration testing possible.
+	//
+	// The code in this block should never be used outside of tests.
+	// Having a way to inject a builder we can assert the builder in
+	// the test. We can not assert everything though, e.g. it is not
+	// possible to assert flushing on checkpoint, as it is configured
+	// on the sink itself.
+	//--------------------------------------------------------------
+
+	private final ElasticSearchBuilderProvider builderProvider;
+
+	/** Creates the {@link ElasticsearchSink.Builder} that is configured by this sink. */
+	@FunctionalInterface
+	interface ElasticSearchBuilderProvider {
+		ElasticsearchSink.Builder<RowData> createBuilder(
+			List<HttpHost> httpHosts,
+			RowElasticsearchSinkFunction upsertSinkFunction);
+	}
+
+	/** Same as the public constructor, but with an injectable builder provider. */
+	Elasticsearch6DynamicSink(
+			SinkFormat<SerializationSchema<RowData>> format,
+			Elasticsearch6Configuration config,
+			TableSchema schema,
+			ElasticSearchBuilderProvider builderProvider) {
+		this.format = format;
+		this.schema = schema;
+		this.config = config;
+		this.builderProvider = builderProvider;
+	}
+
+	//--------------------------------------------------------------
+	// End of hack to make configuration testing possible
+	//--------------------------------------------------------------
+
+	/**
+	 * Accepts every requested row kind except {@link RowKind#UPDATE_BEFORE}.
+	 */
+	@Override
+	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+		ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+		for (RowKind kind : requestedMode.getContainedKinds()) {
+			if (kind != RowKind.UPDATE_BEFORE) {
+				builder.addContainedKind(kind);
+			}
+		}
+		return builder.build();
+	}
+
+	/**
+	 * Returns a provider that builds the {@link ElasticsearchSink} from the format, schema and
+	 * configuration of this sink. Optional settings are only applied to the builder when they
+	 * are present in the configuration.
+	 */
+	@Override
+	public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+		return () -> {
+			SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+
+			final RowElasticsearchSinkFunction upsertFunction =
+				new RowElasticsearchSinkFunction(
+					IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+					config.getDocumentType(),
+					format,
+					XContentType.JSON,
+					REQUEST_FACTORY,
+					KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())
+				);
+
+			final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder(
+				config.getHosts(),
+				upsertFunction);
+
+			builder.setFailureHandler(config.getFailureHandler());
+			config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
+			config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
+			config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+			builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+			config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+			config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+			config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+			config.getPathPrefix()
+				.ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+
+			final ElasticsearchSink<RowData> sink = builder.build();
+
+			// Flushing on checkpoint is configured on the built sink, not on the builder.
+			if (config.isDisableFlushOnCheckpoint()) {
+				sink.disableFlushOnCheckpoint();
+			}
+
+			return sink;
+		};
+	}
+
+	/**
+	 * Returns this instance; all fields of the sink are final.
+	 */
+	@Override
+	public DynamicTableSink copy() {
+		return this;
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "Elasticsearch6";
+	}
+
+	/**
+	 * Serializable {@link RestClientFactory} used by the sink.
+	 */
+	@VisibleForTesting
+	static class DefaultRestClientFactory implements RestClientFactory {
+
+		private final String pathPrefix;
+
+		public DefaultRestClientFactory(@Nullable String pathPrefix) {
+			this.pathPrefix = pathPrefix;
+		}
+
+		/** Applies the path prefix to the given builder, if one was configured. */
+		@Override
+		public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+			if (pathPrefix != null) {
+				restClientBuilder.setPathPrefix(pathPrefix);
+			}
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+			return Objects.equals(pathPrefix, that.pathPrefix);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(pathPrefix);
+		}
+	}
+
+	/**
+	 * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
+	 */
+	private static class Elasticsearch6RequestFactory implements RequestFactory {
+		/** Creates an update of document {@code key} that inserts the document if it is missing. */
+		@Override
+		public UpdateRequest createUpdateRequest(
+				String index,
+				String docType,
+				String key,
+				XContentType contentType,
+				byte[] document) {
+			return new UpdateRequest(index, docType, key)
+				.doc(document, contentType)
+				.upsert(document, contentType);
+		}
+
+		/** Creates a request that indexes {@code document} under the id {@code key}. */
+		@Override
+		public IndexRequest createIndexRequest(
+				String index,
+				String docType,
+				String key,
+				XContentType contentType,
+				byte[] document) {
+			// The third constructor argument is the document id; it must be the key,
+			// not the index name, or all documents of an index share a single id.
+			return new IndexRequest(index, docType, key)
+				.source(document, contentType);
+		}
+
+		/** Creates a request that deletes the document with the id {@code key}. */
+		@Override
+		public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+			return new DeleteRequest(index, docType, key);
+		}
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+		return Objects.equals(format, that.format) &&
+			Objects.equals(schema, that.schema) &&
+			Objects.equals(config, that.config) &&
+			Objects.equals(builderProvider, that.builderProvider);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(format, schema, config, builderProvider);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
new file mode 100644
index 0000000..65c90b5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * {@link DynamicTableSinkFactory} that validates the table options and creates an
+ * {@link Elasticsearch6DynamicSink} from them.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+	private static final Set<ConfigOption<?>> REQUIRED_OPTIONS = Stream.of(
+		HOSTS_OPTION,
+		INDEX_OPTION,
+		DOCUMENT_TYPE_OPTION
+	).collect(Collectors.toSet());
+	private static final Set<ConfigOption<?>> OPTIONAL_OPTIONS = Stream.of(
+		KEY_DELIMITER_OPTION,
+		FAILURE_HANDLER_OPTION,
+		FLUSH_ON_CHECKPOINT_OPTION,
+		BULK_FLASH_MAX_SIZE_OPTION,
+		BULK_FLUSH_MAX_ACTIONS_OPTION,
+		BULK_FLUSH_INTERVAL_OPTION,
+		BULK_FLUSH_BACKOFF_TYPE_OPTION,
+		BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+		BULK_FLUSH_BACKOFF_DELAY_OPTION,
+		CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+		CONNECTION_PATH_PREFIX,
+		FORMAT_OPTION
+	).collect(Collectors.toSet());
+
+	@Override
+	public String factoryIdentifier() {
+		return "elasticsearch-6";
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		return REQUIRED_OPTIONS;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		return OPTIONAL_OPTIONS;
+	}
+
+	/**
+	 * Creates the sink for the table described by {@code context}.
+	 *
+	 * <p>The primary key is checked first, then the options are validated by the factory
+	 * helper, and finally the Elasticsearch specific constraints are checked.
+	 */
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		final TableSchema schema = context.getCatalogTable().getSchema();
+		ElasticsearchValidationUtils.validatePrimaryKey(schema);
+
+		final FactoryUtil.TableFactoryHelper factoryHelper =
+			FactoryUtil.createTableFactoryHelper(this, context);
+		final SinkFormat<SerializationSchema<RowData>> sinkFormat = factoryHelper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			FORMAT_OPTION);
+		factoryHelper.validate();
+
+		// Copy the raw table options into a Configuration the connector configuration can read.
+		final Configuration tableOptions = new Configuration();
+		context.getCatalogTable()
+			.getOptions()
+			.forEach(tableOptions::setString);
+		final Elasticsearch6Configuration esConfig =
+			new Elasticsearch6Configuration(tableOptions, context.getClassLoader());
+
+		validate(esConfig, tableOptions);
+
+		return new Elasticsearch6DynamicSink(
+			sinkFormat,
+			esConfig,
+			TableSchemaUtils.getPhysicalSchema(schema));
+	}
+
+	/**
+	 * Runs the connector specific checks in a fixed order; the first violated one is reported.
+	 */
+	private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
+		config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+		config.getHosts(); // validate hosts
+		validateIndex(config);
+		validateBulkFlushMaxActions(config);
+		validateBulkFlushMaxSize(config, originalConfiguration);
+		validateBulkFlushBackoffRetries(config);
+	}
+
+	/** The index must not be empty. */
+	private static void validateIndex(Elasticsearch6Configuration config) {
+		ensure(
+			config.getIndex().length() >= 1,
+			() -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+	}
+
+	/** If set, the maximum number of buffered actions must be at least 1. */
+	private static void validateBulkFlushMaxActions(Elasticsearch6Configuration config) {
+		ensure(
+			config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+			() -> String.format(
+				"'%s' must be at least 1 character. Got: %s",
+				BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
+				config.getBulkFlushMaxActions().get()));
+	}
+
+	/** If set, the maximum buffer size must be at least {@code 1024 * 1024}. */
+	private static void validateBulkFlushMaxSize(
+			Elasticsearch6Configuration config,
+			Configuration originalConfiguration) {
+		ensure(
+			config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+			() -> String.format(
+				"'%s' must be at least 1mb character. Got: %s",
+				BULK_FLASH_MAX_SIZE_OPTION.key(),
+				originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString()));
+	}
+
+	/** If set, the number of back-off retries must be at least 1. */
+	private static void validateBulkFlushBackoffRetries(Elasticsearch6Configuration config) {
+		ensure(
+			config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+			() -> String.format(
+				"'%s' must be at least 1. Got: %s",
+				BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+				config.getBulkFlushBackoffRetries().get()));
+	}
+
+	/**
+	 * Throws a {@link ValidationException} with the supplied message unless {@code condition}
+	 * holds. The message is only built when the check fails.
+	 */
+	private static void ensure(boolean condition, Supplier<String> message) {
+		if (condition) {
+			return;
+		}
+		throw new ValidationException(message.get());
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..29a8593
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
new file mode 100644
index 0000000..f1be1b2
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+
+/**
+ * Tests for validation in {@link Elasticsearch6DynamicSinkFactory}.
+ *
+ * <p>Every test registers the expected {@link ValidationException} and then asks the factory
+ * to create a sink from an invalid table description.
+ */
+public class Elasticsearch6DynamicSinkFactoryTest {
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	private final Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+	/** Schema with a single TIME column {@code a} and no primary key. */
+	private static TableSchema timeColumnSchema() {
+		return TableSchema.builder()
+			.field("a", DataTypes.TIME())
+			.build();
+	}
+
+	/** Registers a {@link ValidationException} containing {@code message} as the expected failure. */
+	private void expectValidationException(String message) {
+		thrown.expect(ValidationException.class);
+		thrown.expectMessage(message);
+	}
+
+	@Test
+	public void validateEmptyConfiguration() {
+		expectValidationException(
+			"One or more required options are missing.\n" +
+				"\n" +
+				"Missing required options are:\n" +
+				"\n" +
+				"document-type\n" +
+				"hosts\n" +
+				"index");
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(timeColumnSchema())
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongIndex() {
+		expectValidationException("'index' must not be empty");
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(timeColumnSchema())
+				.withOption("index", "")
+				.withOption("document-type", "MyType")
+				.withOption("hosts", "http://localhost:12345")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongHosts() {
+		expectValidationException(
+			"Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.");
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(timeColumnSchema())
+				.withOption("index", "MyIndex")
+				.withOption("document-type", "MyType")
+				.withOption("hosts", "wrong-host")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongFlushSize() {
+		expectValidationException(
+			"'sink.bulk-flush.max-size' must be at least 1mb character. Got: 1024 bytes");
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(timeColumnSchema())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1kb")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongRetries() {
+		expectValidationException(
+			"'sink.bulk-flush.back-off.max-retries' must be at least 1. Got: 0");
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(timeColumnSchema())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "0")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongMaxActions() {
+		expectValidationException(
+			"'sink.bulk-flush.max-actions' must be at least 1 character. Got: 0");
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(timeColumnSchema())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+				.build()
+		);
+	}
+
+	@Test
+	public void validateWrongBackoffDelay() {
+		expectValidationException(
+			"Invalid value for option 'sink.bulk-flush.back-off.delay'.");
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(timeColumnSchema())
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "-1s")
+				.build()
+		);
+	}
+
+	@Test
+	public void validatePrimaryKeyOnIllegalColumn() {
+		expectValidationException(
+			"The table has a primary key on columns of illegal types: " +
+				"[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n" +
+				" Elasticsearch sink does not support primary keys on columns of types: " +
+				"[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY].");
+
+		// Columns b to g use types listed as illegal for a primary key in the expected message.
+		final TableSchema schemaWithIllegalKey = TableSchema.builder()
+			.field("a", DataTypes.BIGINT().notNull())
+			.field("b", DataTypes.ARRAY(DataTypes.BIGINT().notNull()).notNull())
+			.field("c", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()).notNull())
+			.field("d", DataTypes.MULTISET(DataTypes.BIGINT().notNull()).notNull())
+			.field("e", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.BIGINT())).notNull())
+			.field("f", DataTypes.RAW(Types.BIG_INT).notNull())
+			.field("g", DataTypes.BYTES().notNull())
+			.primaryKey("a", "b", "c", "d", "e", "f", "g")
+			.build();
+
+		sinkFactory.createDynamicTableSink(
+			context()
+				.withSchema(schemaWithIllegalKey)
+				.withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+				.withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+				.withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "1s")
+				.build()
+		);
+	}
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
new file mode 100644
index 0000000..3c09653
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHits;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT tests for {@link Elasticsearch6DynamicSink}.
+ *
+ * <p>Every test writes one row through the sink and then reads the resulting document back
+ * with the client obtained from the {@link ElasticsearchResource} class rule, comparing it
+ * with the document returned by {@link #createExpectedDocument()}.
+ */
+public class Elasticsearch6DynamicSinkITCase {
+
+    @ClassRule
+    public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource("es-6-dynamic-sink-tests");
+
+    /**
+     * Writes a single {@code UPDATE_AFTER} row with a sink function created directly from the
+     * factory and fetches the document by the id {@code 1_2012-12-12T12:12:12}, which matches
+     * the values of the primary key columns {@code a} and {@code g}.
+     */
+    @Test
+    public void testWritingDocuments() throws Exception {
+        TableSchema schema = TableSchema.builder()
+            .field("a", DataTypes.BIGINT().notNull())
+            .field("b", DataTypes.TIME())
+            .field("c", DataTypes.STRING().notNull())
+            .field("d", DataTypes.FLOAT())
+            .field("e", DataTypes.TINYINT().notNull())
+            .field("f", DataTypes.DATE())
+            .field("g", DataTypes.TIMESTAMP().notNull())
+            .primaryKey("a", "g")
+            .build();
+        // TIME and DATE are passed as plain ints here; they yield the same document as the
+        // LocalTime.ofNanoOfDay(12345L * 1_000_000L) / LocalDate.ofEpochDay(12345) values used
+        // by the Table API tests below.
+        GenericRowData rowData = GenericRowData.of(
+            1L,
+            12345,
+            StringData.fromString("ABCDE"),
+            12.12f,
+            (byte) 2,
+            12345,
+            TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12")));
+
+        String index = "writing-documents";
+        String myType = "MyType";
+        Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+        SinkFunctionProvider sinkRuntimeProvider = (SinkFunctionProvider) sinkFactory.createDynamicTableSink(
+            context()
+                .withSchema(schema)
+                .withOption(ElasticsearchOptions.INDEX_OPTION.key(), index)
+                .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200")
+                .withOption(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                .build()
+        ).getSinkRuntimeProvider(new MockContext());
+
+        SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+        rowData.setRowKind(RowKind.UPDATE_AFTER);
+        environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+        environment.execute();
+
+        Client client = elasticsearchResource.getClient();
+        Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12")).actionGet().getSource();
+        assertThat(response, equalTo(createExpectedDocument()));
+    }
+
+    /**
+     * Writes a single row through a table created with SQL DDL (including a computed column
+     * {@code h}, which is not part of the expected document) and fetches the document by id.
+     */
+    @Test
+    public void testWritingDocumentsFromTableApi() throws Exception {
+        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+            .useBlinkPlanner()
+            .inStreamingMode()
+            .build());
+
+        String index = "table-api";
+        String myType = "MyType";
+        tableEnvironment.executeSql("CREATE TABLE esTable (" +
+            "a BIGINT NOT NULL,\n" +
+            "b TIME,\n" +
+            "c STRING NOT NULL,\n" +
+            "d FLOAT,\n" +
+            "e TINYINT NOT NULL,\n" +
+            "f DATE,\n" +
+            "g TIMESTAMP NOT NULL,\n" +
+            "h as a + 2,\n" +
+            "PRIMARY KEY (a, g) NOT ENFORCED\n" +
+            ")\n" +
+            "WITH (\n" +
+            String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+            String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+            String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+            String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+            String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+            ")");
+
+        // Block until the insert job has finished before querying Elasticsearch.
+        tableEnvironment.fromValues(
+            row(
+                1L,
+                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                "ABCDE",
+                12.12f,
+                (byte) 2,
+                LocalDate.ofEpochDay(12345),
+                LocalDateTime.parse("2012-12-12T12:12:12"))
+        ).executeInsert("esTable")
+            .getJobClient()
+            .get()
+            .getJobExecutionResult(this.getClass().getClassLoader())
+            .get();
+
+        Client client = elasticsearchResource.getClient();
+        Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+            .actionGet()
+            .getSource();
+        assertThat(response, equalTo(createExpectedDocument()));
+    }
+
+    /**
+     * Writes a single row into a table without a primary key. The document cannot be fetched
+     * by a known id, so the index is searched until a hit shows up or the deadline expires.
+     */
+    @Test
+    public void testWritingDocumentsNoPrimaryKey() throws Exception {
+        TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+            .useBlinkPlanner()
+            .inStreamingMode()
+            .build());
+
+        String index = "no-primary-key";
+        String myType = "MyType";
+        tableEnvironment.executeSql("CREATE TABLE esTable (" +
+            "a BIGINT NOT NULL,\n" +
+            "b TIME,\n" +
+            "c STRING NOT NULL,\n" +
+            "d FLOAT,\n" +
+            "e TINYINT NOT NULL,\n" +
+            "f DATE,\n" +
+            "g TIMESTAMP NOT NULL\n" +
+            ")\n" +
+            "WITH (\n" +
+            String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+            String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+            String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+            String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+            String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+            ")");
+
+        // Block until the insert job has finished before querying Elasticsearch.
+        tableEnvironment.fromValues(
+            row(
+                1L,
+                LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                "ABCDE",
+                12.12f,
+                (byte) 2,
+                LocalDate.ofEpochDay(12345),
+                LocalDateTime.parse("2012-12-12T12:12:12"))
+        ).executeInsert("esTable")
+            .getJobClient()
+            .get()
+            .getJobExecutionResult(this.getClass().getClassLoader())
+            .get();
+
+        Client client = elasticsearchResource.getClient();
+
+        // search API does not return documents that were not indexed, we might need to query
+        // the index a few times. The deadline is generous so that a slow environment does not
+        // make the test fail spuriously.
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+        SearchHits hits;
+        do {
+            hits = client.prepareSearch(index)
+                .execute()
+                .actionGet()
+                .getHits();
+            if (hits.getTotalHits() == 0) {
+                Thread.sleep(100);
+            }
+        } while (hits.getTotalHits() == 0 && deadline.hasTimeLeft());
+
+        // Fail with a clear message instead of an index-out-of-bounds error from getAt(0).
+        if (hits.getTotalHits() == 0) {
+            throw new AssertionError("Could not retrieve results from Elasticsearch.");
+        }
+
+        Map<String, Object> result = hits.getAt(0).getSourceAsMap();
+        assertThat(result, equalTo(createExpectedDocument()));
+    }
+
+    /**
+     * Builds the document that every test in this class expects to find in Elasticsearch.
+     *
+     * @return a fresh, mutable map from field name to the expected stored value
+     */
+    private static Map<Object, Object> createExpectedDocument() {
+        Map<Object, Object> expectedMap = new HashMap<>();
+        expectedMap.put("a", 1);
+        expectedMap.put("b", "00:00:12Z");
+        expectedMap.put("c", "ABCDE");
+        expectedMap.put("d", 12.12d);
+        expectedMap.put("e", 2);
+        expectedMap.put("f", "2003-10-20");
+        expectedMap.put("g", "2012-12-12T12:12:12Z");
+        return expectedMap;
+    }
+
+    /**
+     * Minimal {@link DynamicTableSink.Context} for {@link #testWritingDocuments()}: reports an
+     * unbounded input and returns {@code null} from both factory methods.
+     */
+    private static class MockContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+            return null;
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
new file mode 100644
index 0000000..df54147
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link Elasticsearch6DynamicSink} parameters.
+ *
+ * <p>The sink is created with a {@link BuilderProvider} that hands out Mockito spies, so the
+ * test can verify which setters were invoked on the {@link ElasticsearchSink.Builder} and on
+ * the sink it builds.
+ */
+public class Elasticsearch6DynamicSinkTest {
+
+    private static final String FIELD_KEY = "key";
+    private static final String FIELD_FRUIT_NAME = "fruit_name";
+    private static final String FIELD_COUNT = "count";
+    private static final String FIELD_TS = "ts";
+
+    private static final String HOSTNAME = "host1";
+    private static final int PORT = 1234;
+    private static final String SCHEMA = "https";
+    private static final String INDEX = "MyIndex";
+    private static final String DOC_TYPE = "MyType";
+
+    /**
+     * Creates the sink function from the options in {@link #getConfig()} and verifies the
+     * corresponding calls on the builder spy and the sink spy.
+     */
+    @Test
+    public void testBuilder() {
+        final BuilderProvider builderProvider = new BuilderProvider();
+        final Elasticsearch6Configuration sinkConfig =
+            new Elasticsearch6Configuration(getConfig(), this.getClass().getClassLoader());
+        final Elasticsearch6DynamicSink sink = new Elasticsearch6DynamicSink(
+            new DummySinkFormat(),
+            sinkConfig,
+            createTestSchema(),
+            builderProvider
+        );
+
+        sink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+        // The spies are read only after the sink function has been created.
+        final ElasticsearchSink.Builder<RowData> builder = builderProvider.builderSpy;
+        verify(builder).setFailureHandler(new DummyFailureHandler());
+        verify(builder).setBulkFlushBackoff(true);
+        verify(builder).setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+        verify(builder).setBulkFlushBackoffDelay(123);
+        verify(builder).setBulkFlushBackoffRetries(3);
+        verify(builder).setBulkFlushInterval(100);
+        verify(builder).setBulkFlushMaxActions(1000);
+        verify(builder).setBulkFlushMaxSizeMb(1);
+        verify(builder).setRestClientFactory(new Elasticsearch6DynamicSink.DefaultRestClientFactory("/myapp"));
+        verify(builderProvider.sinkSpy).disableFlushOnCheckpoint();
+    }
+
+    /**
+     * Assembles the connector options whose effect {@link #testBuilder()} verifies.
+     */
+    private Configuration getConfig() {
+        final String hosts = SCHEMA + "://" + HOSTNAME + ":" + PORT;
+
+        final Configuration config = new Configuration();
+        config.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+        config.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+        config.setString(ElasticsearchOptions.HOSTS_OPTION.key(), hosts);
+        config.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential");
+        config.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123");
+        config.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3");
+        config.setString(ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100");
+        config.setString(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000");
+        config.setString(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb");
+        config.setString(ElasticsearchOptions.CONNECTION_PATH_PREFIX.key(), "/myapp");
+        config.setString(ElasticsearchOptions.FAILURE_HANDLER_OPTION.key(), DummyFailureHandler.class.getName());
+        config.setString(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false");
+        return config;
+    }
+
+    /**
+     * Builder provider that wraps the real builder in a spy and, when {@code build()} is
+     * called on it, wraps the really built sink in a spy as well. Both spies are exposed
+     * through public fields for verification.
+     */
+    private static class BuilderProvider implements Elasticsearch6DynamicSink.ElasticSearchBuilderProvider {
+        public ElasticsearchSink.Builder<RowData> builderSpy;
+        public ElasticsearchSink<RowData> sinkSpy;
+
+        @Override
+        public ElasticsearchSink.Builder<RowData> createBuilder(
+                List<HttpHost> httpHosts,
+                RowElasticsearchSinkFunction upsertSinkFunction) {
+            builderSpy = Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction));
+            doAnswer(
+                invocation -> {
+                    // Build the real sink first, then hand out a spy around it.
+                    ElasticsearchSink<RowData> builtSink =
+                        (ElasticsearchSink<RowData>) invocation.callRealMethod();
+                    sinkSpy = Mockito.spy(builtSink);
+                    return sinkSpy;
+                }
+            ).when(builderSpy).build();
+
+            return builderSpy;
+        }
+    }
+
+    /**
+     * Creates the four-column schema the sink under test is constructed with.
+     */
+    private TableSchema createTestSchema() {
+        final TableSchema.Builder schemaBuilder = TableSchema.builder();
+        schemaBuilder.field(FIELD_KEY, DataTypes.BIGINT());
+        schemaBuilder.field(FIELD_FRUIT_NAME, DataTypes.STRING());
+        schemaBuilder.field(FIELD_COUNT, DataTypes.DECIMAL(10, 4));
+        schemaBuilder.field(FIELD_TS, DataTypes.TIMESTAMP(3));
+        return schemaBuilder.build();
+    }
+
+    /**
+     * Serialization schema that turns every row into an empty byte array.
+     */
+    private static class DummySerializationSchema implements SerializationSchema<RowData> {
+
+        private static final DummySerializationSchema INSTANCE = new DummySerializationSchema();
+
+        @Override
+        public byte[] serialize(RowData element) {
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Sink format that always returns the shared {@link DummySerializationSchema} instance
+     * and reports no changelog mode.
+     */
+    private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+        @Override
+        public SerializationSchema<RowData> createSinkFormat(
+                DynamicTableSink.Context context,
+                DataType consumedDataType) {
+            return DummySerializationSchema.INSTANCE;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return null;
+        }
+    }
+
+    /**
+     * Sink context stub: unbounded input, {@code null} from both factory methods.
+     */
+    private static class MockSinkContext implements DynamicTableSink.Context {
+        @Override
+        public boolean isBounded() {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+            return null;
+        }
+    }
+
+    /**
+     * Custom failure handler for testing. All instances are equal to each other so that a
+     * freshly created instance can be used as the expected argument in a verification.
+     */
+    public static class DummyFailureHandler implements ActionRequestFailureHandler {
+
+        @Override
+        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+            // do nothing
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return o instanceof DummyFailureHandler;
+        }
+
+        @Override
+        public int hashCode() {
+            return DummyFailureHandler.class.hashCode();
+        }
+    }
+}